Source code for penaltyblog.matchflow.group

from typing import Any, Callable, Dict, List, Literal, Optional, Union

from .aggs_registry import resolve_aggregator
from .executor import FlowExecutor
from .flow import Flow
from .helpers import explain_plan
from .optimizer import FlowOptimizer
from .plotting import plot_flow_plan

PlanNode = Dict[str, Any]


[docs] class FlowGroup: """ A FlowGroup is a collection of records grouped by one or more keys. """ def __init__(self, plan: List[PlanNode], optimize: bool = False): """ Args: plan (List[PlanNode]): The plan to execute. optimize (bool, optional): Whether to optimize the plan. Defaults to False. """ self.plan = plan self.optimize = optimize def __iter__(self): """ Iterate over the groups. """ plan_to_execute = ( FlowOptimizer(self.plan).optimize() if self.optimize else self.plan ) for group in FlowExecutor(plan_to_execute).execute(): key = group.get("__group_key__") records = group.get("__group_records__") yield (key if isinstance(key, tuple) and len(key) == 1 else key), records
[docs] def summary(self, aggregators: Union[Callable, Dict[str, Any]]) -> Flow: """ Apply group-summary without eagerly optimizing previous steps. Args: aggregators (Union[Callable, Dict[str, Any]]): The aggregators to apply. Returns: Flow: A new Flow with the group-summary applied. """ if callable(aggregators): agg_func = aggregators elif isinstance(aggregators, dict): def agg_func(rows): return { alias: resolve_aggregator(value, alias)(rows) for alias, value in aggregators.items() } else: raise TypeError("summary() requires a callable or dict") # Find original group_by keys group_keys = None for step in reversed(self.plan): if step.get("op") == "group_by": group_keys = step.get("keys") break # Build new Flow with raw plan + summary, carry optimize flag return Flow( plan=self.plan + [ { "op": "group_summary", "agg": agg_func, "group_keys": group_keys, } ], optimize=self.optimize, )
[docs] def sort_by(self, *keys: str, ascending: bool = True) -> "FlowGroup": """ Sort the groups by one or more fields. Args: *keys (str): Field names to sort by. ascending (bool or list[bool], optional): Sort order(s). Either a single bool applied to all keys or a list of bools (one per key). Returns: FlowGroup: A new FlowGroup with sorted groups. """ return FlowGroup( plan=self.plan + [ { "op": "sort", "keys": list(keys), "ascending": ( [ascending] * len(keys) if isinstance(ascending, bool) else ascending ), } ], optimize=self.optimize, )
[docs] def cumulative(self, field: str, alias: Optional[str] = None) -> Flow: """ Apply group-cumulative without eagerly optimizing previous steps. Args: field (str): The field to cumulative. alias (Optional[str], optional): The alias for the cumulative field. Defaults to None. Returns: Flow: A new Flow with the group-cumulative applied. """ return Flow( plan=self.plan + [ { "op": "group_cumulative", "field": field, "alias": alias or f"cumulative_{field}", } ], optimize=self.optimize, )
[docs] def rolling_summary( self, window, aggregators, time_field=None, min_periods=1, step=None ): """ Lazily apply a rolling summary within each group in a FlowGroup. This appends a 'group_rolling_summary' step to the plan and returns a new Flow. Args: window (int | str): Number of records or time window (e.g., '5m'). aggregators (dict): Mapping of output field -> (agg_func_name, field). time_field (str | None): Field used for time-based windows. min_periods (int): Minimum rows needed to compute result. step (int | None): Step size for window movement. Returns: Flow: A new Flow with a group_rolling_summary step added. """ group_keys = None for plan_step in reversed(self.plan): if plan_step.get("op") == "group_by": group_keys = plan_step.get("keys") break return Flow( plan=self.plan + [ { "op": "group_rolling_summary", "window": window, "aggregators": aggregators, "time_field": time_field, "min_periods": min_periods, "step": step, "__group_keys": group_keys, } ], optimize=self.optimize, )
[docs] def time_bucket( self, freq: str, aggregators: dict[str, Any], time_field: str, label: Literal["left", "right"] = "left", bucket_name: str = "bucket", ) -> Flow: """ Create fixed (non-overlapping) time buckets of size `freq` (e.g. "5m") and aggregate each bucket. Args: freq (str): e.g. "5m", "10m", "1h", "30s". Must end with 's','m','h', or 'd'. aggregators (dict): mapping output_field -> (agg_name_or_callable, input_field) time_field (str): dot-path to a field that is either a `timedelta` or a `datetime` in each record. label (str): "left" (default) → bucket labeled at the interval's start; "right" → label at interval's end. bucket_name (str): Name of the output field for the bucket label. Defaults to "bucket". Returns: Flow: a new Flow whose records are one row per bucket per group, with the bucket label and aggregated values. """ # Find the group_by keys (same logic as rolling_summary) group_keys = None for step in reversed(self.plan): if step.get("op") == "group_by": group_keys = step.get("keys") break return Flow( plan=self.plan + [ { "op": "group_time_bucket", "freq": freq, "aggregators": aggregators, "time_field": time_field, "label": label, "bucket_name": bucket_name, "__group_keys": group_keys or [], } ], optimize=self.optimize, )
[docs] def select(self, *fields: str) -> "FlowGroup": """ Select specific fields from each record. Args: *fields (str): The fields to select. Returns: FlowGroup: A new FlowGroup with selected fields. """ return FlowGroup( plan=self.plan + [{"op": "select", "fields": list(fields)}], optimize=self.optimize, )
[docs] def to_flow(self) -> Flow: """ Convert the FlowGroup to a Flow. Returns: Flow: A new Flow with the same plan and optimize flag. """ return Flow(plan=self.plan, optimize=self.optimize)
[docs] def collect(self) -> List[Dict[str, Any]]: """ Collect the FlowGroup into a list of records. Returns: List[Dict[str, Any]]: The collected records. """ return list( FlowExecutor(Flow(self.plan, optimize=self.optimize).plan).execute() )
[docs] def plot_plan(self, compare: bool = False): """ Visualize the flow group plan. Args: compare (bool): - True: show two subplots (raw vs. optimized). - False: show a single subplot. If this FlowGroup was constructed with optimize=True, show the optimized plan; otherwise the raw. """ plot_flow_plan( self.plan, optimize=self.optimize, compare=compare, title_prefix="Group: ", )
[docs] def explain(self, optimize: Optional[bool] = None, compare: bool = False): """ Explain the plan. Returns: None """ effective_opt = self.optimize if optimize is None else optimize raw = self.plan opt_plan = FlowOptimizer(raw).optimize() if effective_opt or compare else None explain_plan(raw, optimized_plan=opt_plan, compare=compare)