Source code for penaltyblog.matchflow.executor

from typing import Any, Dict, List

from .steps import group, source, transform

PlanNode = Dict[str, Any]


def is_materializing_op(op_name: str) -> bool:
    """
    Tell whether a plan operation is classed as materializing.

    An operation is treated as materializing when it is expected to buffer
    records in memory, i.e. it needs the full dataset before it can proceed.

    Args:
        op_name: The operation name, as stored under a plan step's "op" key.

    Returns:
        True if ``op_name`` is one of the listed materializing operations,
        False for any other name.
    """
    buffering_ops = frozenset(
        (
            "sort",
            "limit",
            "dropna",
            "distinct",
            "cache",
            "summary",
            "group_by",
            "group_summary",
            "group_cumulative",
            "pivot",
            "schema",
        )
    )
    return op_name in buffering_ops
class FlowExecutor:
    """
    Execute a plan (a list of step dicts) and return the resulting stream.

    The first plan entry is handed to ``source.dispatch``. Every later entry
    is applied to the running stream according to its "op" key.
    """

    # Ops implemented as ``transform.apply_<op>(gen, step)``.
    _TRANSFORM_OPS = frozenset(
        {
            "map",
            "assign",
            "filter",
            "select",
            "rename",
            "summary",
            "sort",
            "limit",
            "drop",
            "dropna",
            "explode",
            "flatten",
            "distinct",
            "join",
            "split_array",
            "pivot",
            "sample_fraction",
            "sample_n",
            "fused",
        }
    )

    # Ops implemented as ``group.apply_<op>(gen, step)``.
    _GROUP_OPS = frozenset(
        {
            "group_by",
            "group_summary",
            "group_cumulative",
            "group_rolling_summary",
            "group_time_bucket",
        }
    )

    def __init__(self, plan: List["PlanNode"]):
        """
        Store the plan to run.

        Args:
            plan: Ordered list of step dicts. ``execute`` reads ``plan[0]`` as
                the source step and expects an "op" key on every later step.
        """
        self.plan = plan

    def execute(self):
        """
        Run the plan and return the final stream.

        Steps are applied in order. Transform and group ops are dispatched to
        ``transform.apply_<op>`` / ``group.apply_<op>``. A few ops are handled
        inline:

        * ``"pipe"``: calls ``step["func"]`` with a ``Flow`` built from the
          steps before the pipe, executes the returned flow's plan, and then
          continues with the steps that follow the pipe.
        * ``"from_materialized"``: replaces the stream with an iterator over
          ``step["records"]``; whatever was built upstream is discarded.
        * ``"from_concat"``: replaces the stream with
          ``source.from_concat(step)``; upstream is likewise discarded.

        Returns:
            Whatever the last applied step produced (the source stream itself
            when the plan has a single entry).

        Raises:
            ValueError: If a step's "op" is not a recognised operation.
            IndexError: If the plan is empty.
            KeyError: If a step after the first has no "op" key.
        """
        # NOTE(review): imported inside the method rather than at module
        # level, presumably to avoid a circular import with .flow - confirm.
        from .flow import Flow

        gen = source.dispatch(self.plan[0])

        for i, step in enumerate(self.plan[1:], start=1):
            op = step["op"]

            if op in self._TRANSFORM_OPS:
                # Looked up at call time, so the handler is whatever
                # ``transform`` exposes under ``apply_<op>`` right now.
                gen = getattr(transform, f"apply_{op}")(gen, step)
            elif op in self._GROUP_OPS:
                gen = getattr(group, f"apply_{op}")(gen, step)
            elif op == "pipe":
                # The piped function receives a Flow of everything before this
                # step. Keep iterating afterwards: returning here would
                # silently drop every step that follows the pipe.
                piped_flow = step["func"](Flow(self.plan[:i]))
                gen = FlowExecutor(piped_flow.plan).execute()
            elif op == "from_materialized":
                gen = iter(step["records"])
            elif op == "from_concat":
                gen = source.from_concat(step)
            else:
                raise ValueError(f"Unknown plan op: {op}")

        return gen