MatchFlow
- class penaltyblog.matchflow.Flow(plan: List[Dict[str, Any]] | None = None, optimize: bool = False)
Bases:
object
- assign(**fields: Callable[[dict], Any]) → Flow
Assign new fields to each record.
- Parameters:
**fields (dict[str, Callable[[dict], Any]]) – The fields to assign, given as name=callable pairs. Each callable receives the record and returns the new field's value.
- Returns:
A new Flow with assigned fields.
- Return type:
Flow
- cache() → Flow
Cache the records in memory.
- Returns:
A new Flow with the records cached.
- Return type:
Flow
- cast(**casts: type | Callable[[Any], Any]) → Flow
Cast fields to specified types or functions.
- Parameters:
**casts (type or Callable[[Any], Any]) – Field names mapped to the type or function used to cast each field's value.
- Returns:
A new Flow with the fields cast.
- Return type:
Flow
- collect(optimize: bool | None = None, progress: Literal['output', 'input'] | None = None, total_records: int | None = None) → list
Collect all records from the flow.
- Parameters:
optimize (bool) – Whether to optimize the plan before execution.
progress (Optional[Literal["output", "input"]]) – Whether to show progress.
total_records (Optional[int]) – Total number of rows to expect.
- Returns:
A list of records.
- Return type:
list
- count() → int
Count the number of records in the flow.
- Returns:
The number of records.
- Return type:
int
- distinct(*keys: str, keep: str = 'first') → Flow
Remove duplicate records.
- Parameters:
*keys (str) – Optional field names used to determine uniqueness.
keep (str) – 'first' (default) or 'last', controlling which duplicate is retained.
- Returns:
A new Flow with duplicates removed.
- Return type:
Flow
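The keep option matters when duplicates carry different payloads. The plain-Python sketch below illustrates the documented behaviour on an in-memory list; it is not MatchFlow's implementation, `distinct_records` is a hypothetical helper, and both the whole-record comparison when no keys are given and the preservation of original stream order are assumptions of the sketch.

```python
from typing import Any, Dict, Hashable, List, Tuple

Record = Dict[str, Any]


def distinct_records(records: List[Record], *keys: str, keep: str = "first") -> List[Record]:
    """Hypothetical sketch of distinct(*keys, keep=...) over an in-memory list."""
    if keep not in ("first", "last"):
        raise ValueError("keep must be 'first' or 'last'")

    def identity(rec: Record) -> Tuple[Hashable, ...]:
        # Assumption: with no keys, all top-level fields define uniqueness
        # (values must be hashable for this sketch).
        if not keys:
            return tuple(sorted(rec.items()))
        return tuple(rec.get(k) for k in keys)

    # Remember the position of the record retained for each identity.
    chosen: Dict[Tuple[Hashable, ...], int] = {}
    for position, rec in enumerate(records):
        ident = identity(rec)
        if keep == "last" or ident not in chosen:
            chosen[ident] = position

    # Emit the retained records in their original order.
    return [records[i] for i in sorted(chosen.values())]


shots = [
    {"player": "A", "xg": 0.1},
    {"player": "B", "xg": 0.3},
    {"player": "A", "xg": 0.4},
]
print(distinct_records(shots, "player"))               # A's first shot (0.1) is kept
print(distinct_records(shots, "player", keep="last"))  # A's last shot (0.4) is kept
```

With MatchFlow itself, the corresponding call would be flow.distinct("player", keep="last").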
- drop(*keys: str) → Flow
Drop one or more fields from each record. Supports dot notation for nested fields.
- Parameters:
*keys (str) – Field names to remove.
- Returns:
A new Flow with fields removed.
- Return type:
Flow
- dropna(*fields: str) → Flow
Drop records where any of the specified fields is None or missing. If no fields are given, drop records where any top-level value is None.
- Parameters:
*fields (str) – Optional field paths (dot notation) to check for None.
- Returns:
A new Flow with records containing nulls removed.
- Return type:
Flow
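The two modes of dropna differ in where they look: named fields are resolved as dot paths (and a missing path counts as null), while the no-argument form inspects only top-level values. A plain-Python sketch of those rules follows; `get_path` and `dropna_records` are hypothetical helpers, not MatchFlow code.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]
_MISSING = object()  # sentinel distinguishing "absent" from an explicit None


def get_path(record: Record, path: str) -> Any:
    """Follow a dot-notation path such as "player.name"; return _MISSING if absent."""
    current: Any = record
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return _MISSING
        current = current[part]
    return current


def dropna_records(records: List[Record], *fields: str) -> List[Record]:
    """Hypothetical sketch of dropna(*fields) over an in-memory list."""
    kept = []
    for rec in records:
        if fields:
            # Drop if any requested path is missing or None.
            values = [get_path(rec, f) for f in fields]
            has_null = any(v is None or v is _MISSING for v in values)
        else:
            # No fields given: look only at top-level values.
            has_null = any(v is None for v in rec.values())
        if not has_null:
            kept.append(rec)
    return kept


events = [
    {"id": 1, "player": {"name": "A"}},
    {"id": 2, "player": {"name": None}},
    {"id": 3},
    {"id": None, "player": {"name": "B"}},
]
print(dropna_records(events, "player.name"))  # records with id 1 and id None survive
print(dropna_records(events))                 # records with id 1, 2 and 3 survive
```

The MatchFlow equivalents would be flow.dropna("player.name") and flow.dropna().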
- explain(optimize: bool | None = None, compare: bool = False)
Print a readable version of the plan.
- Parameters:
optimize (bool) – Whether to show the optimized plan (default True).
compare (bool) – If True, show both pre- and post-optimization plans.
- explode(*fields: str) → Flow
Explode one or more list fields into multiple records, in sync: the i-th output record takes the i-th element of every listed field. All fields must be lists of the same length in each record.
- Parameters:
*fields (str) – One or more field names (dot notation allowed).
- Returns:
A new Flow with records exploded along the given fields.
- Return type:
Flow
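Exploding "in sync" is essentially a zip over the listed fields. The sketch below shows that for top-level fields only; `explode_records` is a hypothetical helper rather than MatchFlow's implementation, and its handling of empty lists (the record produces no output rows) is an assumption that the library may not share.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def explode_records(records: List[Record], *fields: str) -> List[Record]:
    """Hypothetical sketch of explode(*fields) for top-level list fields."""
    out: List[Record] = []
    for rec in records:
        lists = [rec[f] for f in fields]
        lengths = {len(values) for values in lists}
        if len(lengths) > 1:
            raise ValueError(f"fields {fields} have different lengths: {sorted(lengths)}")
        # zip() walks all lists in sync, producing one new record per position.
        # In this sketch, empty lists therefore produce no rows for that record.
        for items in zip(*lists):
            new_rec = dict(rec)  # shallow copy keeps the other fields
            new_rec.update(zip(fields, items))
            out.append(new_rec)
    return out


rows = [{"match": 1, "minute": [10, 55], "scorer": ["A", "B"]}]
print(explode_records(rows, "minute", "scorer"))
# [{'match': 1, 'minute': 10, 'scorer': 'A'}, {'match': 1, 'minute': 55, 'scorer': 'B'}]
```

In MatchFlow the same operation would be written flow.explode("minute", "scorer").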
- flatten() → Flow
Flatten nested dictionaries into a single-level dictionary using dot notation.
- Returns:
A new Flow with flattened records.
- Return type:
Flow
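Dot-notation flattening joins each nested key onto its parent's path. A recursive plain-Python sketch follows; `flatten_record` is hypothetical, and leaving lists and empty dicts untouched is an assumption about edge cases, not documented MatchFlow behaviour.

```python
from typing import Any, Dict

Record = Dict[str, Any]


def flatten_record(record: Record, prefix: str = "") -> Record:
    """Hypothetical sketch of flatten(): nested dicts become dot-notation keys."""
    flat: Record = {}
    for key, value in record.items():
        full_key = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Recurse into non-empty nested dicts.
            flat.update(flatten_record(value, full_key))
        else:
            # Scalars, lists and empty dicts are kept as-is (assumption).
            flat[full_key] = value
    return flat


event = {"id": 7, "type": {"id": 16, "name": "Shot"}, "location": [102.0, 38.5]}
print(flatten_record(event))
# {'id': 7, 'type.id': 16, 'type.name': 'Shot', 'location': [102.0, 38.5]}
```

The flattened keys are the same dot paths that methods such as drop, dropna and explode accept.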
- static from_folder(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) → Flow
Create a Flow from a folder of records.
- Parameters:
path (str) – The path to the folder containing the records.
optimize (bool) – Whether to optimize the flow.
storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}; for GCS: {"token": "path/to/token.json"}; for Azure: {"account_name": "name", "account_key": "key"}.
- Returns:
A new Flow pointing to the records.
- Return type:
Flow
- static from_glob(pattern: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) → Flow
Create a Flow from a glob pattern.
- Parameters:
pattern (str) – The glob pattern matching the files to read.
optimize (bool) – Whether to optimize the flow.
storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}; for GCS: {"token": "path/to/token.json"}; for Azure: {"account_name": "name", "account_key": "key"}.
- Returns:
A new Flow streaming the matching files.
- Return type:
Flow
- static from_json(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) → Flow
Lazily load a list of records from a JSON file.
- Parameters:
path (str) – The path to the JSON file.
optimize (bool) – Whether to optimize the flow.
storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}; for GCS: {"token": "path/to/token.json"}; for Azure: {"account_name": "name", "account_key": "key"}.
- Returns:
A new Flow pointing to the records.
- Return type:
Flow
- static from_jsonl(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) → Flow
Lazily load records from a JSONL file.
- Parameters:
path (str) – The path to the JSONL file.
optimize (bool) – Whether to optimize the flow.
storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}; for GCS: {"token": "path/to/token.json"}; for Azure: {"account_name": "name", "account_key": "key"}.
- Returns:
A new Flow pointing to the records.
- Return type:
Flow
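"Lazily" here means records are parsed only as they are consumed, not all at once. A standard-library sketch of that idea for JSONL follows; `iter_jsonl` is a hypothetical helper rather than MatchFlow's reader (which also accepts cloud paths through storage_options), and skipping blank lines is an assumption.

```python
import io
import json
from typing import Any, Dict, Iterator, TextIO


def iter_jsonl(handle: TextIO) -> Iterator[Dict[str, Any]]:
    """Yield one parsed record per non-blank line, only when the caller asks for it."""
    for line_no, line in enumerate(handle, start=1):
        line = line.strip()
        if not line:
            continue  # assumption: blank lines are skipped
        try:
            yield json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"invalid JSON on line {line_no}") from exc


buffer = io.StringIO('{"id": 1, "type": "Pass"}\n\n{"id": 2, "type": "Shot"}\n')
records = iter_jsonl(buffer)  # a generator: nothing has been parsed yet
print(next(records))          # {'id': 1, 'type': 'Pass'}
print(list(records))          # [{'id': 2, 'type': 'Shot'}]
```

With MatchFlow itself the entry point is Flow.from_jsonl(path), for example Flow.from_jsonl("events.jsonl") with a hypothetical file name.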
- static from_list(records: List[dict], optimize: bool = False, storage_options: Dict[str, Any] | None = None) → Flow
Create a Flow from a list of records.
- Parameters:
records (List[dict]) – The list of records to create a Flow from.
optimize (bool) – Whether to optimize the flow.
storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}; for GCS: {"token": "path/to/token.json"}; for Azure: {"account_name": "name", "account_key": "key"}.
- Returns:
A new Flow with the records.
- Return type:
Flow
- static from_records(records: List[dict], optimize: bool = False) → Flow
Create a Flow from a list of records.
- Parameters:
records (List[dict]) – The list of records to create a Flow from.
optimize (bool) – Whether to optimize the flow.
- Returns:
A new Flow with the records.
- Return type:
Flow
- get_url() → str
For a Flow created from an API source (e.g. Opta), construct and return the URL that will be called. This is a debugging utility and may not work for all flows.
- group_by(*keys: str) → FlowGroup
Group records by one or more fields.
- Parameters:
*keys (str) – Field names to group by.
- Returns:
A new FlowGroup with grouped records.
- Return type:
FlowGroup
- grouped(key: str)
Group records by a single field.
- Parameters:
key (str) – The field to group by.
- Returns:
An iterator of (key value, records) pairs, one per group.
- Return type:
Iterator[Tuple[Any, List[Dict[str, Any]]]]
- head(n=5) → list
Run the flow and return the first n records.
- Parameters:
n (int) – The number of records to return.
- Returns:
A list of the first n records.
- Return type:
list
- is_empty() → bool
Check if the flow yields any records without fully collecting it.
- Returns:
True if there are no records, False otherwise.
- Return type:
bool
- join(other: Flow, on: str | List[str] | None = None, left_on: str | List[str] | None = None, right_on: str | List[str] | None = None, how: Literal['left', 'right', 'outer', 'inner', 'anti'] = 'left', lsuffix: str = '', rsuffix: str = '_right', type_coercion: Literal['strict', 'auto', 'string'] = 'strict') → Flow
Join with another Flow.
- Parameters:
other (Flow) – The right-hand flow to join with.
on (str, list[str], or None) – Field(s) to join on when the key names are the same on both sides.
left_on (str, list[str], or None) – Left-side field(s) to join on.
right_on (str, list[str], or None) – Right-side field(s) to join on.
how (str) – Type of join: 'left', 'right', 'outer', 'inner', or 'anti'.
lsuffix (str) – Suffix for conflicting left-side keys.
rsuffix (str) – Suffix for conflicting right-side keys.
type_coercion (str) – How to handle type differences in join keys:
  - 'strict': exact type matching (default; preserves current behavior).
  - 'auto': smart coercion, so 1, '1' and 1.0 all match one another.
  - 'string': convert all join keys to strings before comparing.
- Returns:
A new Flow representing the join.
- Return type:
Flow
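The three type_coercion modes differ only in how key values are normalized before comparison. The sketch below shows that idea for a single-field left join; `normalize_key` and `left_join` are hypothetical, the exact coercion rules (numeric strings and integral floats collapse to ints under 'auto'; str() is applied under 'string') are assumptions, and MatchFlow may treat unmatched rows, None keys and suffixes differently.

```python
from typing import Any, Dict, Hashable, List

Record = Dict[str, Any]


def normalize_key(value: Any, mode: str) -> Hashable:
    """Map a join-key value to a comparison key for the given coercion mode."""
    if mode == "strict":
        # Pair the value with its type so 1, 1.0 and "1" never compare equal.
        return (type(value), value)
    if mode == "string":
        return str(value)
    if mode == "auto":
        if isinstance(value, bool):
            return (bool, value)  # keep booleans apart from 0 and 1
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            number = value
        elif isinstance(value, str):
            try:
                number = float(value.strip())
            except ValueError:
                return value  # non-numeric strings compare as-is
        else:
            return value
        # Collapse integral numbers so 1, "1" and 1.0 share one key.
        return int(number) if number.is_integer() else number
    raise ValueError(f"unknown type_coercion mode: {mode!r}")


def left_join(left: List[Record], right: List[Record], on: str,
              type_coercion: str = "strict", rsuffix: str = "_right") -> List[Record]:
    """Hypothetical single-field left join honouring the three coercion modes."""
    # Hash index of right-hand records keyed by the normalized join key.
    index: Dict[Hashable, List[Record]] = {}
    for right_rec in right:
        index.setdefault(normalize_key(right_rec.get(on), type_coercion), []).append(right_rec)

    joined: List[Record] = []
    for left_rec in left:
        matches = index.get(normalize_key(left_rec.get(on), type_coercion), [])
        if not matches:
            joined.append(dict(left_rec))  # unmatched left rows are kept unchanged
        for right_rec in matches:
            merged = dict(left_rec)
            for key, value in right_rec.items():
                if key == on:
                    continue  # keep the left-hand key value
                merged[key + rsuffix if key in merged else key] = value
            joined.append(merged)
    return joined


players = [{"player_id": 1, "name": "A"}, {"player_id": "2", "name": "B"}]
stats = [{"player_id": "1", "goals": 3}, {"player_id": 2.0, "goals": 1}]
print(left_join(players, stats, "player_id", "auto"))
```

Note that under 'string', str(2.0) is "2.0", so 2.0 and "2" do not match even though they would under 'auto'. In MatchFlow the corresponding call would be players_flow.join(stats_flow, on="player_id", type_coercion="auto"), with hypothetical flow names.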
- keys(limit: int = 100) → set[str]
Infer the set of field names (keys) present in the flow by sampling records.
- Parameters:
limit (int) – Number of records to sample for schema inference.
- Returns:
The set of keys.
- Return type:
set[str]
- limit(n: int) → Flow
Limit the number of records returned.
- Parameters:
n (int) – The maximum number of records.
- Returns:
A new Flow that yields up to n records.
- Return type:
Flow
- map(func: Callable[[dict], dict]) → Flow
Apply a function to each record. The function should return a full record; if it returns None, the record is dropped.
- Parameters:
func (Callable[[dict], dict]) – A function that takes a record and returns a modified one.
- Returns:
A new Flow with the transformed records.
- Return type:
Flow
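Because map drops a record whenever the function returns None, a single function can both filter and transform. The generator below sketches that contract in plain Python; `map_records` and `shots_only` are hypothetical names, not part of MatchFlow.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, Optional

Record = Dict[str, Any]


def map_records(records: Iterable[Record],
                func: Callable[[Record], Optional[Record]]) -> Iterator[Record]:
    """Sketch of map(): apply func lazily and drop records for which it returns None."""
    for rec in records:
        result = func(rec)
        if result is not None:
            yield result


def shots_only(rec: Record) -> Optional[Record]:
    # Return a *full* record (here a copy with one extra field), or None to drop it.
    if rec["type"] != "Shot":
        return None
    return {**rec, "is_goal": rec["outcome"] == "Goal"}


events = [
    {"id": 1, "type": "Pass", "outcome": None},
    {"id": 2, "type": "Shot", "outcome": "Goal"},
    {"id": 3, "type": "Shot", "outcome": "Saved"},
]
print(list(map_records(events, shots_only)))
```

In MatchFlow the same function would be passed as flow.map(shots_only).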
- opta = <penaltyblog.matchflow.contrib.opta.Opta object>
- pipe(func: Callable[[Flow], Flow]) → Flow
Lazily apply a function to this Flow and return the resulting Flow. The function is executed at collect time, not immediately.
The function should return a new Flow, typically built using this one as input.
- pivot(index: str | list[str], columns: str, values: str) → Flow
Pivot records: turn row values into columns.
- Parameters:
index (str or list[str]) – Fields to group by.
columns (str) – Field whose values become column names.
values (str) – Field whose values fill the new columns.
- Returns:
A new Flow with records pivoted into wide format.
- Return type:
Flow
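Pivoting from long to wide format collects one output record per index value and turns each columns value into a field name. A plain-Python sketch follows; `pivot_records` is hypothetical, and two behaviours are assumptions: a repeated (index, column) pair overwrites the earlier value, and absent combinations are simply left out rather than filled with None.

```python
from typing import Any, Dict, List, Sequence, Tuple, Union

Record = Dict[str, Any]


def pivot_records(records: List[Record], index: Union[str, Sequence[str]],
                  columns: str, values: str) -> List[Record]:
    """Hypothetical sketch of pivot(): long records in, one wide record per index value out."""
    index_fields = [index] if isinstance(index, str) else list(index)
    wide: Dict[Tuple[Any, ...], Record] = {}
    for rec in records:
        key = tuple(rec.get(f) for f in index_fields)
        # Start a new wide row the first time an index value is seen.
        row = wide.setdefault(key, dict(zip(index_fields, key)))
        # The value of `columns` becomes the new field name; duplicates overwrite.
        row[str(rec[columns])] = rec[values]
    return list(wide.values())


long_rows = [
    {"team": "A", "stat": "shots", "value": 12},
    {"team": "A", "stat": "xg", "value": 1.4},
    {"team": "B", "stat": "shots", "value": 7},
]
print(pivot_records(long_rows, "team", "stat", "value"))
# [{'team': 'A', 'shots': 12, 'xg': 1.4}, {'team': 'B', 'shots': 7}]
```

The MatchFlow call for the same reshape would be flow.pivot(index="team", columns="stat", values="value").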
- plot_plan(compare: bool = False)
Visualize the flow plan.
- Parameters:
compare (bool) –
True: show two subplots (raw vs. optimized).
False: show a single subplot. If this Flow was constructed with optimize=True, show the optimized plan; otherwise the raw.
- profile(optimize: bool | None = None, fmt: Literal['table', 'records'] = 'table')
Profile each step in the plan, reporting (step_index, op_name, time_s, rows_emitted) for every step.
- Parameters:
optimize (bool, optional) – Whether to optimize the plan. Defaults to self.optimize.
fmt (str) – 'table' to print a table, or 'records' to return the raw list of dicts.
- query(expr: str)
Filter rows using a query string.
- Parameters:
expr (str) – The query string.
- Returns:
A new Flow with the filtered records.
- Return type:
Flow
Examples
# Basic comparisons
flow.query("age > 30 and name == 'Phil Foden'")

# Using variables with @
player = "Mohamed Salah"
flow.query("type.name == 'Shot' and player.name == @player")

# Date filtering
from datetime import datetime
flow.query("date > datetime(2024, 1, 1)")
cutoff_date = datetime(2024, 6, 15)
flow.query("match_date >= @cutoff_date")

# String operations
flow.query("name.contains('son') and status == 'active'")

# Regular expression matching
flow.query("name.regex('^[A-Z][a-z]+$')")  # Names that are a single capitalized word
flow.query(r"name.match('\d{4}', 0)")  # Contains 4 digits in a row
- rename(**mapping: str)
Rename keys in each record according to a mapping of old=new.
- Parameters:
**mapping (dict[str, str]) – The mapping of old keys to new keys.
- Returns:
A new Flow with renamed keys.
- Return type:
Flow
- sample_fraction(p: float, seed: int | None = None) → Flow
Lazily sample a fraction of records.
- Parameters:
p (float) – Sampling probability (0 < p < 1).
seed (int, optional) – Random seed.
- Returns:
A new Flow with sampling applied.
- Return type:
Flow
- sample_n(n: int, seed: int | None = None) → Flow
Lazily sample n records using reservoir sampling.
- Parameters:
n (int) – Number of records to sample.
seed (int, optional) – Random seed.
- Returns:
A new Flow with sampling applied.
- Return type:
Flow
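Reservoir sampling draws a uniform sample of n items from a stream of unknown length in a single pass, which is why sample_n can work on a streaming Flow. The sketch below is the textbook "Algorithm R", not MatchFlow's code; `reservoir_sample` is a hypothetical name, and because the library's random draws may differ, the same seed need not select the same records as this sketch.

```python
import random
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")


def reservoir_sample(stream: Iterable[T], n: int, seed: Optional[int] = None) -> List[T]:
    """Algorithm R: uniform sample of n items from a stream in one pass."""
    if n < 0:
        raise ValueError("n must be non-negative")
    rng = random.Random(seed)
    reservoir: List[T] = []
    for i, item in enumerate(stream):
        if i < n:
            reservoir.append(item)  # fill the reservoir with the first n items
        else:
            # Item i (0-based) replaces a random slot with probability n / (i + 1).
            j = rng.randint(0, i)
            if j < n:
                reservoir[j] = item
    return reservoir


sample = reservoir_sample(range(1000), 10, seed=42)
print(sample)  # ten distinct values; the same seed reproduces the same sample
```

Memory use stays at n items regardless of stream length; the MatchFlow call would be flow.sample_n(10, seed=42).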
- schema(n=100) → dict[str, type]
Infer the schema of the flow.
- Parameters:
n (int) – Number of records to sample for schema inference.
- Returns:
The inferred schema.
- Return type:
dict[str, type]
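Schema inference from a sample can follow several rules. The sketch below uses one plausible rule: each top-level field maps to the type of its first non-None value among the first n records. `infer_schema` is hypothetical, and MatchFlow's handling of conflicting types, all-None fields and nested paths may differ.

```python
from itertools import islice
from typing import Any, Dict, Iterable

Record = Dict[str, Any]


def infer_schema(records: Iterable[Record], n: int = 100) -> Dict[str, type]:
    """Sketch: type of the first non-None value per top-level field in the first n records."""
    schema: Dict[str, type] = {}
    for rec in islice(records, n):
        for key, value in rec.items():
            if value is None:
                # Placeholder until a real value appears for this field.
                schema.setdefault(key, type(None))
            elif schema.get(key) in (None, type(None)):
                schema[key] = type(value)
    return schema


rows = [{"id": 1, "xg": None}, {"id": 2, "xg": 0.3, "team": "A"}]
print(infer_schema(rows))       # id -> int, xg -> float, team -> str
print(infer_schema(rows, n=1))  # only the first record is sampled: xg -> NoneType
```

As with flow.schema(n=...), a small n is cheaper but can miss fields or types that appear only later in the stream.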
- select(*fields: str) → Flow
Select specific fields from each record.
- Parameters:
*fields (str) – The fields to select.
- Returns:
A new Flow with selected fields.
- Return type:
Flow
- show(n: int = 5, format: Literal['table', 'record'] = 'table')
Print the first n records in a pretty format.
- Parameters:
n (int) – Number of records to show.
format (Literal["table", "record"]) – Format to use for display.
- sort_by(*keys: str, ascending: bool | list[bool] = True) → Flow
Sort records by one or more fields.
- Parameters:
*keys (str) – Field names to sort by.
ascending (bool or list[bool], optional) – Sort order(s). Either a single bool applied to all keys or a list of bools (one per key).
- Returns:
A new Flow with sorted records.
- Return type:
Flow
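A per-key ascending list can be honoured with repeated stable sorts, applied from the least significant key to the most significant. The sketch below illustrates that technique in plain Python; `sort_records` is hypothetical rather than MatchFlow's implementation, and it assumes every record has each key with mutually comparable, non-None values.

```python
from operator import itemgetter
from typing import Any, Dict, List, Sequence, Union

Record = Dict[str, Any]


def sort_records(records: List[Record], *keys: str,
                 ascending: Union[bool, Sequence[bool]] = True) -> List[Record]:
    """Hypothetical sketch of sort_by(*keys, ascending=...) using repeated stable sorts."""
    if isinstance(ascending, bool):
        orders = [ascending] * len(keys)
    else:
        orders = list(ascending)
        if len(orders) != len(keys):
            raise ValueError("ascending needs exactly one bool per key")
    result = list(records)
    # Python's sort is stable (also with reverse=True), so sorting by the least
    # significant key first and the most significant key last gives a multi-key order.
    for key, asc in reversed(list(zip(keys, orders))):
        result.sort(key=itemgetter(key), reverse=not asc)
    return result


rows = [{"team": "B", "xg": 1.2}, {"team": "A", "xg": 0.8}, {"team": "A", "xg": 2.1}]
print(sort_records(rows, "team", "xg", ascending=[True, False]))
# [{'team': 'A', 'xg': 2.1}, {'team': 'A', 'xg': 0.8}, {'team': 'B', 'xg': 1.2}]
```

The equivalent MatchFlow call would be flow.sort_by("team", "xg", ascending=[True, False]).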
- split_array(field: str, into: list[str]) → Flow
Split an array field into separate fields by index.
- Parameters:
field (str) – The field containing a list/array.
into (list[str]) – The names of the output fields.
- Returns:
A new Flow with the array split into separate fields.
- Return type:
Flow
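A common use is splitting a pitch location such as [x, y] into two scalar fields. The plain-Python sketch below maps element i of the array to the i-th name in into; `split_array_record` is hypothetical, and keeping the original array field and filling positions past the end of the array with None are assumptions, not documented behaviour.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def split_array_record(record: Record, field: str, into: List[str]) -> Record:
    """Sketch of split_array(field, into): element i of record[field] goes to into[i]."""
    values = record.get(field) or []
    out = dict(record)  # assumption: the original array field is kept
    for i, name in enumerate(into):
        # Assumption: positions beyond the end of the array become None.
        out[name] = values[i] if i < len(values) else None
    return out


shot = {"id": 9, "location": [108.2, 41.7]}
print(split_array_record(shot, "location", ["x", "y"]))
# {'id': 9, 'location': [108.2, 41.7], 'x': 108.2, 'y': 41.7}
```

In MatchFlow the same split would be requested with flow.split_array("location", into=["x", "y"]).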
- statsbomb = <penaltyblog.matchflow.contrib.statsbomb.StatsBomb object>
- summary(aggregators: Callable | dict[str, Any]) → Flow
Compute summary aggregates over the records. The aggregators argument supports:
  - a callable that receives the rows and returns a dict (e.g. lambda rows: {…});
  - a dict of {alias: callable};
  - a dict of {alias: "name"} or {alias: (name or callable, field)}.
- to_json(path: str, indent=4, storage_options: Dict[str, Any] | None = None)
Write the flow to a JSON file (as a list of records).
- Parameters:
path (str) – The path to the JSON file.
indent (int, optional) – The number of spaces to use for indentation.
storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}; for GCS: {"token": "path/to/token.json"}; for Azure: {"account_name": "name", "account_key": "key"}.
- to_jsonl(path: str, storage_options: Dict[str, Any] | None = None)
Write the flow to a JSONL file (one record per line).
- Parameters:
path (str) – The path to the JSONL file.
storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}; for GCS: {"token": "path/to/token.json"}; for Azure: {"account_name": "name", "account_key": "key"}.
- to_pandas() → DataFrame
Collect the flow into a pandas DataFrame.
- Returns:
The collected records as a DataFrame.
- Return type:
pd.DataFrame
- with_schema(schema: dict[str, type | Callable[[Any], Any]], strict: bool = False, drop_extra: bool = False) → Flow
Cast fields to specified types/functions and optionally validate or prune fields.
- Parameters:
schema (dict) – Mapping of field names (dot paths allowed) to types or casting functions.
strict (bool) – If True, raise an error on cast failure. Otherwise, fall back to the original value.
drop_extra (bool) – If True, only retain fields explicitly listed in the schema.
- Returns:
A new Flow with schema enforcement applied.
- Return type:
Flow
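The interplay of strict and drop_extra can be seen on a single record. The sketch below handles top-level fields only (the real method also accepts dot paths); `apply_schema` is hypothetical, and both leaving absent fields absent and treating only TypeError/ValueError as cast failures are assumptions of the sketch.

```python
from typing import Any, Callable, Dict, Union

Record = Dict[str, Any]
Caster = Union[type, Callable[[Any], Any]]


def apply_schema(record: Record, schema: Dict[str, Caster],
                 strict: bool = False, drop_extra: bool = False) -> Record:
    """Hypothetical sketch of with_schema(...) applied to one record."""
    # drop_extra: keep only fields named in the schema.
    out = {k: v for k, v in record.items() if not drop_extra or k in schema}
    for field, caster in schema.items():
        if field not in out:
            continue  # assumption: absent fields are left absent
        try:
            out[field] = caster(out[field])
        except (TypeError, ValueError):
            if strict:
                raise
            # Non-strict mode: the original value is kept.
    return out


rec = {"minute": "45", "xg": "n/a", "team": "A"}
schema = {"minute": int, "xg": float}
print(apply_schema(rec, schema))                   # xg stays "n/a"; team is kept
print(apply_schema(rec, schema, drop_extra=True))  # team is dropped
```

With strict=True the failed float("n/a") cast raises instead. In MatchFlow this corresponds to flow.with_schema({"minute": int, "xg": float}, drop_extra=True).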
StatsBomb integration for Flow. Provides lazy-loading access to the StatsBomb API via plan-based execution.
- class penaltyblog.matchflow.contrib.statsbomb.StatsBomb
Bases:
object
- property DEFAULT_CREDS: dict
Get default credentials from environment variables.
- competitions(optimize: bool = False) → Flow
Return a Flow for competitions from StatsBomb.
- Parameters:
optimize (bool, optional) – Whether to optimize the plan (default: False).
- Returns:
Flow object for the competitions resource.
- Return type:
Flow
- events(match_id: int, include_360_metrics=False, creds: dict | None = None, optimize: bool = False) → Flow
Return a Flow for events from StatsBomb.
- Parameters:
match_id (int) – Match ID.
include_360_metrics (bool, optional) – Whether to include 360 metrics (default: False).
creds (dict, optional) – Credentials for the StatsBomb API. Defaults to StatsBomb.DEFAULT_CREDS.
optimize (bool, optional) – Whether to optimize the plan (default: False).
- Returns:
Flow object for the events resource.
- Return type:
Flow
- lineups(match_id: int, creds: dict | None = None, optimize: bool = False) → Flow
Return a Flow for player lineups for a given match.
- Parameters:
match_id (int) – The StatsBomb match identifier for which to fetch lineups.
creds (dict, optional) – Optional credentials to use for the StatsBomb API. If omitted, StatsBomb.DEFAULT_CREDS will be used.
optimize (bool, optional) – If True, the returned Flow will have optimization enabled.
- Returns:
A Flow that will fetch the lineups when executed.
- Return type:
Flow
- matches(competition_id: int, season_id: int, creds: dict | None = None, optimize: bool = False) → Flow
Return a Flow for matches from StatsBomb.
- Parameters:
competition_id (int) – Competition ID.
season_id (int) – Season ID.
creds (dict, optional) – Credentials for the StatsBomb API. Defaults to StatsBomb.DEFAULT_CREDS.
optimize (bool, optional) – Whether to optimize the plan (default: False).
- Returns:
Flow object for the matches resource.
- Return type:
Flow
- player_match_stats(match_id: int, creds: dict | None = None, optimize: bool = False) → Flow
Return a Flow for per-player statistics for a single match.
- Parameters:
match_id (int) – The StatsBomb match identifier.
creds (dict, optional) – Credentials to authenticate with the StatsBomb API. Defaults to StatsBomb.DEFAULT_CREDS when not provided.
optimize (bool, optional) – If True, the returned Flow will be optimized.
- Returns:
A Flow that will fetch player match statistics when executed.
- Return type:
Flow
- player_season_stats(competition_id: int, season_id: int, creds: dict | None = None, optimize: bool = False) → Flow
Return a Flow for per-player aggregated statistics for a season.
- Parameters:
competition_id (int) – StatsBomb competition identifier.
season_id (int) – StatsBomb season identifier.
creds (dict, optional) – Optional credentials for the StatsBomb API. Defaults to StatsBomb.DEFAULT_CREDS when omitted.
optimize (bool, optional) – If True, create the Flow with optimization enabled.
- Returns:
A Flow that will fetch player season statistics when executed.
- Return type:
Flow
- team_match_stats(match_id: int, creds: dict | None = None, optimize: bool = False) → Flow
Return a Flow for team-level statistics for a single match.
- Parameters:
match_id (int) – The StatsBomb match identifier.
creds (dict, optional) – Credentials for StatsBomb; falls back to StatsBomb.DEFAULT_CREDS if not provided.
optimize (bool, optional) – Whether to optimize the constructed Flow.
- Returns:
A Flow that will fetch team match statistics when executed.
- Return type:
Flow
- team_season_stats(competition_id: int, season_id: int, creds: dict | None = None, optimize: bool = False) → Flow
Return a Flow for team-level aggregated statistics for a season.
- Parameters:
competition_id (int) – StatsBomb competition identifier.
season_id (int) – StatsBomb season identifier.
creds (dict, optional) – Optional credentials to use with the StatsBomb API; defaults to StatsBomb.DEFAULT_CREDS when omitted.
optimize (bool, optional) – Whether to enable optimizations for the created Flow.
- Returns:
A Flow that will fetch team season statistics when executed.
- Return type:
Flow
MatchFlow API Documentation
Flow class for handling a streaming data pipeline.
- class penaltyblog.matchflow.flow.Flow(plan: List[Dict[str, Any]] | None = None, optimize: bool = False)
Bases:
object- assign(**fields: Callable[[dict], Any]) Flow[source]#
Assign new fields to each record. :param **fields: The fields to assign. :type **fields: dict[str, Callable[[dict], Any]]
- Returns:
A new Flow with assigned fields.
- Return type:
- cache() Flow[source]#
Cache the records in memory.
- Returns:
A new Flow with the records cached.
- Return type:
- cast(**casts: type | Callable[[Any], Any]) Flow[source]#
Cast fields to specified types or functions.
- Parameters:
**casts (type or Callable[[Any], Any]) – The types or functions to cast to.
- Returns:
A new Flow with the fields cast.
- Return type:
- collect(optimize: bool | None = None, progress: Literal['output', 'input'] | None = None, total_records: int | None = None) list[source]#
Collect all records from the flow.
- Parameters:
optimize (bool) – Whether to optimize the plan before execution.
progress (Optional[Literal["output", "input"]]) – Whether to show progress.
total_records (Optional[int]) – Total number of rows to expect.
- Returns:
A list of records.
- Return type:
list
- count() int[source]#
Count the number of records in the flow.
- Returns:
The number of records.
- Return type:
int
- distinct(*keys: str, keep: str = 'first') Flow[source]#
Remove duplicate records.
- Parameters:
*keys (str) – Optional field names to determine uniqueness.
keep (str) – ‘first’ (default) or ‘last’ to control which duplicate is retained.
- Returns:
A new Flow with duplicates removed.
- Return type:
- drop(*keys: str) Flow[source]#
Drop one or more fields from each record. Supports dot notation for nested fields.
- Parameters:
*keys (str) – Field names to remove.
- Returns:
A new Flow with fields removed.
- Return type:
- dropna(*fields: str) Flow[source]#
Drop records where any of the specified fields are None or missing. If no fields are given, drops records where any top-level value is None.
- Parameters:
*fields (str) – Optional field paths (dot notation) to check for None.
- Returns:
A new Flow with records containing nulls removed.
- Return type:
- explain(optimize: bool | None = None, compare: bool = False)[source]#
Print a readable version of the plan.
- Parameters:
optimize (bool) – Whether to show the optimized plan (default True).
compare (bool) – If True, show both pre- and post-optimization plans.
- explode(*fields: str) Flow[source]#
Explode one or more list fields into multiple records (in sync). All fields must be lists of the same length in each record.
- Parameters:
*fields (str) – One or more field names (dot notation allowed).
- Returns:
A new Flow with records exploded along the given fields.
- Return type:
- flatten() Flow[source]#
Flatten nested dictionaries into a single-level dictionary using dot notation.
- Returns:
A new Flow with flattened records.
- Return type:
- static from_folder(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#
Create a Flow from a folder of records. :param path: The path to the folder containing the records. :type path: str :param optimize: Whether to optimize the flow. :type optimize: bool :param storage_options: Additional options for cloud storage backends.
For S3: {“key”: “access_key”, “secret”: “secret_key”, “endpoint_url”: “url”} For GCS: {“token”: “path/to/token.json”} For Azure: {“account_name”: “name”, “account_key”: “key”}
- Returns:
A new Flow pointing to the records.
- Return type:
- static from_glob(pattern: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#
Create a Flow from a glob pattern.
- Parameters:
optimize (bool) – Whether to optimize the flow.
storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {“key”: “access_key”, “secret”: “secret_key”, “endpoint_url”: “url”} For GCS: {“token”: “path/to/token.json”} For Azure: {“account_name”: “name”, “account_key”: “key”}
- Returns:
A new Flow streaming matching files.
- Return type:
- static from_json(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#
Lazily load a list of records from a JSON file.
- Parameters:
path (str) – The path to the JSON file.
optimize (bool) – Whether to optimize the flow.
storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {“key”: “access_key”, “secret”: “secret_key”, “endpoint_url”: “url”} For GCS: {“token”: “path/to/token.json”} For Azure: {“account_name”: “name”, “account_key”: “key”}
- Returns:
A new Flow pointing to the records.
- Return type:
- static from_jsonl(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#
Lazily load records from a JSONL file.
- Parameters:
path (str) – The path to the JSONL file.
optimize (bool) – Whether to optimize the flow.
storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {“key”: “access_key”, “secret”: “secret_key”, “endpoint_url”: “url”} For GCS: {“token”: “path/to/token.json”} For Azure: {“account_name”: “name”, “account_key”: “key”}
- Returns:
A new Flow pointing to the records.
- Return type:
- static from_list(records: List[dict], optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#
Create a Flow from a list of records. :param records: The list of records to create a Flow from. :type records: List[dict] :param optimize: Whether to optimize the flow. :type optimize: bool :param storage_options: Additional options for cloud storage backends.
For S3: {“key”: “access_key”, “secret”: “secret_key”, “endpoint_url”: “url”} For GCS: {“token”: “path/to/token.json”} For Azure: {“account_name”: “name”, “account_key”: “key”}
- Returns:
A new Flow with the records.
- Return type:
- static from_records(records: List[dict], optimize: bool = False) Flow[source]#
Create a Flow from a list of records. :param records: The list of records to create a Flow from. :type records: List[dict]
- Returns:
A new Flow with the records.
- Return type:
- get_url() str[source]#
For a Flow created from an API source (e.g. Opta), construct and return the URL that will be called. This is a debugging utility and may not work for all flows.
- group_by(*keys: str) FlowGroup[source]#
Group records by one or more fields.
- Parameters:
*keys (str) – Field names to group by.
- Returns:
A new FlowGroup with grouped records.
- Return type:
- grouped(key: str)[source]#
Group records by a single field.
- Parameters:
key (str) – The field to group by.
- Returns:
The groups.
- Return type:
Iterator[Tuple[Any, List[Dict[str, Any]]]]
- head(n=5) list[source]#
Runs the flow and returns the first n records.
- Parameters:
n (int) – The number of records to return.
- Returns:
A list of the first n records.
- Return type:
list
- is_empty() bool[source]#
Check if the flow yields any records without fully collecting it.
- Returns:
True if there are no records, False otherwise.
- Return type:
bool
- join(other: Flow, on: str | List[str] | None = None, left_on: str | List[str] | None = None, right_on: str | List[str] | None = None, how: Literal['left', 'right', 'outer', 'inner', 'anti'] = 'left', lsuffix: str = '', rsuffix: str = '_right', type_coercion: Literal['strict', 'auto', 'string'] = 'strict') Flow[source]#
Join with another Flow.
- Parameters:
other (Flow) – The right-hand flow to join with.
on (str, list[str], or None) – Field(s) to join on when key names are the same.
left_on (str, list[str], or None) – Left-side field(s) to join on.
right_on (str, list[str], or None) – Right-side field(s) to join on.
how (str) – Type of join - ‘left’, ‘right’, ‘outer’, ‘inner’, or ‘anti’.
lsuffix (str) – Suffix for conflicting left-side keys.
rsuffix (str) – Suffix for conflicting right-side keys.
type_coercion (str) – How to handle type differences in join keys: - ‘strict’: Exact type matching (default, preserves current behavior) - ‘auto’: Smart coercion (1 matches ‘1’ matches 1.0) - ‘string’: Convert all join keys to strings for comparison
- Returns:
A new Flow representing the join.
- Return type:
- keys(limit: int = 100) set[str][source]#
Infer the schema of the flow.
- Parameters:
limit (int) – Number of records to sample for schema inference.
- Returns:
The set of keys.
- Return type:
set[str]
- limit(n: int) Flow[source]#
Limit the number of records returned.
- Parameters:
n (int) – The maximum number of records.
- Returns:
A new Flow that yields up to n records.
- Return type:
- map(func: Callable[[dict], dict]) Flow[source]#
Apply a function to each record. Should return a full record. If the function returns None, the record is dropped.
- Parameters:
func (Callable[[dict], dict]) – A function that takes a record and returns a modified one.
- Returns:
A new Flow with the transformed records.
- Return type:
- opta = <penaltyblog.matchflow.contrib.opta.Opta object>#
- pipe(func: Callable[[Flow], Flow]) Flow[source]#
Lazily apply a function to this Flow and return the resulting Flow. The function will be executed at collect-time, not immediately.
The function should return a new Flow, typically using this one as input.
- pivot(index: str | list[str], columns: str, values: str) Flow[source]#
Pivot records: turn row values into columns.
- Parameters:
index (str or list[str]) – Fields to group by.
columns (str) – Field whose values become column names.
values (str) – Field whose values fill the new columns.
- Returns:
A new Flow with records pivoted into wide format.
- Return type:
- plot_plan(compare: bool = False)[source]#
Visualize the flow plan.
- Parameters:
compare (bool) –
True: show two subplots (raw vs. optimized).
False: show a single subplot. If this Flow was constructed with optimize=True, show the optimized plan; otherwise the raw.
- profile(optimize: bool | None = None, fmt: Literal['table', 'records'] = 'table')[source]#
Profile each step in the plan. Returns a report of (step_index, op_name, time_s, rows_emitted).
- Parameters:
optimize – whether to optimize the plan (default = self.optimize)
fmt – ‘table’ to print a table, ‘records’ to return the raw list of dicts.
- query(expr: str)[source]#
Filter rows using query string
- Parameters:
expr (str) – Query string
- Returns:
A new Flow with the filtered records.
- Return type:
Examples
# Basic comparisons flow.query(“age > 30 and name == ‘Phil Foden’”)
# Using variables with @ player = “Mohamed Salah” flow.query(“type.name == ‘Shot’ and player.name == @player”)
# Date filtering flow.query(“date > datetime(2024, 1, 1)”) cutoff_date = datetime(2024, 6, 15) flow.query(“match_date >= @cutoff_date”)
# String operations flow.query(“name.contains(‘son’) and status == ‘active’”)
# Regular expression matching flow.query(“name.regex(‘^[A-Z][a-z]+$’)”) # Names starting with capital letter flow.query(“name.match(’d{4}’, 0)”) # Contains 4 digits in a row
- rename(**mapping: str)[source]#
Rename keys in each record according to mapping of old=new. :param mapping: The mapping of old keys to new keys. :type mapping: dict[str, str]
- Returns:
A new Flow with renamed keys.
- Return type:
- sample_fraction(p: float, seed: int | None = None) Flow[source]#
Lazily sample a fraction of records.
- Parameters:
p (float) – Sampling probability (0 < p < 1).
seed (int, optional) – Random seed.
- Returns:
A new Flow with sampling applied.
- Return type:
- sample_n(n: int, seed: int | None = None) Flow[source]#
Lazily sample n records using reservoir sampling.
- Parameters:
n (int) – Number of records to sample.
seed (int, optional) – Random seed.
- Returns:
A new Flow with sampling applied.
- Return type:
- schema(n=100) dict[str, type][source]#
Infer the schema of the flow.
- Parameters:
n (int) – Number of records to sample for schema inference.
- Returns:
The inferred schema.
- Return type:
dict[str, type]
- select(*fields: str) Flow[source]#
Select specific fields from each record. :param *fields: The fields to select. :type *fields: str
- Returns:
A new Flow with selected fields.
- Return type:
- show(n: int = 5, format: Literal['table', 'record'] = 'table')[source]#
Print the first n records in a pretty format.
- Parameters:
n (int) – Number of records to show.
format (Literal["table", "record"]) – Format to use for display.
- sort_by(*keys: str, ascending: bool | list[bool] = True) Flow[source]#
Sort records by one or more fields.
- Parameters:
*keys (str) – Field names to sort by.
ascending (bool or list[bool], optional) – Sort order(s). Either a single bool applied to all keys or a list of bools (one per key).
- Returns:
A new Flow with sorted records.
- Return type:
- split_array(field: str, into: list[str]) → Flow
Split an array field into separate fields by index.
- Parameters:
field (str) – The field containing a list/array.
into (list[str]) – The names of the output fields.
- Returns:
A new Flow with the array split into separate fields.
- Return type:
Flow
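A minimal sketch of what splitting by index means for a single record. The helper split_array_sketch is hypothetical; keeping the original array field and filling missing positions with None are assumptions, not documented matchflow behaviour.

```python
from typing import Any, Dict, List


def split_array_sketch(record: Dict[str, Any], field: str, into: List[str]) -> Dict[str, Any]:
    """Hypothetical helper: copy record[field][i] into a new field named into[i]."""
    values = record.get(field)
    if not isinstance(values, (list, tuple)):
        values = []  # treat a missing or non-list field as empty
    out = dict(record)  # assumption: the original array field is kept
    for i, name in enumerate(into):
        # Positions beyond the end of the array become None (an assumption).
        out[name] = values[i] if i < len(values) else None
    return out
```

A typical use is turning a location such as [60.5, 40.0] into separate x and y fields.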
- statsbomb = <penaltyblog.matchflow.contrib.statsbomb.StatsBomb object>
- summary(aggregators: Callable | dict[str, Any]) → Flow
Summarize the records in the flow. aggregators may be:
- A callable that takes the rows and returns a dict (e.g. lambda rows: {…}).
- A dict of {alias: callable}.
- A dict of {alias: "name"} or {alias: (name_or_callable, field)}.
- to_json(path: str, indent=4, storage_options: Dict[str, Any] | None = None)
Write the flow to a JSON file (as a list of records).
- Parameters:
path (str) – The path to the JSON file.
indent (int, optional) – The number of spaces to use for indentation.
storage_options (dict, optional) – Additional options for cloud storage backends.
For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}
For GCS: {"token": "path/to/token.json"}
For Azure: {"account_name": "name", "account_key": "key"}
- to_jsonl(path: str, storage_options: Dict[str, Any] | None = None)
Write the flow to a JSONL file (one record per line).
- Parameters:
path (str) – The path to the JSONL file.
storage_options (dict, optional) – Additional options for cloud storage backends.
For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}
For GCS: {"token": "path/to/token.json"}
For Azure: {"account_name": "name", "account_key": "key"}
- to_pandas() → DataFrame
Collect the flow into a pandas DataFrame.
- Returns:
The collected records as a DataFrame.
- Return type:
pd.DataFrame
- with_schema(schema: dict[str, type | Callable[[Any], Any]], strict: bool = False, drop_extra: bool = False) → Flow
Cast fields to specified types/functions and optionally validate or prune fields.
- Parameters:
schema (dict) – Mapping of field names (dot paths allowed) to types or casting functions.
strict (bool) – If True, raise an error on cast failure; otherwise, fall back to the original value.
drop_extra (bool) – If True, only retain fields explicitly listed in the schema.
- Returns:
A new Flow with schema enforcement applied.
- Return type:
Flow
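The interplay of strict and drop_extra can be sketched on a single record as follows. apply_schema_sketch is a hypothetical helper; unlike the documented method it handles only top-level keys (no dot paths), and treating only TypeError and ValueError as cast failures is an assumption.

```python
from typing import Any, Callable, Dict, Union

Caster = Union[type, Callable[[Any], Any]]


def apply_schema_sketch(
    record: Dict[str, Any],
    schema: Dict[str, Caster],
    strict: bool = False,
    drop_extra: bool = False,
) -> Dict[str, Any]:
    """Hypothetical helper mirroring strict/drop_extra (top-level keys only)."""
    # drop_extra keeps only the fields named in the schema.
    out = {k: v for k, v in record.items() if k in schema} if drop_extra else dict(record)
    for field, caster in schema.items():
        if field not in out:
            continue  # assumption: absent fields stay absent
        try:
            out[field] = caster(out[field])
        except (TypeError, ValueError):
            if strict:
                raise
            # Non-strict mode: keep the original value.
    return out
```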
- class penaltyblog.matchflow.group.FlowGroup(plan: List[Dict[str, Any]], optimize: bool = False)
Bases: object
A FlowGroup is a collection of records grouped by one or more keys.
- collect() → List[Dict[str, Any]]
Collect the FlowGroup into a list of records.
- Returns:
The collected records.
- Return type:
List[Dict[str, Any]]
- cumulative(field: str, alias: str | None = None) → Flow
Compute a cumulative (running) sum of a field within each group, without eagerly optimizing previous steps.
- Parameters:
field (str) – The field to accumulate.
alias (Optional[str], optional) – The name of the output cumulative field. Defaults to None.
- Returns:
A new Flow with the group-cumulative applied.
- Return type:
Flow
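A streaming sketch of a per-group running sum. group_cumulative_sketch and its default output name (field + "_cumsum") are hypothetical, as is the choice to let missing values leave the total unchanged.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, Iterator, Optional, Sequence, Tuple


def group_cumulative_sketch(
    records: Iterable[Dict[str, Any]],
    group_keys: Sequence[str],
    field: str,
    alias: Optional[str] = None,
) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: attach a per-group running sum of `field` to each record."""
    out_name = alias or f"{field}_cumsum"  # hypothetical default output name
    totals: Dict[Tuple[Any, ...], Any] = defaultdict(int)
    for record in records:
        key = tuple(record.get(k) for k in group_keys)
        value = record.get(field)
        if value is not None:
            totals[key] += value  # assumption: missing values leave the total unchanged
        yield {**record, out_name: totals[key]}
```

Only one running total per group is held in memory, so this part of the computation streams.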
- explain(optimize: bool | None = None, compare: bool = False)
Explain the plan.
- Returns:
None
- plot_plan(compare: bool = False)
Visualize the flow group plan.
- Parameters:
compare (bool) –
True: show two subplots (raw vs. optimized).
False: show a single subplot. If this FlowGroup was constructed with optimize=True, show the optimized plan; otherwise, show the raw plan.
- rolling_summary(window, aggregators, time_field=None, min_periods=1, step=None)
Lazily apply a rolling summary within each group in a FlowGroup.
This appends a 'group_rolling_summary' step to the plan and returns a new Flow.
- Parameters:
window (int | str) – Number of records, or a time window (e.g. '5m').
aggregators (dict) – Mapping of output field -> (agg_func_name, field).
time_field (str | None) – Field used for time-based windows.
min_periods (int) – Minimum number of rows needed to compute a result.
step (int | None) – Step size for window movement.
- Returns:
A new Flow with a group_rolling_summary step added.
- Return type:
Flow
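Count-mode rolling windows can be sketched with a bounded deque over one group's records. This hypothetical helper takes callables such as sum in place of the agg_func_name strings the method accepts, and it ignores time windows and step.

```python
from collections import deque
from typing import Any, Callable, Dict, Iterable, Iterator, Tuple


def rolling_count_sketch(
    records: Iterable[Dict[str, Any]],
    window: int,
    aggregators: Dict[str, Tuple[Callable[[list], Any], str]],
    min_periods: int = 1,
) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: count-mode rolling summary over one group's records."""
    buffer: deque = deque(maxlen=window)  # keeps only the last `window` records
    for record in records:
        buffer.append(record)
        out = dict(record)
        for name, (func, field) in aggregators.items():
            if len(buffer) < min_periods:
                out[name] = None  # not enough history yet
            else:
                out[name] = func([r.get(field) for r in buffer])
        yield out
```

With window=2 and min_periods=2, the first record gets None and every later record gets the sum over itself and its predecessor.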
- select(*fields: str) → FlowGroup
Select specific fields from each record.
- Parameters:
*fields (str) – The fields to select.
- Returns:
A new FlowGroup with selected fields.
- Return type:
FlowGroup
- sort_by(*keys: str, ascending: bool = True) → FlowGroup
Sort the groups by one or more fields.
- Parameters:
*keys (str) – Field names to sort by.
ascending (bool or list[bool], optional) – Sort order(s). Either a single bool applied to all keys or a list of bools (one per key).
- Returns:
A new FlowGroup with sorted groups.
- Return type:
FlowGroup
- summary(aggregators: Callable | Dict[str, Any]) → Flow
Apply group-summary without eagerly optimizing previous steps.
- Parameters:
aggregators (Union[Callable, Dict[str, Any]]) – The aggregators to apply.
- Returns:
A new Flow with the group-summary applied.
- Return type:
Flow
- time_bucket(freq: str, aggregators: dict[str, Any], time_field: str, label: Literal['left', 'right'] = 'left', bucket_name: str = 'bucket') → Flow
Create fixed (non-overlapping) time buckets of size freq (e.g. "5m") and aggregate each bucket.
- Parameters:
freq (str) – Bucket size, e.g. "5m", "10m", "1h", "30s". Must end with 's', 'm', 'h', or 'd'.
aggregators (dict) – Mapping of output_field -> (agg_name_or_callable, input_field).
time_field (str) – Dot path to a field holding either a timedelta or a datetime in each record.
label (str) – "left" (default) labels each bucket at the start of its interval; "right" labels it at the end.
bucket_name (str) – Name of the output field for the bucket label. Defaults to "bucket".
- Returns:
A new Flow with one record per bucket per group, containing the bucket label and the aggregated values.
- Return type:
Flow
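A sketch of fixed, non-overlapping bucketing for a single group with timedelta times: each time is floored to a multiple of the bucket size, and label chooses whether the bucket is named by its start or its end. time_bucket_sketch, its freq_seconds argument, summing as the only aggregation, and omitting empty buckets are all assumptions for illustration.

```python
import math
from collections import defaultdict
from datetime import timedelta
from typing import Any, Dict, Iterable, List


def time_bucket_sketch(
    records: Iterable[Dict[str, Any]],
    freq_seconds: float,
    time_field: str,
    value_field: str,
    label: str = "left",
    bucket_name: str = "bucket",
) -> List[Dict[str, Any]]:
    """Hypothetical helper: sum `value_field` in fixed, non-overlapping buckets."""
    sums: Dict[float, Any] = defaultdict(int)
    for record in records:
        t: timedelta = record[time_field]
        # Floor the elapsed seconds to the start of the bucket the record falls in.
        start = math.floor(t.total_seconds() / freq_seconds) * freq_seconds
        sums[start] += record.get(value_field) or 0
    rows = []
    for start in sorted(sums):
        edge = start if label == "left" else start + freq_seconds
        rows.append({bucket_name: timedelta(seconds=edge), value_field: sums[start]})
    return rows
```

With 5-minute buckets, an event at 4:59 falls in the bucket labeled 0:00 (left) or 5:00 (right), while an event at exactly 5:00 starts the next bucket.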
- class penaltyblog.matchflow.executor.FlowExecutor(plan: List[Dict[str, Any]])
Bases: object
- penaltyblog.matchflow.executor.is_materializing_op(op_name: str) → bool
Returns True if the operation is expected to materialize or buffer records in memory (i.e., it requires the full dataset to proceed).
- class penaltyblog.matchflow.optimizer.FlowOptimizer(plan)
Bases: object
Optimizer for a flow plan.
Performs conservative optimizations: it fuses simple operations, pushes down filters, limits, and select/drop operations only when provably safe, and eliminates redundant steps.
- FIELD_USAGE_HANDLERS = {'assign': <function FlowOptimizer.<lambda>>, 'cast': <function FlowOptimizer.<lambda>>, 'drop': <function FlowOptimizer.<lambda>>, 'dropna': <function FlowOptimizer.<lambda>>, 'filter': <function FlowOptimizer.<lambda>>, 'group_by': <function FlowOptimizer.<lambda>>, 'group_rolling_summary': <function FlowOptimizer.<lambda>>, 'join': <function FlowOptimizer.<lambda>>, 'rename': <function FlowOptimizer.<lambda>>, 'select': <function FlowOptimizer.<lambda>>, 'sort': <function FlowOptimizer.<lambda>>}
- MAX_PASSES = 5
Helpers for handling a streaming data pipeline, specifically the Flow class.
- penaltyblog.matchflow.helpers.get_field(path: str, default: Any | None = None) → Callable[[dict[Any, Any] | None], Any]
Safely access a nested field using dot notation.
- Parameters:
path (str) – Dot-separated path to a field, e.g. "player.name".
default (Any, optional) – Value to return if the path is invalid.
- Returns:
A function that retrieves the field value or default.
- Return type:
Callable[[dict], Any]
Example
>>> f = get_field("player.name")
>>> f({"player": {"name": "Bukayo Saka"}})  # → "Bukayo Saka"
- penaltyblog.matchflow.helpers.get_index(path: str, index: int, default: Any | None = None) → Callable[[dict[Any, Any] | None], Any]
Safely access an index in a nested list using a dot-separated path.
- Parameters:
path (str) – Dot-separated path to a list field, e.g. "pass.end_location".
index (int) – The index to extract from the list.
default (Any, optional) – Value to return if the path is invalid or the index is out of bounds.
- Returns:
A function that retrieves the indexed value or default.
- Return type:
Callable[[dict], Any]
Example
>>> f = get_index("pass.end_location", 0)
>>> f({"pass": {"end_location": [100, 40]}})  # → 100
- penaltyblog.matchflow.helpers.resolve_path(record: dict, path: str, default=None)
Safely access a nested field using dot notation.
- Parameters:
record (dict) – The record to resolve the path from.
path (str) – The path to resolve.
default (Any, optional) – Value to return if the path is invalid.
- Returns:
The resolved value, or default if not found.
- Return type:
Any
Example
>>> resolve_path({"player": {"name": "Bukayo Saka"}}, "player.name")
'Bukayo Saka'
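To illustrate how these helpers relate, here is a hypothetical dict-only version: resolve_path_sketch walks the dot path, and get_field_sketch binds a path once and returns a reusable accessor. These stand-ins do not handle list indices and are not matchflow's code.

```python
from typing import Any, Callable, Optional


def resolve_path_sketch(record: Optional[dict], path: str, default: Any = None) -> Any:
    """Hypothetical stand-in for resolve_path: walk nested dicts along a dot path."""
    current: Any = record
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return default  # any broken link in the chain yields the default
        current = current[part]
    return current


def get_field_sketch(path: str, default: Any = None) -> Callable[[Optional[dict]], Any]:
    """Hypothetical stand-in for get_field: bind the path once, apply it to many records."""
    return lambda record: resolve_path_sketch(record, path, default)
```

Returning a closure lets the same accessor be passed to assign or filter steps without repeating the path.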
- penaltyblog.matchflow.plotting.plot_flow_plan(plan, optimize=False, compare=False, title_prefix='')
Helper to visualize a plan (single or compare mode), optionally optimizing. optimizer_cls: class to use for optimization (must have .optimize()).
- class penaltyblog.matchflow.predicates.FieldPredicate(field: str, fn: Callable[[Any], bool])
Bases: Predicate
- penaltyblog.matchflow.predicates_helpers.where_regex_match(field: str, pattern: str, flags: int | RegexFlag = 0)
Create a predicate that tests if a field matches a regex pattern.
- Parameters:
field (str) – The field to check.
pattern (str) – The regex pattern to match against.
flags (int or re.RegexFlag, optional) – Regex flags (e.g., re.IGNORECASE).
- Returns:
A predicate that tests if the field matches the pattern.
- Return type:
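A sketch of a regex predicate factory that compiles the pattern once and reuses it for every record. regex_predicate_sketch is hypothetical; using re.search (match anywhere in the string) and rejecting non-string values are assumptions, and the real helper may anchor differently.

```python
import re
from typing import Any, Callable, Dict, Union


def regex_predicate_sketch(
    field: str, pattern: str, flags: Union[int, re.RegexFlag] = 0
) -> Callable[[Dict[str, Any]], bool]:
    """Hypothetical stand-in: compile once, then test each record's field."""
    compiled = re.compile(pattern, flags)  # compiled a single time, reused per record

    def predicate(record: Dict[str, Any]) -> bool:
        value = record.get(field)
        # Assumption: non-string values never match; search finds a match anywhere.
        return isinstance(value, str) and compiled.search(value) is not None

    return predicate
```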
- penaltyblog.matchflow.steps.group.apply_group_by(records: Iterator[dict[str, Any]], step: dict[str, Any]) → Iterator[dict[str, Any]]
Group records by one or more fields.
- Parameters:
records (Iterator[dict]) – Iterator of records to group.
step (dict) – Step configuration dict, must include ‘keys’.
- Returns:
Iterator of group dicts, each with ‘__group_key__’ and ‘__group_records__’.
- Return type:
Iterator[dict]
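The group-dict shape described above can be sketched in plain Python. group_by_sketch is hypothetical; using a tuple as __group_key__ and emitting groups in first-seen order are assumptions. Grouping has to consume the whole input before the first group is emitted.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple


def group_by_sketch(
    records: Iterable[Dict[str, Any]], keys: List[str]
) -> Iterator[Dict[str, Any]]:
    """Hypothetical stand-in producing '__group_key__' / '__group_records__' dicts."""
    groups: Dict[Tuple[Any, ...], List[Dict[str, Any]]] = {}
    for record in records:
        key = tuple(record.get(k) for k in keys)
        # dict preserves insertion order, so groups appear in first-seen order.
        groups.setdefault(key, []).append(record)
    for key, members in groups.items():
        yield {"__group_key__": key, "__group_records__": members}
```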
- penaltyblog.matchflow.steps.group.apply_group_cumulative(records: Iterator[dict[str, Any]], step: dict[str, Any]) → Iterator[dict[str, Any]]
Apply a cumulative sum to a field for each group of records.
- Parameters:
records (Iterator[dict]) – Iterator of group dicts, each with ‘__group_key__’ and ‘__group_records__’.
step (dict) – Step configuration dict, must include ‘field’ and ‘alias’.
- Returns:
Iterator of records with cumulative field attached.
- Return type:
Iterator[dict]
- penaltyblog.matchflow.steps.group.apply_group_rolling_summary(records: Iterator[dict[str, Any]], step: dict[str, Any]) → Iterator[dict[str, Any]]
Lazily apply a rolling summary within each group.
- Two modes:
Count mode: window is int → last N rows
Time mode: window is str ending in s/m/h/d → last T seconds
In time mode, time_field must be datetime or timedelta.
- Parameters:
records (Iterator[dict]) – Iterator of records, each a dict.
step (dict) – Step configuration dict, must include ‘window’, ‘aggregators’, and optionally ‘time_field’, ‘min_periods’, ‘step’, ‘__group_keys’.
- Returns:
Iterator of records with rolling summary fields attached.
- Return type:
Iterator[dict]
- penaltyblog.matchflow.steps.group.apply_group_summary(records: Iterator[dict[str, Any]], step: dict[str, Any]) → Iterator[dict[str, Any]]
Apply a summary function to each group of records.
- Parameters:
records (Iterator[dict]) – Iterator of group dicts, each with ‘__group_key__’ and ‘__group_records__’.
step (dict) – Step configuration dict, must include ‘agg’ and optionally ‘group_keys’.
- Returns:
Iterator of summary dicts for each group.
- Return type:
Iterator[dict]
- penaltyblog.matchflow.steps.group.apply_group_time_bucket(records: Iterator[dict[str, Any]], step: dict[str, Any]) → Iterator[dict[str, Any]]
Assign each record in a group to a fixed, non-overlapping time bin.
- Two modes:
String freq with suffix (e.g. ‘5m’, ‘1h’): requires datetime or timedelta time_field.
Numeric freq (int/float): buckets numeric values directly.
- Parameters:
records (Iterator[dict]) – Iterator of group dicts, each with ‘__group_key__’ and ‘__group_records__’.
step (dict) – Step configuration dict, must include ‘freq’, ‘aggregators’, ‘time_field’, and optionally ‘label’, ‘bucket_name’, ‘__group_keys’.
- Returns:
Iterator of records with bucket assignments and aggregated fields.
- Return type:
Iterator[dict]
- penaltyblog.matchflow.steps.group.get_time_window_details(window: int | float | str, time_field: str | None) → Tuple[bool, int | None, float | None, datetime | None, bool]
Determine the mode (count or time) and parse the window size.
- Parameters:
window (int, float, or str) – Window size as integer/float (row count) or string (e.g. ‘5m’, ‘1h’).
time_field (str or None) – Name of the time field, required for time-based windows.
- Returns:
(count_mode: bool, count_window: Optional[int], time_window_seconds: Optional[float], origin: Optional[datetime], is_datetime: bool)
- Return type:
tuple
- penaltyblog.matchflow.steps.group.parse_window_size(window_str: str) → float
Parse a window size string like '5m', '10m', '1h', '30s', or '1d' to seconds (float).
- Parameters:
window_str (str) – Window size string, must end with ‘s’, ‘m’, ‘h’, or ‘d’.
- Returns:
Window size in seconds.
- Return type:
float
- Raises:
ValueError – If the string cannot be parsed or has an unrecognized unit.
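A minimal parser for the documented suffixes (s, m, h, d). parse_window_size_sketch is a hypothetical stand-in; accepting decimal amounts such as '1.5h' is an assumption about what the real function allows.

```python
import re

_UNIT_SECONDS = {"s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0}


def parse_window_size_sketch(window_str: str) -> float:
    """Hypothetical stand-in: convert strings such as '5m' or '1h' to seconds."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([smhd])\s*", window_str)
    if match is None:
        raise ValueError(f"Cannot parse window size: {window_str!r}")
    amount, unit = match.groups()
    return float(amount) * _UNIT_SECONDS[unit]
```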
- penaltyblog.matchflow.steps.source.from_concat(step) → Iterator[Dict[Any, Any]]
Create a Flow from a list of plans.
- Parameters:
step (dict) – A dictionary containing the plans to concatenate.
- Returns:
An iterator over the records of the concatenated plans.
- Return type:
Iterator[dict]
- penaltyblog.matchflow.steps.source.from_folder(step) → Iterator[Dict[Any, Any]]
Create a Flow from a folder of JSON or JSONL files.
- Parameters:
step (dict) –
A dictionary containing:
- path (str): The path to the folder containing the records.
- storage_options (dict, optional): Additional options for cloud storage backends.
For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}
For GCS: {"token": "path/to/token.json"}
For Azure: {"account_name": "name", "account_key": "key"}
- Returns:
An iterator over the records in the folder's files.
- Return type:
Iterator[dict]
- penaltyblog.matchflow.steps.source.from_glob(step) → Iterator[Dict[Any, Any]]
Create a Flow from a glob pattern.
- Parameters:
step (dict) –
A dictionary containing:
- pattern (str): Glob pattern (e.g., "data/**/*.json").
- storage_options (dict, optional): Additional options for cloud storage backends.
For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}
For GCS: {"token": "path/to/token.json"}
For Azure: {"account_name": "name", "account_key": "key"}
- Returns:
An iterator over the records in the matching files.
- Return type:
Iterator[dict]
- penaltyblog.matchflow.steps.source.from_json(step) → Iterator[Dict[Any, Any]]
Create a Flow from a JSON file.
- Parameters:
step (dict) –
A dictionary containing:
- path (str): The path to the JSON file.
- storage_options (dict, optional): Additional options for cloud storage backends.
For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}
For GCS: {"token": "path/to/token.json"}
For Azure: {"account_name": "name", "account_key": "key"}
- Returns:
An iterator over the records in the JSON file.
- Return type:
Iterator[dict]
- penaltyblog.matchflow.steps.source.from_jsonl(step) → Iterator[Dict[Any, Any]]
Create a Flow from a JSONL file.
- Parameters:
step (dict) –
A dictionary containing:
- path (str): The path to the JSONL file.
- storage_options (dict, optional): Additional options for cloud storage backends.
For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}
For GCS: {"token": "path/to/token.json"}
For Azure: {"account_name": "name", "account_key": "key"}
- Returns:
An iterator over the records in the JSONL file.
- Return type:
Iterator[dict]
- penaltyblog.matchflow.steps.transform.apply_assign(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Assign new fields to each record.
- Parameters:
records – A stream of records to assign fields to.
step – Configuration dict containing the fields to assign.
- Returns:
A stream of records with assigned fields.
- penaltyblog.matchflow.steps.transform.apply_distinct(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
- penaltyblog.matchflow.steps.transform.apply_drop(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Drop specified fields from each record.
- Parameters:
records – A stream of records to drop fields from.
step – Configuration dict containing the keys to drop.
- Returns:
A stream of records with the specified fields removed.
- penaltyblog.matchflow.steps.transform.apply_dropna(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Drop records with missing values.
- Parameters:
records – A stream of records to filter.
step – Configuration dict containing the fields to check for missing values.
- Returns:
A stream of records with the incomplete records removed.
- penaltyblog.matchflow.steps.transform.apply_explode(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Explode records based on a list of fields.
- Parameters:
records – A stream of records to explode.
step – Configuration dict containing the fields to explode.
- Returns:
A stream of exploded records.
- penaltyblog.matchflow.steps.transform.apply_filter(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Filter records based on a predicate.
- Parameters:
records – A stream of records to filter.
step – Configuration dict containing the predicate to apply.
- Returns:
A stream of filtered records.
- penaltyblog.matchflow.steps.transform.apply_flatten(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Flatten nested dictionaries into a single-level dictionary using dot notation.
- Parameters:
records – A stream of records to flatten.
step – Configuration dict containing the keys to flatten.
- Returns:
A stream of flattened records.
- penaltyblog.matchflow.steps.transform.apply_fused(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Apply a fused sequence of map/assign/filter operations.
- Parameters:
records – A stream of records to apply fused operations to.
step – Configuration dict with an ‘ops’ list and potentially embedded steps.
- Returns:
A stream of records with the fused operations applied.
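Operator fusion can be sketched as one loop that runs every row-wise op on a record before moving on to the next record, instead of chaining one generator per step. The (kind, argument) op format below is hypothetical and is not matchflow's internal plan representation.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Record = Dict[str, Any]
Op = Tuple[str, Any]  # hypothetical format: ("map", fn), ("assign", {...}), ("filter", pred)


def apply_fused_sketch(records: Iterable[Record], ops: List[Op]) -> Iterator[Record]:
    """Hypothetical stand-in: run several row-wise steps in a single pass."""
    for record in records:
        current = record
        keep = True
        for kind, arg in ops:
            if kind == "map":
                current = arg(current)
            elif kind == "assign":
                # All fields in one assign step see the record as it was before that step.
                current = {**current, **{name: fn(current) for name, fn in arg.items()}}
            elif kind == "filter":
                if not arg(current):
                    keep = False
                    break  # skip the remaining ops for a rejected record
            else:
                raise ValueError(f"unsupported op: {kind}")
        if keep:
            yield current
```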
- penaltyblog.matchflow.steps.transform.apply_join(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Join records based on a list of keys. This is a dispatcher function that selects the appropriate join strategy.
- Parameters:
records – A stream of records to join.
step – Configuration dict containing the keys to join by.
- Returns:
A stream of joined records.
- penaltyblog.matchflow.steps.transform.apply_limit(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Limit the number of records.
- Parameters:
records – A stream of records to limit.
step – Configuration dict containing the count to limit by.
- Returns:
A stream of limited records.
- penaltyblog.matchflow.steps.transform.apply_map(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Apply a function to each record.
- Parameters:
records – A stream of records to apply the function to.
step – Configuration dict containing the function to apply.
- Returns:
A stream of mapped records.
- penaltyblog.matchflow.steps.transform.apply_pivot(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Pivot records based on a list of index fields.
- Parameters:
records – A stream of records to pivot.
step – Configuration dict containing the index fields to pivot by.
- Returns:
A stream of pivoted records.
- penaltyblog.matchflow.steps.transform.apply_rename(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Rename fields in each record.
- Parameters:
records – A stream of records to rename fields in.
step – Configuration dict containing the mapping of old to new field names.
- Returns:
A stream of records with renamed fields.
- penaltyblog.matchflow.steps.transform.apply_sample_fraction(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Sample a fraction of the records.
- Parameters:
records – A stream of records to sample a fraction from.
step – Configuration dict containing the fraction to sample.
- Returns:
A stream of sampled records.
- penaltyblog.matchflow.steps.transform.apply_sample_n(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Sample a fixed number of records.
- Parameters:
records – A stream of records to sample from.
step – Configuration dict with ‘n’ and optional ‘seed’.
- Returns:
A stream of sampled records.
- penaltyblog.matchflow.steps.transform.apply_select(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Select specific fields from each record.
- Parameters:
records – A stream of records to select fields from.
step – Configuration dict containing the fields to select.
- Returns:
A stream of records with selected fields.
- penaltyblog.matchflow.steps.transform.apply_sort(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Sort records based on a list of keys.
- Parameters:
records – A stream of records to sort.
step – Configuration dict containing the keys to sort by.
- Returns:
A stream of sorted records.
- penaltyblog.matchflow.steps.transform.apply_split_array(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Split an array field into separate fields by index.
- Parameters:
records – A stream of records containing the array field.
step – Configuration dict containing the field to split and the names of the output fields.
- Returns:
A stream of records with the array split into separate fields.
- penaltyblog.matchflow.steps.transform.apply_summary(records: Iterator[Dict[str, Any]], step: dict) → Iterator[Dict[str, Any]]
Apply a summary function to the records.
- Parameters:
records – A stream of records to apply the summary function to.
step – Configuration dict containing the summary function to apply.
- Returns:
A stream of summary results.
- penaltyblog.matchflow.steps.utils.fast_get_field(record: dict, parts: list[str]) → Any
Retrieve a nested value from a mapping or list using a pre-split path.
This is a slightly optimized helper for looking up nested values when the caller has already split a dotted path into parts. Each element of parts is used to traverse dictionaries by key. If the current value is a list and the part is a numeric string, it is treated as an integer index. If any step cannot be resolved, the function returns None.
- Parameters:
record (dict) – The mapping to search. May contain nested dicts/lists.
parts (list[str]) – Pre-split path components (for example ["a", "0", "b"]).
- Returns:
The found value, or None if the path does not exist or an index is out of range.
- Return type:
Any
Example
>>> fast_get_field({"a": [{"b": 1}]}, ["a", "0", "b"])
1
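The traversal rules above (dict keys by name, numeric parts as list indices, None on any failure) can be sketched as a hypothetical stand-in. Accepting only decimal-digit strings as indices, which rejects negative indices such as "-1", is an assumption.

```python
from typing import Any, List


def fast_get_field_sketch(record: Any, parts: List[str]) -> Any:
    """Hypothetical stand-in: follow pre-split path parts through dicts and lists."""
    current = record
    for part in parts:
        if isinstance(current, dict):
            if part not in current:
                return None
            current = current[part]
        elif isinstance(current, list) and part.isdecimal():
            index = int(part)  # numeric parts index into lists
            if index >= len(current):
                return None
            current = current[index]
        else:
            return None  # cannot descend into scalars or use non-numeric list keys
    return current
```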
- penaltyblog.matchflow.steps.utils.flatten_dict(d: dict, parent_key: str = '', sep: str = '.') → dict
Flatten a nested dictionary into a single-level dictionary using dot notation.
- Parameters:
d (dict) – The dictionary to flatten.
parent_key (str) – The prefix for nested keys.
sep (str) – The separator to use (default is “.”).
- Returns:
Flattened dictionary with dot-notated keys.
- Return type:
dict
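A recursive sketch of the flattening described above. flatten_dict_sketch is hypothetical; leaving lists and empty sub-dicts untouched is an assumption about edge cases.

```python
from typing import Any, Dict


def flatten_dict_sketch(d: Dict[str, Any], parent_key: str = "", sep: str = ".") -> Dict[str, Any]:
    """Hypothetical stand-in: recursively join nested keys with `sep`."""
    flat: Dict[str, Any] = {}
    for key, value in d.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, dict) and value:
            # Recurse into non-empty sub-dicts, carrying the accumulated prefix.
            flat.update(flatten_dict_sketch(value, new_key, sep))
        else:
            flat[new_key] = value  # lists and scalars are kept as-is
    return flat
```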
- penaltyblog.matchflow.steps.utils.get_field(record: dict, path: str | List[str]) → Any
Retrieve a nested value from a mapping or list using dot notation.
The path may either be a dotted string (for example "a.0.b") or a pre-split list of parts (["a", "0", "b"]). Dictionary keys are looked up by name. If a traversal step yields a list and the corresponding part is a numeric string, it is used as a list index. Any lookup failure returns None rather than raising.
- Parameters:
record (dict) – The mapping to search. May contain nested dicts/lists.
path (Union[str, List[str]]) – Dotted path or list of path components.
- Returns:
The found value, or None if the path does not exist or an index is invalid.
- Return type:
Any
Example
>>> get_field({"loc": [50, 40]}, "loc.0")
50
- penaltyblog.matchflow.steps.utils.get_index(field: str, index: int) → Callable[[dict], Any]
Returns a function that extracts the index-th value from a list field (dot path). If the field or index is missing, the function returns None.
Example
>>> f = get_index("location", 0)
>>> f({"location": [50, 40]})
50
- penaltyblog.matchflow.steps.utils.reservoir_sample(iterable, k, seed=None) → list
Sample a fixed number of records from an iterable using the reservoir sampling algorithm.
- Parameters:
iterable (iterable) – The iterable to sample from.
k (int) – The number of records to sample.
seed (int, optional) – The seed for the random number generator.
- Returns:
The sampled records.
- Return type:
list
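For reference, the classic single-pass reservoir algorithm (Algorithm R) keeps k items in O(k) memory while giving every item the same chance of ending up in the sample. This is a textbook version written as a hypothetical stand-in, not necessarily matchflow's exact code.

```python
import random
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")


def reservoir_sample_sketch(iterable: Iterable[T], k: int, seed: Optional[int] = None) -> List[T]:
    """Algorithm R: a uniform sample of k items in one pass with O(k) memory."""
    rng = random.Random(seed)
    reservoir: List[T] = []
    for i, item in enumerate(iterable):
        if i < k:
            reservoir.append(item)  # fill the reservoir with the first k items
        else:
            # Item i (0-based) replaces a random slot with probability k / (i + 1).
            j = rng.randint(0, i)
            if j < k:
                reservoir[j] = item
    return reservoir
```

If the input has fewer than k items, the whole input is returned unchanged.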
- penaltyblog.matchflow.steps.utils.schema(records: list[dict], sample_size: int = 50) → dict
Get the schema of a list of records.
- Parameters:
records (list[dict]) – The list of records to get the schema from.
sample_size (int) – The number of records to sample.
- Returns:
The schema of the records.
- Return type:
dict