Source code for penaltyblog.matchflow.helpers

"""
Helpers for handling a streaming data pipeline, specifically the Flow class.
"""

__all__ = ["get_field", "resolve_path", "get_index"]

from typing import Any, Callable, Optional

from tabulate import tabulate

from .steps.utils import flatten_dict

# --- Accessors ---


[docs] def get_field( path: str, default: Any = None ) -> Callable[[Optional[dict[Any, Any]]], Any]: """ Safely access a nested field using dot notation. Args: path (str): Dot-separated path to a field, e.g. "player.name". default (Any, optional): Value to return if the path is invalid. Returns: Callable[[dict], Any]: A function that retrieves the field value or default. Example: >>> f = get_field("player.name") >>> f({"player": {"name": "Bukayo Saka"}}) # → "Bukayo Saka" """ keys = path.split(".") def accessor(d: Optional[dict[Any, Any]]) -> Any: for key in keys: if not isinstance(d, dict): return default d = d.get(key, default) return d return accessor
[docs] def resolve_path(record: dict, path: str, default=None): """ Safely access a nested field using dot notation. Args: record (dict): The record to resolve the path from. path (str): The path to resolve. default (Any, optional): Value to return if the path is invalid. Returns: Any: The resolved value or default if not found. Example: >>> resolve_path({"player": {"name": "Bukayo Saka"}}, "player.name") "Bukayo Saka" """ return get_field(path, default)(record)
[docs] def get_index( path: str, index: int, default: Any = None ) -> Callable[[Optional[dict[Any, Any]]], Any]: """ Safely access an index in a nested list using dot-separated path. Args: path (str): Dot-separated path to a list field, e.g. "pass.end_location". index (int): The index to extract from the list. default (Any, optional): Value to return if the path is invalid or index out of bounds. Returns: Callable[[dict], Any]: A function that retrieves the indexed value or default. Example: >>> f = get_index("pass.end_location", 0) >>> f({"pass": {"end_location": [100, 40]}}) # → 100 """ keys = path.split(".") def accessor(d: Optional[dict[Any, Any]]) -> Any: for key in keys: if not isinstance(d, dict): return default d = d.get(key) if d is None: return default if isinstance(d, list): try: return d[index] except IndexError: return default return default return accessor
# --- Predicates --- def where_equals(path: str, value: Any) -> Callable[[dict], bool]: """ Create a predicate that checks if a nested field equals a given value. Args: path (str): Dot-separated path to a field, e.g. "player.name". value (Any): The value to compare against. Returns: Callable[[dict], bool]: A function that returns True if the field equals the value. Example: >>> f = where_equals("player.name", "Bukayo Saka") >>> f({"player": {"name": "Bukayo Saka"}}) # → True """ accessor = get_field(path) def predicate(record: dict) -> bool: return accessor(record) == value return predicate def where_in(path: str, values: set | list | tuple) -> Callable[[dict], bool]: """ Create a predicate that checks if a nested field is in a list of values. Args: path (str): Dot-separated path to a field, e.g. "player.name". values (set | list | tuple): The values to check against. Returns: Callable[[dict], bool]: A function that returns True if the field is in the values. Example: >>> f = where_in("player.name", {"Bukayo Saka", "Mohamed Salah"}) >>> f({"player": {"name": "Bukayo Saka"}}) # → True """ accessor = get_field(path) values = set(values) def predicate(record: dict) -> bool: return accessor(record) in values return predicate def where_exists(path: str) -> Callable[[dict], bool]: """ Check if a nested field exists and is not None. Args: path (str): Dot-separated path to a field, e.g. "player.name". Returns: Callable[[dict], bool]: A function that returns True if the field exists and is not None. Example: >>> f = where_exists("player.name") >>> f({"player": {"name": "Bukayo Saka"}}) # → True """ accessor = get_field(path) def predicate(record: dict) -> bool: return accessor(record) is not None return predicate def where_not_none(path: str) -> Callable[[dict], bool]: """ Check if a nested field exists and is not None (alias of where_exists). Args: path (str): Dot-separated path to a field, e.g. "player.name". Returns: Callable[[dict], bool]: A function that returns True if the field exists and is not None. Example: >>> f = where_not_none("player.name") >>> f({"player": {"name": "Bukayo Saka"}}) # → True """ return where_exists(path) # --- Transformers --- def combine_fields(out_field: str, *paths: str, join_str: str = " ") -> Callable: """ Combine multiple fields into a single field. Args: out_field (str): The name of the output field. *paths (str): The paths to the fields to combine. join_str (str, optional): The string to join the fields with. Defaults to " ". Returns: Callable: A function that takes a record and returns a new record with the combined fields. Example: >>> f = combine_fields("full_name", "first_name", "last_name") >>> f({"first_name": "Bukayo", "last_name": "Saka"}) # → {"full_name": "Bukayo Saka"} """ def transformer(d): parts = [str(get_field(p)(d) or "") for p in paths] return {out_field: join_str.join(parts)} return transformer def coalesce(*paths: str, default=None) -> Callable: """ Return the first non-None value from a list of paths. Args: *paths (str): The paths to the fields to check. default (Any, optional): The default value to return if all paths are None. Defaults to None. Returns: Callable: A function that takes a record and returns the first non-None value from the paths. Example: >>> f = coalesce("player.name", "player.alias", default="Unknown") >>> f({"player": {"name": "Bukayo Saka"}}) # → "Bukayo Saka" """ def fn(d): for path in paths: val = get_field(path)(d) if val is not None: return val return default return fn def set_path(record: dict, path: str, value: Any): """ Set a nested field using dot notation. Creates intermediate dicts. Args: record (dict): The record to set the path on. path (str): The path to set. value (Any): The value to set. """ keys = path.split(".") current = record # walk/create intermediate dicts for k in keys[:-1]: if not isinstance(current.get(k), dict): current[k] = {} current = current[k] current[keys[-1]] = value def delete_path(record: dict, path: str): """ Delete a nested field using dot notation. Silently does nothing if path doesn't exist. Args: record (dict): The record to delete the path from. path (str): The path to delete. """ keys = path.split(".") current: Any = record for k in keys[:-1]: if not isinstance(current, dict): return current = current.get(k) if isinstance(current, dict): current.pop(keys[-1], None) def explain_plan( raw_plan: list[dict], optimized_plan: Optional[list[dict]] = None, compare: bool = False, indent: int = 2, ): """ Print a plan. If compare=True and optimized_plan is given, show both raw and optimized side by side. """ def _print(title: str, plan: list[dict]): print(title) for i, step in enumerate(plan, 1): op = step["op"] # strip out internal keys details = {k: v for k, v in step.items() if k not in ("op", "_notes")} notes = step.get("_notes", []) line = f"{i:>2}. {op:<15} {details}" if notes: line += " // " + "; ".join(notes) print(" " * indent + line) if compare and optimized_plan is not None: _print("=== Pre-optimization plan ===", raw_plan) print() _print("=== Post-optimization plan ===", optimized_plan) else: title = "Optimized Plan" if optimized_plan is not None else "Plan" plan = optimized_plan or raw_plan _print(f"=== {title} ===", plan) def show_tabular(sample: list[dict]) -> None: """ Show a tabular representation of a sample of records. Args: sample (list[dict]): A sample of records to show. """ # flatten each record to a flat dict flat = [flatten_dict(r) for r in sample] if not flat: print("<no rows>") return # collect columns in sorted order cols = sorted({k for row in flat for k in row}) # build rows rows = [[row.get(c, "") for c in cols] for row in flat] print(tabulate(rows, headers=cols, tablefmt="github"))