Real-time Aggregation
Maintains continuously updated aggregations over streaming data for instant insights.
Intent & Description
Context
Updating aggregates incrementally as each event arrives lets queries read a precomputed result instead of scanning the entire dataset. Windowing functions scope those aggregates to time intervals, for example a per-minute or per-hour total.
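As a rough illustration of temporal aggregation, the sketch below groups events into fixed-size (tumbling) windows and keeps a running sum per window and category. The class name `TumblingWindowAggregator`, the integer `timestamp` field, and the 60-second window size are assumptions made for this example, not details from the pattern's source.

```python
from collections import defaultdict


class TumblingWindowAggregator:
    """Hypothetical sketch: running sums per (window_start, category)."""

    def __init__(self, window_seconds=60):
        self.window_seconds = window_seconds
        # Maps (window_start, category) -> running sum
        self.sums = defaultdict(int)

    def window_start(self, timestamp):
        # Align the timestamp down to the start of its window
        return timestamp - (timestamp % self.window_seconds)

    def process_event(self, event):
        # "timestamp" is an assumed field holding seconds as an integer
        key = (self.window_start(event["timestamp"]), event["category"])
        self.sums[key] += event["value"]
        return self.sums[key]

    def get_aggregation(self, window_start, category):
        # .get avoids creating entries for windows that were never seen
        return self.sums.get((window_start, category), 0)


windowed = TumblingWindowAggregator(window_seconds=60)
windowed.process_event({"timestamp": 5, "category": "sales", "value": 100})
windowed.process_event({"timestamp": 42, "category": "sales", "value": 50})
windowed.process_event({"timestamp": 61, "category": "sales", "value": 7})
```

Here the first two events fall into the window starting at 0 and the third into the window starting at 60, so each window can be queried independently with `get_aggregation`.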
Real-world Use Case
Live dashboards, real-time metrics, monitoring systems, and applications requiring instant aggregation results.
Advantages
- Instant query response
- Reduced compute at query time
- Always current data
- Efficient for repeated queries
Disadvantages
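- State management complexity
- Memory grows with the number of keys and open windows
- Late or out-of-order events need explicit handling
- Compute runs continuously, even when no one is querying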
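One possible way to contain the state and late-data costs listed above is to close a window once a watermark passes it, move its result out of the in-memory state, and drop events that arrive afterwards. The sketch below is only an illustration under assumed names: `LatenessAwareAggregator`, the `timestamp` field, the 60-second window, and the 30-second `allowed_lateness` are all hypothetical, and timestamps are assumed to be non-negative integers.

```python
from collections import defaultdict


class LatenessAwareAggregator:
    """Hypothetical sketch: windowed sums with bounded state and late-event drops."""

    def __init__(self, window_seconds=60, allowed_lateness=30):
        self.window_seconds = window_seconds
        self.allowed_lateness = allowed_lateness
        self.open_windows = defaultdict(int)  # (window_start, category) -> sum
        self.finalized = {}                   # results of closed windows
        self.max_timestamp = 0                # assumes non-negative timestamps
        self.dropped_late_events = 0

    def watermark(self):
        # Events for windows ending at or before this point are too late
        return self.max_timestamp - self.allowed_lateness

    def _close_expired_windows(self):
        wm = self.watermark()
        expired = [k for k in self.open_windows
                   if k[0] + self.window_seconds <= wm]
        for k in expired:
            # Evict from live state so memory stays bounded
            self.finalized[k] = self.open_windows.pop(k)

    def process_event(self, event):
        ts = event["timestamp"]
        self.max_timestamp = max(self.max_timestamp, ts)
        self._close_expired_windows()
        start = ts - (ts % self.window_seconds)
        if start + self.window_seconds <= self.watermark():
            # The event's window is already closed: count it and drop it
            self.dropped_late_events += 1
            return False
        self.open_windows[(start, event["category"])] += event["value"]
        return True


late_aware = LatenessAwareAggregator(window_seconds=60, allowed_lateness=30)
late_aware.process_event({"timestamp": 10, "category": "sales", "value": 100})
late_aware.process_event({"timestamp": 70, "category": "sales", "value": 5})
# Out of order, but still within the allowed lateness: accepted
late_aware.process_event({"timestamp": 50, "category": "sales", "value": 50})
# Advances the watermark to 65, which closes the window starting at 0
late_aware.process_event({"timestamp": 95, "category": "sales", "value": 1})
# Arrives after its window closed: dropped
late_aware.process_event({"timestamp": 20, "category": "sales", "value": 999})
```

Dropping late events is only one policy; a real system might instead route them to a side output or re-emit a corrected result, and the right choice depends on how much inaccuracy the consumer can tolerate.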
Implementation Example
# Real-time Aggregation Pattern
from collections import defaultdict


class RealTimeAggregator:
    def __init__(self):
        # Running sum per category; unseen keys start at 0
        self.counters = defaultdict(int)

    def process_event(self, event):
        # Fold one event into its category's running total
        key = event["category"]
        value = event["value"]
        self.counters[key] += value
        return self.counters[key]

    def get_aggregation(self, key):
        # .get avoids creating an entry when an unseen key is queried
        return self.counters.get(key, 0)


aggregator = RealTimeAggregator()

# Process streaming events
aggregator.process_event({"category": "sales", "value": 100})
aggregator.process_event({"category": "sales", "value": 50})