MatchFlow#

class penaltyblog.matchflow.Flow(plan: List[Dict[str, Any]] | None = None, optimize: bool = False)[source]#

Bases: object

assign(**fields: Callable[[dict], Any]) Flow[source]#

Assign new fields to each record.

Parameters:

**fields (dict[str, Callable[[dict], Any]]) – The fields to assign.

Returns:

A new Flow with assigned fields.

Return type:

Flow

cache() Flow[source]#

Cache the records in memory.

Returns:

A new Flow with the records cached.

Return type:

Flow

cast(**casts: type | Callable[[Any], Any]) Flow[source]#

Cast fields to specified types or functions.

Parameters:

**casts (type or Callable[[Any], Any]) – The types or functions to cast to.

Returns:

A new Flow with the fields cast.

Return type:

Flow

collect(optimize: bool | None = None, progress: Literal['output', 'input'] | None = None, total_records: int | None = None) list[source]#

Collect all records from the flow.

Parameters:
  • optimize (bool) – Whether to optimize the plan before execution.

  • progress (Optional[Literal["output", "input"]]) – Whether to show progress.

  • total_records (Optional[int]) – Total number of rows to expect.

Returns:

A list of records.

Return type:

list

concat(*others: Flow) Flow[source]#

Concatenate this flow with one or more other flows.

Parameters:

*others (Flow) – One or more flows to concatenate.

Returns:

A new Flow representing the concatenated sequence.

Return type:

Flow

count() int[source]#

Count the number of records in the flow.

Returns:

The number of records.

Return type:

int

distinct(*keys: str, keep: str = 'first') Flow[source]#

Remove duplicate records.

Parameters:
  • *keys (str) – Optional field names to determine uniqueness.

  • keep (str) – ‘first’ (default) or ‘last’ to control which duplicate is retained.

Returns:

A new Flow with duplicates removed.

Return type:

Flow

drop(*keys: str) Flow[source]#

Drop one or more fields from each record. Supports dot notation for nested fields.

Parameters:

*keys (str) – Field names to remove.

Returns:

A new Flow with fields removed.

Return type:

Flow

dropna(*fields: str) Flow[source]#

Drop records where any of the specified fields are None or missing. If no fields are given, drops records where any top-level value is None.

Parameters:

*fields (str) – Optional field paths (dot notation) to check for None.

Returns:

A new Flow with records containing nulls removed.

Return type:

Flow
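The null-dropping rule described for dropna can be illustrated in plain Python. This is only a sketch of the documented behaviour, not penaltyblog's implementation; the helper names `get_path` and `drop_nulls` are hypothetical.

```python
def get_path(record, path):
    """Follow a dot-notation path; return None if any part is missing."""
    current = record
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def drop_nulls(records, *fields):
    """Yield records whose checked values are all non-None.

    With no fields given, every top-level value is checked instead.
    """
    for rec in records:
        values = [get_path(rec, f) for f in fields] if fields else list(rec.values())
        if all(v is not None for v in values):
            yield rec


rows = [
    {"player": {"name": "A"}, "xg": 0.1},
    {"player": {"name": None}, "xg": 0.2},
    {"xg": None},
]
by_name = list(drop_nulls(rows, "player.name"))
top_level = list(drop_nulls(rows))
```

In this sketch a missing field is treated the same as a None value, matching the "None or missing" wording above.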

explain(optimize: bool | None = None, compare: bool = False)[source]#

Print a readable version of the plan.

Parameters:
  • optimize (bool) – Whether to show the optimized plan (default True).

  • compare (bool) – If True, show both pre- and post-optimization plans.

explode(*fields: str) Flow[source]#

Explode one or more list fields into multiple records (in sync). All fields must be lists of the same length in each record.

Parameters:

*fields (str) – One or more field names (dot notation allowed).

Returns:

A new Flow with records exploded along the given fields.

Return type:

Flow
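As an illustration of exploding several list fields "in sync", here is a plain-Python sketch. It is not the library's code: `explode_record` is a hypothetical helper, it handles top-level fields only, and raising ValueError on mismatched lengths is an assumption.

```python
def explode_record(record, *fields):
    """Yield one record per list position, exploding the given fields together."""
    lists = [record[f] for f in fields]
    if len({len(values) for values in lists}) != 1:
        raise ValueError("all exploded fields must be lists of the same length")
    for values in zip(*lists):
        row = dict(record)               # copy the non-exploded fields
        row.update(zip(fields, values))  # replace each list with one element
        yield row


lineup = {"match": 1, "players": ["a", "b"], "minutes": [90, 45]}
exploded = list(explode_record(lineup, "players", "minutes"))
```

Each output record pairs the i-th element of every exploded field, which is why the lists must have equal length.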

filter(*predicates: Callable[[dict], bool]) Flow[source]#
flatten() Flow[source]#

Flatten nested dictionaries into a single-level dictionary using dot notation.

Returns:

A new Flow with flattened records.

Return type:

Flow
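A minimal sketch of dot-notation flattening, written in plain Python to show the shape of the output. The function name `flatten_record` is hypothetical and this is not penaltyblog's implementation; how the library treats lists or empty dictionaries is not covered here.

```python
def flatten_record(record, parent=""):
    """Flatten nested dicts into a single level, joining keys with dots."""
    flat = {}
    for key, value in record.items():
        path = f"{parent}.{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten_record(value, path))
        else:
            flat[path] = value
    return flat


event = {"id": 7, "player": {"name": "Phil Foden", "team": {"id": 36}}}
flat_event = flatten_record(event)
```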

static from_folder(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#

Create a Flow from a folder of records.

Parameters:
  • path (str) – The path to the folder containing the records.

  • optimize (bool) – Whether to optimize the flow.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow pointing to the records.

Return type:

Flow

static from_glob(pattern: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#

Create a Flow from a glob pattern.

Parameters:
  • pattern (str) – Glob pattern (e.g., "data/**/*.json").

  • optimize (bool) – Whether to optimize the flow.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow streaming matching files.

Return type:

Flow

static from_json(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#

Lazily load a list of records from a JSON file.

Parameters:
  • path (str) – The path to the JSON file.

  • optimize (bool) – Whether to optimize the flow.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow pointing to the records.

Return type:

Flow

static from_jsonl(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#

Lazily load records from a JSONL file.

Parameters:
  • path (str) – The path to the JSONL file.

  • optimize (bool) – Whether to optimize the flow.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow pointing to the records.

Return type:

Flow

static from_list(records: List[dict], optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#

Create a Flow from a list of records.

Parameters:
  • records (List[dict]) – The list of records to create a Flow from.

  • optimize (bool) – Whether to optimize the flow.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow with the records.

Return type:

Flow

static from_records(records: List[dict], optimize: bool = False) Flow[source]#

Create a Flow from a list of records.

Parameters:

records (List[dict]) – The list of records to create a Flow from.

Returns:

A new Flow with the records.

Return type:

Flow

get_url() str[source]#

For a Flow created from an API source (e.g. Opta), construct and return the URL that will be called. This is a debugging utility and may not work for all flows.

group_by(*keys: str) FlowGroup[source]#

Group records by one or more fields.

Parameters:

*keys (str) – Field names to group by.

Returns:

A new FlowGroup with grouped records.

Return type:

FlowGroup

grouped(key: str)[source]#

Group records by a single field.

Parameters:

key (str) – The field to group by.

Returns:

The groups.

Return type:

Iterator[Tuple[Any, List[Dict[str, Any]]]]
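The return type above, an iterator of (key, records) pairs, can be pictured with a small plain-Python sketch. This is an illustration only: `group_records` is a hypothetical helper, and the order in which the library yields groups is not stated in this reference.

```python
from collections import defaultdict


def group_records(records, key):
    """Yield (group_value, list_of_records) pairs for a single field."""
    groups = defaultdict(list)
    for rec in records:
        groups[rec.get(key)].append(rec)
    # dicts preserve insertion order, so groups appear in first-seen order
    yield from groups.items()


shots = [
    {"team": "A", "xg": 0.3},
    {"team": "B", "xg": 0.1},
    {"team": "A", "xg": 0.6},
]
by_team = list(group_records(shots, "team"))
```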

head(n=5) list[source]#

Runs the flow and returns the first n records.

Parameters:

n (int) – The number of records to return.

Returns:

A list of the first n records.

Return type:

list

is_empty() bool[source]#

Check if the flow yields any records without fully collecting it.

Returns:

True if there are no records, False otherwise.

Return type:

bool

join(other: Flow, on: str | List[str] | None = None, left_on: str | List[str] | None = None, right_on: str | List[str] | None = None, how: Literal['left', 'right', 'outer', 'inner', 'anti'] = 'left', lsuffix: str = '', rsuffix: str = '_right', type_coercion: Literal['strict', 'auto', 'string'] = 'strict') Flow[source]#

Join with another Flow.

Parameters:
  • other (Flow) – The right-hand flow to join with.

  • on (str, list[str], or None) – Field(s) to join on when key names are the same.

  • left_on (str, list[str], or None) – Left-side field(s) to join on.

  • right_on (str, list[str], or None) – Right-side field(s) to join on.

  • how (str) – Type of join - ‘left’, ‘right’, ‘outer’, ‘inner’, or ‘anti’.

  • lsuffix (str) – Suffix for conflicting left-side keys.

  • rsuffix (str) – Suffix for conflicting right-side keys.

  • type_coercion (str) – How to handle type differences in join keys:

      - ‘strict’: Exact type matching (default, preserves current behavior).

      - ‘auto’: Smart coercion (1 matches ‘1’ matches 1.0).

      - ‘string’: Convert all join keys to strings for comparison.

Returns:

A new Flow representing the join.

Return type:

Flow
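One way to picture the three type_coercion modes is as a key-normalisation step applied to both sides before keys are compared. The sketch below is an assumption about the semantics described above, not the library's implementation, and `normalise_key` is a hypothetical helper.

```python
def normalise_key(value, mode="strict"):
    """Return a comparable form of a join key under the given coercion mode."""
    if mode == "strict":
        # Include the type so that 1, 1.0 and "1" stay distinct.
        return (type(value).__name__, value)
    if mode == "string":
        return str(value)
    if mode == "auto":
        # Treat numeric-looking values as numbers; leave everything else alone.
        try:
            return float(value)
        except (TypeError, ValueError):
            return value
    raise ValueError(f"unknown type_coercion mode: {mode!r}")


auto_keys = {normalise_key(v, "auto") for v in (1, "1", 1.0)}
strict_keys = {normalise_key(v, "strict") for v in (1, "1", 1.0)}
```

Under this sketch the 'auto' mode collapses 1, '1' and 1.0 to a single key, while 'strict' keeps three separate keys.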

keys(limit: int = 100) set[str][source]#

Infer the set of keys present in the flow by sampling records.

Parameters:

limit (int) – Number of records to sample when inferring keys.

Returns:

The set of keys.

Return type:

set[str]

limit(n: int) Flow[source]#

Limit the number of records returned.

Parameters:

n (int) – The maximum number of records.

Returns:

A new Flow that yields up to n records.

Return type:

Flow

map(func: Callable[[dict], dict]) Flow[source]#

Apply a function to each record. Should return a full record. If the function returns None, the record is dropped.

Parameters:

func (Callable[[dict], dict]) – A function that takes a record and returns a modified one.

Returns:

A new Flow with the transformed records.

Return type:

Flow

opta = <penaltyblog.matchflow.contrib.opta.Opta object>#
pipe(func: Callable[[Flow], Flow]) Flow[source]#

Lazily apply a function to this Flow and return the resulting Flow. The function will be executed at collect-time, not immediately.

The function should return a new Flow, typically using this one as input.

pivot(index: str | list[str], columns: str, values: str) Flow[source]#

Pivot records: turn row values into columns.

Parameters:
  • index (str or list[str]) – Fields to group by.

  • columns (str) – Field whose values become column names.

  • values (str) – Field whose values fill the new columns.

Returns:

A new Flow with records pivoted into wide format.

Return type:

Flow
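To make the long-to-wide reshaping concrete, here is a plain-Python sketch of a pivot. It is illustrative only: `pivot_records` is a hypothetical helper, and behaviour such as filling missing columns or resolving duplicate (index, column) pairs is not specified in this reference.

```python
def pivot_records(records, index, columns, values):
    """Turn row values into columns, one output record per index combination."""
    index = [index] if isinstance(index, str) else list(index)
    wide = {}
    for rec in records:
        key = tuple(rec[f] for f in index)
        # Start each output row with its index fields, then add one column per row.
        row = wide.setdefault(key, dict(zip(index, key)))
        row[rec[columns]] = rec[values]
    return list(wide.values())


long_rows = [
    {"team": "A", "stat": "shots", "value": 10},
    {"team": "A", "stat": "goals", "value": 2},
    {"team": "B", "stat": "shots", "value": 7},
]
wide_rows = pivot_records(long_rows, index="team", columns="stat", values="value")
```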

plot_plan(compare: bool = False)[source]#

Visualize the flow plan.

Parameters:

compare (bool) –

  • True: show two subplots (raw vs. optimized).

  • False: show a single subplot. If this Flow was constructed with optimize=True, show the optimized plan; otherwise the raw.

profile(optimize: bool | None = None, fmt: Literal['table', 'records'] = 'table')[source]#

Profile each step in the plan. Returns a report of (step_index, op_name, time_s, rows_emitted).

Parameters:
  • optimize – whether to optimize the plan (default = self.optimize)

  • fmt – ‘table’ to print a table, ‘records’ to return the raw list of dicts.

query(expr: str)[source]#

Filter rows using a query string.

Parameters:

expr (str) – Query string

Returns:

A new Flow with the filtered records.

Return type:

Flow

Examples

# Basic comparisons
flow.query("age > 30 and name == 'Phil Foden'")

# Using variables with @
player = "Mohamed Salah"
flow.query("type.name == 'Shot' and player.name == @player")

# Date filtering
flow.query("date > datetime(2024, 1, 1)")
cutoff_date = datetime(2024, 6, 15)
flow.query("match_date >= @cutoff_date")

# String operations
flow.query("name.contains('son') and status == 'active'")

# Regular expression matching
flow.query("name.regex('^[A-Z][a-z]+$')")  # Names starting with capital letter
flow.query("name.match('\d{4}', 0)")  # Contains 4 digits in a row

rename(**mapping: str)[source]#

Rename keys in each record according to a mapping of old=new.

Parameters:

**mapping (dict[str, str]) – The mapping of old keys to new keys.

Returns:

A new Flow with renamed keys.

Return type:

Flow

sample_fraction(p: float, seed: int | None = None) Flow[source]#

Lazily sample a fraction of records.

Parameters:
  • p (float) – Sampling probability (0 < p < 1).

  • seed (int, optional) – Random seed.

Returns:

A new Flow with sampling applied.

Return type:

Flow

sample_n(n: int, seed: int | None = None) Flow[source]#

Lazily sample n records using reservoir sampling.

Parameters:
  • n (int) – Number of records to sample.

  • seed (int, optional) – Random seed.

Returns:

A new Flow with sampling applied.

Return type:

Flow
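Reservoir sampling draws a fixed-size uniform sample from a stream of unknown length in a single pass. The sketch below is the textbook Algorithm R in plain Python, shown only to explain the technique; it is not taken from penaltyblog, and `reservoir_sample` is a hypothetical name.

```python
import random


def reservoir_sample(records, n, seed=None):
    """Return up to n records chosen uniformly at random from an iterable."""
    rng = random.Random(seed)
    reservoir = []
    for i, record in enumerate(records):
        if i < n:
            # Fill the reservoir with the first n records.
            reservoir.append(record)
        else:
            # Replace a random slot with probability n / (i + 1).
            j = rng.randint(0, i)
            if j < n:
                reservoir[j] = record
    return reservoir


sample = reservoir_sample(iter(range(1000)), 10, seed=42)
```

Only n records are held in memory at any time, which is what allows this kind of sampling to work on a stream.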

schema(n=100) dict[str, type][source]#

Infer the schema of the flow.

Parameters:

n (int) – Number of records to sample for schema inference.

Returns:

The inferred schema.

Return type:

dict[str, type]

select(*fields: str) Flow[source]#

Select specific fields from each record.

Parameters:

*fields (str) – The fields to select.

Returns:

A new Flow with selected fields.

Return type:

Flow

show(n: int = 5, format: Literal['table', 'record'] = 'table')[source]#

Print the first n records in a pretty format.

Parameters:
  • n (int) – Number of records to show.

  • format (Literal["table", "record"]) – Format to use for display.

sort_by(*keys: str, ascending: bool | list[bool] = True) Flow[source]#

Sort records by one or more fields.

Parameters:
  • *keys (str) – Field names to sort by.

  • ascending (bool or list[bool], optional) – Sort order(s). Either a single bool applied to all keys or a list of bools (one per key).

Returns:

A new Flow with sorted records.

Return type:

Flow
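A multi-key sort with a separate direction per key can be built from repeated stable sorts, applied from the last key to the first. The following plain-Python sketch illustrates the idea; `sort_records` is a hypothetical helper and not the library's implementation.

```python
def sort_records(records, *keys, ascending=True):
    """Sort by several fields, with one direction flag per key."""
    if isinstance(ascending, bool):
        flags = [ascending] * len(keys)
    else:
        flags = list(ascending)
    if len(flags) != len(keys):
        raise ValueError("ascending must be a bool or have one entry per key")
    result = list(records)
    # Python's sort is stable, so sorting by the least significant key first
    # and the most significant key last gives the combined ordering.
    for key, asc in reversed(list(zip(keys, flags))):
        result.sort(key=lambda rec, k=key: rec[k], reverse=not asc)
    return result


table = [
    {"team": "A", "goals": 1},
    {"team": "B", "goals": 3},
    {"team": "A", "goals": 2},
]
ordered = sort_records(table, "team", "goals", ascending=[True, False])
```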

split_array(field: str, into: list[str]) Flow[source]#

Split an array field into separate fields by index.

Parameters:
  • field (str) – The field containing a list/array.

  • into (list[str]) – The names of the output fields.

Returns:

A new Flow with the array split into separate fields.

Return type:

Flow
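Splitting an array field by index is handy for coordinate pairs such as [x, y]. The plain-Python sketch below shows the idea under two assumptions that this reference does not confirm: the original field is kept, and names without a matching element are set to None. `split_array_field` is a hypothetical helper.

```python
def split_array_field(record, field, into):
    """Copy elements of a list field into separately named fields by index."""
    values = record.get(field) or []
    out = dict(record)
    for i, name in enumerate(into):
        # Assumption: pad with None when the list is shorter than `into`.
        out[name] = values[i] if i < len(values) else None
    return out


shot = {"id": 1, "location": [60.0, 40.0]}
split_shot = split_array_field(shot, "location", ["x", "y"])
```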

statsbomb = <penaltyblog.matchflow.contrib.statsbomb.StatsBomb object>#
summary(aggregators: Callable | dict[str, Any]) Flow[source]#

Supports:
  • Callable (e.g. lambda rows: {…})

  • Dict of {alias: callable}

  • Dict of {alias: "name"} or (name/callable, field)

to_json(path: str, indent=4, storage_options: Dict[str, Any] | None = None)[source]#

Write the flow to a JSON file (as a list of records).

Parameters:
  • path (str) – The path to the JSON file.

  • indent (int, optional) – The number of spaces to use for indentation.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

to_jsonl(path: str, storage_options: Dict[str, Any] | None = None)[source]#

Write the flow to a JSONL file (one record per line).

Parameters:
  • path (str) – The path to the JSONL file.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.
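The difference between the two output layouts (a single JSON list versus one record per line) can be seen with the standard library alone. This sketch uses only the `json` and `io` modules and does not call penaltyblog.

```python
import io
import json

records = [{"id": 1, "type": "Pass"}, {"id": 2, "type": "Shot"}]

# JSON: the whole flow serialized as one list of records.
as_json = json.dumps(records, indent=4)

# JSONL: one compact JSON object per line.
as_jsonl = "".join(json.dumps(rec) + "\n" for rec in records)

# JSONL can be read back line by line without loading the whole file.
restored = [json.loads(line) for line in io.StringIO(as_jsonl)]
```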

to_pandas() DataFrame[source]#

Collect the flow into a pandas DataFrame.

Returns:

The collected records as a DataFrame.

Return type:

pd.DataFrame

with_schema(schema: dict[str, type | Callable[[Any], Any]], strict: bool = False, drop_extra: bool = False) Flow[source]#

Cast fields to specified types/functions and optionally validate or prune fields.

Parameters:
  • schema (dict) – Mapping of field names (dot paths allowed) to types or casting functions.

  • strict (bool) – If True, raise an error on cast failure. Otherwise, fallback to original value.

  • drop_extra (bool) – If True, only retain fields explicitly listed in the schema.

Returns:

A new Flow with schema enforcement applied.

Return type:

Flow
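The interaction of strict and drop_extra is easiest to see on a single record. The plain-Python sketch below mirrors the parameter descriptions above for top-level fields only; `apply_schema` is a hypothetical helper, not penaltyblog's code, and the exception types the library raises in strict mode are not stated in this reference.

```python
def apply_schema(record, schema, strict=False, drop_extra=False):
    """Cast the fields named in schema; optionally drop everything else."""
    out = {} if drop_extra else dict(record)
    for field, caster in schema.items():
        if field not in record:
            continue
        try:
            out[field] = caster(record[field])
        except (TypeError, ValueError):
            if strict:
                raise
            # Non-strict mode: fall back to the original value.
            out[field] = record[field]
    return out


raw = {"id": "7", "xg": "n/a", "note": "header"}
schema = {"id": int, "xg": float}

lenient = apply_schema(raw, schema)
pruned = apply_schema(raw, schema, drop_extra=True)
```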

penaltyblog.matchflow.and_(*preds)[source]#
penaltyblog.matchflow.not_(pred)[source]#
penaltyblog.matchflow.or_(*preds)[source]#
penaltyblog.matchflow.where_contains(field: str, substring: str)[source]#
penaltyblog.matchflow.where_equals(field: str, value: Any)[source]#
penaltyblog.matchflow.where_exists(field: str)[source]#
penaltyblog.matchflow.where_gt(field: str, threshold)[source]#
penaltyblog.matchflow.where_gte(field: str, threshold)[source]#
penaltyblog.matchflow.where_in(field: str, values: list)[source]#
penaltyblog.matchflow.where_is_null(field: str)[source]#
penaltyblog.matchflow.where_lt(field: str, threshold)[source]#
penaltyblog.matchflow.where_lte(field: str, threshold)[source]#
penaltyblog.matchflow.where_not_equals(field: str, value: Any)[source]#
penaltyblog.matchflow.where_not_in(field: str, values: list)[source]#

StatsBomb integration for Flow. Provides lazy-loading access to StatsBomb API via plan-based execution.

class penaltyblog.matchflow.contrib.statsbomb.StatsBomb[source]#

Bases: object

property DEFAULT_CREDS: dict#

Get default credentials from environment variables.

competitions(optimize: bool = False) Flow[source]#

Return a Flow for competitions from StatsBomb.

Parameters:

optimize (bool, optional) – Whether to optimize the plan (default: False).

Returns:

Flow object for competitions resource.

Return type:

Flow

events(match_id: int, include_360_metrics=False, creds: dict | None = None, optimize: bool = False) Flow[source]#

Return a Flow for events from StatsBomb.

Parameters:
  • match_id (int) – Match ID.

  • include_360_metrics (bool, optional) – Whether to include 360 metrics (default: False).

  • creds (dict, optional) – Credentials for StatsBomb API. Defaults to StatsBomb.DEFAULT_CREDS.

  • optimize (bool, optional) – Whether to optimize the plan (default: False).

Returns:

Flow object for events resource.

Return type:

Flow

lineups(match_id: int, creds: dict | None = None, optimize: bool = False) Flow[source]#

Return a Flow for player lineups for a given match.

Parameters:
  • match_id (int) – The StatsBomb match identifier for which to fetch lineups.

  • creds (dict, optional) – Optional credentials to use for the StatsBomb API. If omitted, StatsBomb.DEFAULT_CREDS will be used.

  • optimize (bool, optional) – If True, the returned Flow will have optimization enabled.

Returns:

A Flow that will fetch the lineups when executed.

Return type:

Flow

matches(competition_id: int, season_id: int, creds: dict | None = None, optimize: bool = False) Flow[source]#

Return a Flow for matches from StatsBomb.

Parameters:
  • competition_id (int) – Competition ID.

  • season_id (int) – Season ID.

  • creds (dict, optional) – Credentials for StatsBomb API. Defaults to StatsBomb.DEFAULT_CREDS.

  • optimize (bool, optional) – Whether to optimize the plan (default: False).

Returns:

Flow object for matches resource.

Return type:

Flow

player_match_stats(match_id: int, creds: dict | None = None, optimize: bool = False) Flow[source]#

Return a Flow for per-player statistics for a single match.

Parameters:
  • match_id (int) – The StatsBomb match identifier.

  • creds (dict, optional) – Credentials to authenticate with the StatsBomb API. Defaults to StatsBomb.DEFAULT_CREDS when not provided.

  • optimize (bool, optional) – If True, the returned Flow will be optimized.

Returns:

A Flow that will fetch player match statistics when executed.

Return type:

Flow

player_season_stats(competition_id: int, season_id: int, creds: dict | None = None, optimize: bool = False) Flow[source]#

Return a Flow for per-player aggregated statistics for a season.

Parameters:
  • competition_id (int) – StatsBomb competition identifier.

  • season_id (int) – StatsBomb season identifier.

  • creds (dict, optional) – Optional credentials for the StatsBomb API. Defaults to StatsBomb.DEFAULT_CREDS when omitted.

  • optimize (bool, optional) – If True, create the Flow with optimization enabled.

Returns:

A Flow that will fetch player season statistics when executed.

Return type:

Flow

team_match_stats(match_id: int, creds: dict | None = None, optimize: bool = False) Flow[source]#

Return a Flow for team-level statistics for a single match.

Parameters:
  • match_id (int) – The StatsBomb match identifier.

  • creds (dict, optional) – Credentials for StatsBomb; falls back to StatsBomb.DEFAULT_CREDS if not provided.

  • optimize (bool, optional) – Whether to optimize the constructed Flow.

Returns:

A Flow that will fetch team match statistics when executed.

Return type:

Flow

team_season_stats(competition_id: int, season_id: int, creds: dict | None = None, optimize: bool = False) Flow[source]#

Return a Flow for team-level aggregated statistics for a season.

Parameters:
  • competition_id (int) – StatsBomb competition identifier.

  • season_id (int) – StatsBomb season identifier.

  • creds (dict, optional) – Optional credentials to use with the StatsBomb API; defaults to StatsBomb.DEFAULT_CREDS when omitted.

  • optimize (bool, optional) – Whether to enable optimizations for the created Flow.

Returns:

A Flow that will fetch team season statistics when executed.

Return type:

Flow

MatchFlow API Documentation#

Flow class for handling a streaming data pipeline.

class penaltyblog.matchflow.flow.Flow(plan: List[Dict[str, Any]] | None = None, optimize: bool = False)[source]#

Bases: object

assign(**fields: Callable[[dict], Any]) Flow[source]#

Assign new fields to each record.

Parameters:

**fields (dict[str, Callable[[dict], Any]]) – The fields to assign.

Returns:

A new Flow with assigned fields.

Return type:

Flow

cache() Flow[source]#

Cache the records in memory.

Returns:

A new Flow with the records cached.

Return type:

Flow

cast(**casts: type | Callable[[Any], Any]) Flow[source]#

Cast fields to specified types or functions.

Parameters:

**casts (type or Callable[[Any], Any]) – The types or functions to cast to.

Returns:

A new Flow with the fields cast.

Return type:

Flow

collect(optimize: bool | None = None, progress: Literal['output', 'input'] | None = None, total_records: int | None = None) list[source]#

Collect all records from the flow.

Parameters:
  • optimize (bool) – Whether to optimize the plan before execution.

  • progress (Optional[Literal["output", "input"]]) – Whether to show progress.

  • total_records (Optional[int]) – Total number of rows to expect.

Returns:

A list of records.

Return type:

list

concat(*others: Flow) Flow[source]#

Concatenate this flow with one or more other flows.

Parameters:

*others (Flow) – One or more flows to concatenate.

Returns:

A new Flow representing the concatenated sequence.

Return type:

Flow

count() int[source]#

Count the number of records in the flow.

Returns:

The number of records.

Return type:

int

distinct(*keys: str, keep: str = 'first') Flow[source]#

Remove duplicate records.

Parameters:
  • *keys (str) – Optional field names to determine uniqueness.

  • keep (str) – ‘first’ (default) or ‘last’ to control which duplicate is retained.

Returns:

A new Flow with duplicates removed.

Return type:

Flow

drop(*keys: str) Flow[source]#

Drop one or more fields from each record. Supports dot notation for nested fields.

Parameters:

*keys (str) – Field names to remove.

Returns:

A new Flow with fields removed.

Return type:

Flow

dropna(*fields: str) Flow[source]#

Drop records where any of the specified fields are None or missing. If no fields are given, drops records where any top-level value is None.

Parameters:

*fields (str) – Optional field paths (dot notation) to check for None.

Returns:

A new Flow with records containing nulls removed.

Return type:

Flow

explain(optimize: bool | None = None, compare: bool = False)[source]#

Print a readable version of the plan.

Parameters:
  • optimize (bool) – Whether to show the optimized plan (default True).

  • compare (bool) – If True, show both pre- and post-optimization plans.

explode(*fields: str) Flow[source]#

Explode one or more list fields into multiple records (in sync). All fields must be lists of the same length in each record.

Parameters:

*fields (str) – One or more field names (dot notation allowed).

Returns:

A new Flow with records exploded along the given fields.

Return type:

Flow

filter(*predicates: Callable[[dict], bool]) Flow[source]#
flatten() Flow[source]#

Flatten nested dictionaries into a single-level dictionary using dot notation.

Returns:

A new Flow with flattened records.

Return type:

Flow

static from_folder(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#

Create a Flow from a folder of records.

Parameters:
  • path (str) – The path to the folder containing the records.

  • optimize (bool) – Whether to optimize the flow.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow pointing to the records.

Return type:

Flow

static from_glob(pattern: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#

Create a Flow from a glob pattern.

Parameters:
  • pattern (str) – Glob pattern (e.g., "data/**/*.json").

  • optimize (bool) – Whether to optimize the flow.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow streaming matching files.

Return type:

Flow

static from_json(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#

Lazily load a list of records from a JSON file.

Parameters:
  • path (str) – The path to the JSON file.

  • optimize (bool) – Whether to optimize the flow.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow pointing to the records.

Return type:

Flow

static from_jsonl(path: str, optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#

Lazily load records from a JSONL file.

Parameters:
  • path (str) – The path to the JSONL file.

  • optimize (bool) – Whether to optimize the flow.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow pointing to the records.

Return type:

Flow

static from_list(records: List[dict], optimize: bool = False, storage_options: Dict[str, Any] | None = None) Flow[source]#

Create a Flow from a list of records.

Parameters:
  • records (List[dict]) – The list of records to create a Flow from.

  • optimize (bool) – Whether to optimize the flow.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow with the records.

Return type:

Flow

static from_records(records: List[dict], optimize: bool = False) Flow[source]#

Create a Flow from a list of records.

Parameters:

records (List[dict]) – The list of records to create a Flow from.

Returns:

A new Flow with the records.

Return type:

Flow

get_url() str[source]#

For a Flow created from an API source (e.g. Opta), construct and return the URL that will be called. This is a debugging utility and may not work for all flows.

group_by(*keys: str) FlowGroup[source]#

Group records by one or more fields.

Parameters:

*keys (str) – Field names to group by.

Returns:

A new FlowGroup with grouped records.

Return type:

FlowGroup

grouped(key: str)[source]#

Group records by a single field.

Parameters:

key (str) – The field to group by.

Returns:

The groups.

Return type:

Iterator[Tuple[Any, List[Dict[str, Any]]]]

head(n=5) list[source]#

Runs the flow and returns the first n records.

Parameters:

n (int) – The number of records to return.

Returns:

A list of the first n records.

Return type:

list

is_empty() bool[source]#

Check if the flow yields any records without fully collecting it.

Returns:

True if there are no records, False otherwise.

Return type:

bool

join(other: Flow, on: str | List[str] | None = None, left_on: str | List[str] | None = None, right_on: str | List[str] | None = None, how: Literal['left', 'right', 'outer', 'inner', 'anti'] = 'left', lsuffix: str = '', rsuffix: str = '_right', type_coercion: Literal['strict', 'auto', 'string'] = 'strict') Flow[source]#

Join with another Flow.

Parameters:
  • other (Flow) – The right-hand flow to join with.

  • on (str, list[str], or None) – Field(s) to join on when key names are the same.

  • left_on (str, list[str], or None) – Left-side field(s) to join on.

  • right_on (str, list[str], or None) – Right-side field(s) to join on.

  • how (str) – Type of join - ‘left’, ‘right’, ‘outer’, ‘inner’, or ‘anti’.

  • lsuffix (str) – Suffix for conflicting left-side keys.

  • rsuffix (str) – Suffix for conflicting right-side keys.

  • type_coercion (str) – How to handle type differences in join keys:

      - ‘strict’: Exact type matching (default, preserves current behavior).

      - ‘auto’: Smart coercion (1 matches ‘1’ matches 1.0).

      - ‘string’: Convert all join keys to strings for comparison.

Returns:

A new Flow representing the join.

Return type:

Flow

keys(limit: int = 100) set[str][source]#

Infer the set of keys present in the flow by sampling records.

Parameters:

limit (int) – Number of records to sample when inferring keys.

Returns:

The set of keys.

Return type:

set[str]

limit(n: int) Flow[source]#

Limit the number of records returned.

Parameters:

n (int) – The maximum number of records.

Returns:

A new Flow that yields up to n records.

Return type:

Flow

map(func: Callable[[dict], dict]) Flow[source]#

Apply a function to each record. Should return a full record. If the function returns None, the record is dropped.

Parameters:

func (Callable[[dict], dict]) – A function that takes a record and returns a modified one.

Returns:

A new Flow with the transformed records.

Return type:

Flow
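The rule that a None return value drops the record means map can transform and filter in one step. A plain-Python sketch of that rule follows; `map_records` and `keep_shots` are hypothetical names, and this is not the library's implementation.

```python
def map_records(records, func):
    """Apply func to each record, skipping records for which it returns None."""
    for rec in records:
        result = func(rec)
        if result is not None:
            yield result


def keep_shots(rec):
    """Return a full, modified record for shots and None for everything else."""
    if rec["type"] != "Shot":
        return None
    return {**rec, "is_shot": True}


events = [{"type": "Pass"}, {"type": "Shot"}]
shots_only = list(map_records(events, keep_shots))
```

Note that the function returns a complete record rather than only the changed fields, in line with "Should return a full record" above.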

opta = <penaltyblog.matchflow.contrib.opta.Opta object>#
pipe(func: Callable[[Flow], Flow]) Flow[source]#

Lazily apply a function to this Flow and return the resulting Flow. The function will be executed at collect-time, not immediately.

The function should return a new Flow, typically using this one as input.

pivot(index: str | list[str], columns: str, values: str) Flow[source]#

Pivot records: turn row values into columns.

Parameters:
  • index (str or list[str]) – Fields to group by.

  • columns (str) – Field whose values become column names.

  • values (str) – Field whose values fill the new columns.

Returns:

A new Flow with records pivoted into wide format.

Return type:

Flow

plot_plan(compare: bool = False)[source]#

Visualize the flow plan.

Parameters:

compare (bool) –

  • True: show two subplots (raw vs. optimized).

  • False: show a single subplot. If this Flow was constructed with optimize=True, show the optimized plan; otherwise the raw.

profile(optimize: bool | None = None, fmt: Literal['table', 'records'] = 'table')[source]#

Profile each step in the plan. Returns a report of (step_index, op_name, time_s, rows_emitted).

Parameters:
  • optimize – whether to optimize the plan (default = self.optimize)

  • fmt – ‘table’ to print a table, ‘records’ to return the raw list of dicts.

query(expr: str)[source]#

Filter rows using a query string.

Parameters:

expr (str) – Query string

Returns:

A new Flow with the filtered records.

Return type:

Flow

Examples

# Basic comparisons
flow.query("age > 30 and name == 'Phil Foden'")

# Using variables with @
player = "Mohamed Salah"
flow.query("type.name == 'Shot' and player.name == @player")

# Date filtering
flow.query("date > datetime(2024, 1, 1)")
cutoff_date = datetime(2024, 6, 15)
flow.query("match_date >= @cutoff_date")

# String operations
flow.query("name.contains('son') and status == 'active'")

# Regular expression matching
flow.query("name.regex('^[A-Z][a-z]+$')")  # Names starting with capital letter
flow.query("name.match('\d{4}', 0)")  # Contains 4 digits in a row

rename(**mapping: str)[source]#

Rename keys in each record according to a mapping of old=new.

Parameters:

**mapping (dict[str, str]) – The mapping of old keys to new keys.

Returns:

A new Flow with renamed keys.

Return type:

Flow

sample_fraction(p: float, seed: int | None = None) Flow[source]#

Lazily sample a fraction of records.

Parameters:
  • p (float) – Sampling probability (0 < p < 1).

  • seed (int, optional) – Random seed.

Returns:

A new Flow with sampling applied.

Return type:

Flow

sample_n(n: int, seed: int | None = None) Flow[source]#

Lazily sample n records using reservoir sampling.

Parameters:
  • n (int) – Number of records to sample.

  • seed (int, optional) – Random seed.

Returns:

A new Flow with sampling applied.

Return type:

Flow

schema(n=100) dict[str, type][source]#

Infer the schema of the flow.

Parameters:

n (int) – Number of records to sample for schema inference.

Returns:

The inferred schema.

Return type:

dict[str, type]

select(*fields: str) Flow[source]#

Select specific fields from each record.

Parameters:

*fields (str) – The fields to select.

Returns:

A new Flow with selected fields.

Return type:

Flow

show(n: int = 5, format: Literal['table', 'record'] = 'table')[source]#

Print the first n records in a pretty format.

Parameters:
  • n (int) – Number of records to show.

  • format (Literal["table", "record"]) – Format to use for display.

sort_by(*keys: str, ascending: bool | list[bool] = True) Flow[source]#

Sort records by one or more fields.

Parameters:
  • *keys (str) – Field names to sort by.

  • ascending (bool or list[bool], optional) – Sort order(s). Either a single bool applied to all keys or a list of bools (one per key).

Returns:

A new Flow with sorted records.

Return type:

Flow
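One common way to support a per-key `ascending` list is to sort repeatedly, from the least significant key to the most significant, relying on the stability of Python's sort. The sketch below shows that technique; it is not taken from the library, `sort_records` is a hypothetical name, and it assumes every record contains every sort key.

```python
from typing import Iterable, List, Sequence, Union


def sort_records(
    records: Iterable[dict],
    keys: Sequence[str],
    ascending: Union[bool, Sequence[bool]] = True,
) -> List[dict]:
    """Hypothetical multi-key sort with one direction flag per key."""
    flags = [ascending] * len(keys) if isinstance(ascending, bool) else list(ascending)
    if len(flags) != len(keys):
        raise ValueError("ascending needs one flag per key")
    ordered = list(records)
    # Sort by the last key first; stable sorting preserves earlier passes
    # as tie-breakers for later (more significant) keys.
    for key, asc in reversed(list(zip(keys, flags))):
        ordered.sort(key=lambda record, k=key: record[k], reverse=not asc)
    return ordered


players = [
    {"team": "Arsenal", "goals": 2},
    {"team": "Brighton", "goals": 5},
    {"team": "Arsenal", "goals": 7},
]
ranked = sort_records(players, ["team", "goals"], ascending=[True, False])
```

The result lists teams alphabetically and, within each team, players by goals in descending order.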

split_array(field: str, into: list[str]) Flow[source]#

Split an array field into separate fields by index.

Parameters:
  • field (str) – The field containing a list/array.

  • into (list[str]) – The names of the output fields.

Returns:

A new Flow with the array split into separate fields.

Return type:

Flow
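The idea of splitting by index can be sketched on a single record as follows. This is illustrative only: `split_array_field` is a hypothetical helper, it handles a top-level field rather than a dot path, and two behaviours are assumptions rather than documented facts (positions beyond the end of the array become None, and the original field is kept).

```python
from typing import List


def split_array_field(record: dict, field: str, into: List[str]) -> dict:
    """Hypothetical split: copy record and add one field per array position."""
    values = record.get(field) or []
    out = dict(record)
    for i, name in enumerate(into):
        # Assumption: missing positions are filled with None.
        out[name] = values[i] if i < len(values) else None
    return out


shot = {"player": "Saka", "location": [102.5, 38.0]}
split = split_array_field(shot, "location", ["x", "y"])
short = split_array_field({"location": [10.0]}, "location", ["x", "y"])
```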

statsbomb = <penaltyblog.matchflow.contrib.statsbomb.StatsBomb object>#
summary(aggregators: Callable | dict[str, Any]) Flow[source]#

Supports:
  • Callable (e.g. lambda rows: {…})

  • Dict of {alias: callable}

  • Dict of {alias: "name"} or (name/callable, field)

to_json(path: str, indent=4, storage_options: Dict[str, Any] | None = None)[source]#

Write the flow to a JSON file (as a list of records).

Parameters:
  • path (str) – The path to the JSON file.

  • indent (int, optional) – The number of spaces to use for indentation.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

to_jsonl(path: str, storage_options: Dict[str, Any] | None = None)[source]#

Write the flow to a JSONL file (one record per line).

Parameters:
  • path (str) – The path to the JSONL file.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

to_pandas() DataFrame[source]#

Collect the flow into a pandas DataFrame.

Returns:

The collected records as a DataFrame.

Return type:

pd.DataFrame

with_schema(schema: dict[str, type | Callable[[Any], Any]], strict: bool = False, drop_extra: bool = False) Flow[source]#

Cast fields to specified types/functions and optionally validate or prune fields.

Parameters:
  • schema (dict) – Mapping of field names (dot paths allowed) to types or casting functions.

  • strict (bool) – If True, raise an error on cast failure. Otherwise, fallback to original value.

  • drop_extra (bool) – If True, only retain fields explicitly listed in the schema.

Returns:

A new Flow with schema enforcement applied.

Return type:

Flow
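The interaction between `strict` and `drop_extra` can be sketched on one record. The function below is a hypothetical stand-in (`apply_schema` is not a library name) that follows the parameter descriptions above; it handles flat keys only, whereas the documented method also accepts dot paths.

```python
from typing import Any, Callable, Dict


def apply_schema(
    record: dict,
    schema: Dict[str, Callable[[Any], Any]],
    strict: bool = False,
    drop_extra: bool = False,
) -> dict:
    """Hypothetical schema step: cast listed fields, optionally prune others."""
    out = {} if drop_extra else dict(record)
    for field, caster in schema.items():
        if field not in record:
            continue
        try:
            out[field] = caster(record[field])
        except (TypeError, ValueError):
            if strict:
                raise  # strict mode surfaces the cast failure
            out[field] = record[field]  # lenient mode keeps the original value
    return out


raw = {"minute": "45", "xg": "n/a", "note": "header"}
schema = {"minute": int, "xg": float}
lenient = apply_schema(raw, schema)
pruned = apply_schema(raw, schema, drop_extra=True)
```

With `strict=True` the same call would raise `ValueError`, because `float("n/a")` cannot be parsed.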

class penaltyblog.matchflow.group.FlowGroup(plan: List[Dict[str, Any]], optimize: bool = False)[source]#

Bases: object

A FlowGroup is a collection of records grouped by one or more keys.

collect() List[Dict[str, Any]][source]#

Collect the FlowGroup into a list of records.

Returns:

The collected records.

Return type:

List[Dict[str, Any]]

cumulative(field: str, alias: str | None = None) Flow[source]#

Apply group-cumulative without eagerly optimizing previous steps.

Parameters:
  • field (str) – The field to accumulate (a cumulative sum is computed within each group).

  • alias (Optional[str], optional) – The alias for the cumulative field. Defaults to None.

Returns:

A new Flow with the group-cumulative applied.

Return type:

Flow
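A per-group running total can be sketched with one accumulator per group key. This is a simplified illustration, not the library's code: `group_cumulative` is a hypothetical name, it groups on a single key, and the fallback alias `cum_<field>` is an assumption made for the example.

```python
from collections import defaultdict
from typing import Iterable, Iterator, Optional


def group_cumulative(
    records: Iterable[dict], group_key: str, field: str, alias: Optional[str] = None
) -> Iterator[dict]:
    """Hypothetical running sum of `field`, tracked separately per group."""
    alias = alias or f"cum_{field}"  # assumed default alias, for illustration
    totals = defaultdict(int)
    for record in records:
        totals[record[group_key]] += record[field]
        yield {**record, alias: totals[record[group_key]]}


shots = [
    {"team": "Arsenal", "shots": 1},
    {"team": "Chelsea", "shots": 1},
    {"team": "Arsenal", "shots": 2},
]
running = list(group_cumulative(shots, "team", "shots", alias="shots_so_far"))
```

Each output record carries the total for its own team up to and including that record.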

explain(optimize: bool | None = None, compare: bool = False)[source]#

Explain the plan.

Returns:

None

plot_plan(compare: bool = False)[source]#

Visualize the flow group plan.

Parameters:

compare (bool) –

  • True: show two subplots (raw vs. optimized).

  • False: show a single subplot. If this FlowGroup was constructed with optimize=True, show the optimized plan; otherwise show the raw plan.

rolling_summary(window, aggregators, time_field=None, min_periods=1, step=None)[source]#

Lazily apply a rolling summary within each group in a FlowGroup.

This appends a ‘group_rolling_summary’ step to the plan and returns a new Flow.

Parameters:
  • window (int | str) – Number of records or time window (e.g., ‘5m’).

  • aggregators (dict) – Mapping of output field -> (agg_func_name, field).

  • time_field (str | None) – Field used for time-based windows.

  • min_periods (int) – Minimum rows needed to compute result.

  • step (int | None) – Step size for window movement.

Returns:

A new Flow with a group_rolling_summary step added.

Return type:

Flow
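For count-based windows, a rolling aggregate per group can be sketched with a bounded deque for each group. The example below computes a rolling mean only; it is a hedged illustration of the windowing idea, with `rolling_mean_by_group` and its `alias` parameter being hypothetical, and it ignores the time-based mode and the `step` parameter.

```python
from collections import defaultdict, deque
from typing import Iterable, Iterator


def rolling_mean_by_group(
    records: Iterable[dict],
    group_key: str,
    field: str,
    window: int,
    min_periods: int = 1,
    alias: str = "rolling_mean",
) -> Iterator[dict]:
    """Hypothetical count-mode rolling mean over the last `window` rows per group."""
    buffers = defaultdict(lambda: deque(maxlen=window))
    for record in records:
        buf = buffers[record[group_key]]
        buf.append(record[field])  # deque discards the oldest value when full
        value = sum(buf) / len(buf) if len(buf) >= min_periods else None
        yield {**record, alias: value}


matches = [
    {"team": "Arsenal", "xg": 1.0},
    {"team": "Arsenal", "xg": 2.0},
    {"team": "Arsenal", "xg": 3.0},
]
rolled = [r["rolling_mean"] for r in rolling_mean_by_group(matches, "team", "xg", window=2)]
strict = [
    r["rolling_mean"]
    for r in rolling_mean_by_group(matches, "team", "xg", window=2, min_periods=2)
]
```

With `min_periods=2` the first row has too few observations, so its rolling value is None in this sketch.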

select(*fields: str) FlowGroup[source]#

Select specific fields from each record.

Parameters:

*fields (str) – The fields to select.

Returns:

A new FlowGroup with selected fields.

Return type:

FlowGroup

sort_by(*keys: str, ascending: bool = True) FlowGroup[source]#

Sort the groups by one or more fields.

Parameters:
  • *keys (str) – Field names to sort by.

  • ascending (bool or list[bool], optional) – Sort order(s). Either a single bool applied to all keys or a list of bools (one per key).

Returns:

A new FlowGroup with sorted groups.

Return type:

FlowGroup

summary(aggregators: Callable | Dict[str, Any]) Flow[source]#

Apply group-summary without eagerly optimizing previous steps.

Parameters:

aggregators (Union[Callable, Dict[str, Any]]) – The aggregators to apply.

Returns:

A new Flow with the group-summary applied.

Return type:

Flow

time_bucket(freq: str, aggregators: dict[str, Any], time_field: str, label: Literal['left', 'right'] = 'left', bucket_name: str = 'bucket') Flow[source]#

Create fixed (non-overlapping) time buckets of size freq (e.g. “5m”) and aggregate each bucket.

Parameters:
  • freq (str) – e.g. “5m”, “10m”, “1h”, “30s”. Must end with ‘s’, ‘m’, ‘h’, or ‘d’.

  • aggregators (dict) – mapping output_field -> (agg_name_or_callable, input_field)

  • time_field (str) – dot-path to a field that is either a timedelta or a datetime in each record.

  • label (str) – “left” (default) → bucket labeled at the interval’s start; “right” → label at interval’s end.

  • bucket_name (str) – Name of the output field for the bucket label. Defaults to “bucket”.

Returns:

A new Flow whose records are one row per bucket per group, with the bucket label and aggregated values.

Return type:

Flow
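The left/right labelling of fixed buckets can be made concrete with timedelta arithmetic. The sketch below handles a single group, a timedelta time field, and a sum aggregate only; `bucket_label` and `time_bucket_sum` are hypothetical names, and the frequency is passed in seconds rather than parsed from a string such as "5m".

```python
from datetime import timedelta
from typing import Iterable, List


def bucket_label(t: timedelta, freq_seconds: float, label: str = "left") -> timedelta:
    """Return the start (left) or end (right) of the bucket containing t."""
    index = t.total_seconds() // freq_seconds
    start = timedelta(seconds=index * freq_seconds)
    return start if label == "left" else start + timedelta(seconds=freq_seconds)


def time_bucket_sum(
    records: Iterable[dict],
    time_field: str,
    value_field: str,
    freq_seconds: float,
    label: str = "left",
    bucket_name: str = "bucket",
) -> List[dict]:
    """Hypothetical aggregation: sum value_field inside each fixed bucket."""
    totals = {}
    for record in records:
        b = bucket_label(record[time_field], freq_seconds, label)
        totals[b] = totals.get(b, 0) + record[value_field]
    return [{bucket_name: b, value_field: total} for b, total in sorted(totals.items())]


passes = [
    {"t": timedelta(minutes=1), "passes": 3},
    {"t": timedelta(minutes=4), "passes": 2},
    {"t": timedelta(minutes=7), "passes": 5},
]
left = time_bucket_sum(passes, "t", "passes", freq_seconds=300)
right = time_bucket_sum(passes, "t", "passes", freq_seconds=300, label="right")
```

Both calls produce two five-minute buckets with totals of 5 each; only the label attached to each bucket differs.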

to_flow() Flow[source]#

Convert the FlowGroup to a Flow.

Returns:

A new Flow with the same plan and optimize flag.

Return type:

Flow

class penaltyblog.matchflow.executor.FlowExecutor(plan: List[Dict[str, Any]])[source]#

Bases: object

execute()[source]#
penaltyblog.matchflow.executor.is_materializing_op(op_name: str) bool[source]#

Returns True if the operation is expected to materialize or buffer records in memory (i.e., requires full dataset to proceed).

class penaltyblog.matchflow.optimizer.FlowOptimizer(plan)[source]#

Bases: object

Optimizer for a flow plan.

Performs conservative optimizations: it fuses simple operations, pushes down filters, limits, and select/drop operations only when provably safe, and eliminates redundant steps.

FIELD_USAGE_HANDLERS = {'assign': <function FlowOptimizer.<lambda>>, 'cast': <function FlowOptimizer.<lambda>>, 'drop': <function FlowOptimizer.<lambda>>, 'dropna': <function FlowOptimizer.<lambda>>, 'filter': <function FlowOptimizer.<lambda>>, 'group_by': <function FlowOptimizer.<lambda>>, 'group_rolling_summary': <function FlowOptimizer.<lambda>>, 'join': <function FlowOptimizer.<lambda>>, 'rename': <function FlowOptimizer.<lambda>>, 'select': <function FlowOptimizer.<lambda>>, 'sort': <function FlowOptimizer.<lambda>>}#
MAX_PASSES = 5#
optimize()[source]#

Helpers for handling a streaming data pipeline, specifically the Flow class.

penaltyblog.matchflow.helpers.get_field(path: str, default: Any | None = None) Callable[[dict[Any, Any] | None], Any][source]#

Safely access a nested field using dot notation.

Parameters:
  • path (str) – Dot-separated path to a field, e.g. “player.name”.

  • default (Any, optional) – Value to return if the path is invalid.

Returns:

A function that retrieves the field value or default.

Return type:

Callable[[dict], Any]

Example

>>> f = get_field("player.name")
>>> f({"player": {"name": "Bukayo Saka"}})  # → "Bukayo Saka"
penaltyblog.matchflow.helpers.get_index(path: str, index: int, default: Any | None = None) Callable[[dict[Any, Any] | None], Any][source]#

Safely access an index in a nested list using dot-separated path.

Parameters:
  • path (str) – Dot-separated path to a list field, e.g. “pass.end_location”.

  • index (int) – The index to extract from the list.

  • default (Any, optional) – Value to return if the path is invalid or index out of bounds.

Returns:

A function that retrieves the indexed value or default.

Return type:

Callable[[dict], Any]

Example

>>> f = get_index("pass.end_location", 0)
>>> f({"pass": {"end_location": [100, 40]}})  # → 100
penaltyblog.matchflow.helpers.resolve_path(record: dict, path: str, default=None)[source]#

Safely access a nested field using dot notation.

Parameters:
  • record (dict) – The record to resolve the path from.

  • path (str) – The path to resolve.

  • default (Any, optional) – Value to return if the path is invalid.

Returns:

The resolved value or default if not found.

Return type:

Any

Example

>>> resolve_path({"player": {"name": "Bukayo Saka"}}, "player.name")
"Bukayo Saka"
penaltyblog.matchflow.plotting.plot_flow_plan(plan, optimize=False, compare=False, title_prefix='')[source]#

Helper to visualize a plan (single or compare mode), optionally optimizing. optimizer_cls: class to use for optimization (must have .optimize()).

penaltyblog.matchflow.plotting.plot_plan(plan, ax, title='')[source]#
class penaltyblog.matchflow.predicates.AndPredicate(*preds: Predicate)[source]#

Bases: Predicate

class penaltyblog.matchflow.predicates.FieldPredicate(field: str, fn: Callable[[Any], bool])[source]#

Bases: Predicate

class penaltyblog.matchflow.predicates.NotPredicate(pred: Predicate)[source]#

Bases: Predicate

class penaltyblog.matchflow.predicates.OrPredicate(*preds: Predicate)[source]#

Bases: Predicate

class penaltyblog.matchflow.predicates.Predicate[source]#

Bases: object

penaltyblog.matchflow.predicates_helpers.and_(*preds)[source]#
penaltyblog.matchflow.predicates_helpers.not_(pred)[source]#
penaltyblog.matchflow.predicates_helpers.or_(*preds)[source]#
penaltyblog.matchflow.predicates_helpers.where_contains(field: str, substring: str)[source]#
penaltyblog.matchflow.predicates_helpers.where_endswith(field: str, suffix: str)[source]#
penaltyblog.matchflow.predicates_helpers.where_equals(field: str, value: Any)[source]#
penaltyblog.matchflow.predicates_helpers.where_exists(field: str)[source]#
penaltyblog.matchflow.predicates_helpers.where_gt(field: str, threshold)[source]#
penaltyblog.matchflow.predicates_helpers.where_gte(field: str, threshold)[source]#
penaltyblog.matchflow.predicates_helpers.where_in(field: str, values: list)[source]#
penaltyblog.matchflow.predicates_helpers.where_is_null(field: str)[source]#
penaltyblog.matchflow.predicates_helpers.where_lt(field: str, threshold)[source]#
penaltyblog.matchflow.predicates_helpers.where_lte(field: str, threshold)[source]#
penaltyblog.matchflow.predicates_helpers.where_not_equals(field: str, value: Any)[source]#
penaltyblog.matchflow.predicates_helpers.where_not_in(field: str, values: list)[source]#
penaltyblog.matchflow.predicates_helpers.where_regex_match(field: str, pattern: str, flags: int | RegexFlag = 0)[source]#

Create a predicate that tests if a field matches a regex pattern.

Parameters:
  • field (str) – The field to check.

  • pattern (str) – The regex pattern to match against.

  • flags (int or re.RegexFlag, optional) – Regex flags (e.g., re.IGNORECASE).

Returns:

A predicate that tests if the field matches the pattern.

Return type:

FieldPredicate
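The predicate helpers listed above follow a familiar pattern: small field-level checks that can be combined with and/or/not. The sketch below reproduces that pattern with plain closures so it can run without the library. All names (`field_check`, `regex_check`, `all_of`, `negate`) are hypothetical, and using `re.search` rather than `re.match` is an assumption, not a statement about how `where_regex_match` behaves.

```python
import re
from typing import Any, Callable

Check = Callable[[dict], bool]


def field_check(field: str, fn: Callable[[Any], bool]) -> Check:
    """Build a check that applies fn to one field, failing if it is absent."""
    return lambda record: field in record and fn(record[field])


def regex_check(field: str, pattern: str, flags: int = 0) -> Check:
    compiled = re.compile(pattern, flags)
    # Assumption: a match anywhere in the string counts (re.search semantics).
    return field_check(
        field, lambda v: isinstance(v, str) and compiled.search(v) is not None
    )


def all_of(*checks: Check) -> Check:
    return lambda record: all(check(record) for check in checks)


def negate(check: Check) -> Check:
    return lambda record: not check(record)


ends_with_son = regex_check("name", r"son$", re.IGNORECASE)
prolific = field_check("goals", lambda g: g > 10)
combined = all_of(ends_with_son, prolific)

player = {"name": "Heung-min Son", "goals": 17}
matched = combined(player)
rejected = negate(combined)(player)
```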

penaltyblog.matchflow.predicates_helpers.where_startswith(field: str, prefix: str)[source]#
penaltyblog.matchflow.steps.group.apply_group_by(records: Iterator[dict[str, Any]], step: dict[str, Any]) Iterator[dict[str, Any]][source]#

Group records by one or more fields.

Parameters:
  • records (Iterator[dict]) – Iterator of records to group.

  • step (dict) – Step configuration dict, must include ‘keys’.

Returns:

Iterator of group dicts, each with ‘__group_key__’ and ‘__group_records__’.

Return type:

Iterator[dict]
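The group-dict shape described in the return value can be illustrated as follows. This is a sketch under assumptions: `group_records` is a hypothetical name, and representing the group key as a tuple of key values is a guess made for the example rather than something the documentation states.

```python
from typing import Dict, Iterable, Iterator, List, Sequence


def group_records(records: Iterable[dict], keys: Sequence[str]) -> Iterator[dict]:
    """Hypothetical grouping step that buffers records by key, then emits groups."""
    groups: Dict[tuple, List[dict]] = {}
    for record in records:
        key = tuple(record.get(k) for k in keys)  # assumed key representation
        groups.setdefault(key, []).append(record)
    for key, members in groups.items():
        yield {"__group_key__": key, "__group_records__": members}


events = [
    {"team": "Arsenal", "type": "Shot"},
    {"team": "Chelsea", "type": "Pass"},
    {"team": "Arsenal", "type": "Pass"},
]
grouped = list(group_records(events, ["team"]))
```

Note that the whole input must be consumed before the first group is emitted, which is why grouping is a buffering operation.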

penaltyblog.matchflow.steps.group.apply_group_cumulative(records: Iterator[dict[str, Any]], step: dict[str, Any]) Iterator[dict[str, Any]][source]#

Apply a cumulative sum to a field for each group of records.

Parameters:
  • records (Iterator[dict]) – Iterator of group dicts, each with ‘__group_key__’ and ‘__group_records__’.

  • step (dict) – Step configuration dict, must include ‘field’ and ‘alias’.

Returns:

Iterator of records with cumulative field attached.

Return type:

Iterator[dict]

penaltyblog.matchflow.steps.group.apply_group_rolling_summary(records: Iterator[dict[str, Any]], step: dict[str, Any]) Iterator[dict[str, Any]][source]#

Lazily apply a rolling summary within each group.

Two modes:
  • Count mode: window is int → last N rows

  • Time mode: window is str ending in s/m/h/d → last T seconds

In time mode, time_field must be datetime or timedelta.

Parameters:
  • records (Iterator[dict]) – Iterator of records, each a dict.

  • step (dict) – Step configuration dict, must include ‘window’, ‘aggregators’, and optionally ‘time_field’, ‘min_periods’, ‘step’, ‘__group_keys’.

Returns:

Iterator of records with rolling summary fields attached.

Return type:

Iterator[dict]

penaltyblog.matchflow.steps.group.apply_group_summary(records: Iterator[dict[str, Any]], step: dict[str, Any]) Iterator[dict[str, Any]][source]#

Apply a summary function to each group of records.

Parameters:
  • records (Iterator[dict]) – Iterator of group dicts, each with ‘__group_key__’ and ‘__group_records__’.

  • step (dict) – Step configuration dict, must include ‘agg’ and optionally ‘group_keys’.

Returns:

Iterator of summary dicts for each group.

Return type:

Iterator[dict]

penaltyblog.matchflow.steps.group.apply_group_time_bucket(records: Iterator[dict[str, Any]], step: dict[str, Any]) Iterator[dict[str, Any]][source]#

Assign each record in a group to a fixed, non-overlapping time bin.

Two modes:
  • String freq with suffix (e.g. ‘5m’, ‘1h’): requires datetime or timedelta time_field.

  • Numeric freq (int/float): buckets numeric values directly.

Parameters:
  • records (Iterator[dict]) – Iterator of group dicts, each with ‘__group_key__’ and ‘__group_records__’.

  • step (dict) – Step configuration dict, must include ‘freq’, ‘aggregators’, ‘time_field’, and optionally ‘label’, ‘bucket_name’, ‘__group_keys’.

Returns:

Iterator of records with bucket assignments and aggregated fields.

Return type:

Iterator[dict]

penaltyblog.matchflow.steps.group.get_time_window_details(window: int | float | str, time_field: str | None) Tuple[bool, int | None, float | None, datetime | None, bool][source]#

Determine the mode (count or time) and parse the window size.

Parameters:
  • window (int, float, or str) – Window size as integer/float (row count) or string (e.g. ‘5m’, ‘1h’).

  • time_field (str or None) – Name of the time field, required for time-based windows.

Returns:

(count_mode: bool, count_window: Optional[int], time_window_seconds: Optional[float], origin: Optional[datetime], is_datetime: bool)

Return type:

tuple

penaltyblog.matchflow.steps.group.parse_window_size(window_str: str) float[source]#

Parse a window size string like ‘5m’, ‘10m’, ‘1h’, ‘30s’, ‘1d’ to seconds (float).

Parameters:

window_str (str) – Window size string, must end with ‘s’, ‘m’, ‘h’, or ‘d’.

Returns:

Window size in seconds.

Return type:

float

Raises:

ValueError – If the string cannot be parsed or has an unrecognized unit.
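The parsing rule above (a number followed by one of four unit suffixes) can be sketched in a few lines. This is an independent illustration, not the library's source; `parse_window` and `_UNIT_SECONDS` are hypothetical names.

```python
_UNIT_SECONDS = {"s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0}


def parse_window(window_str: str) -> float:
    """Hypothetical parser: convert strings such as '5m' to seconds."""
    text = window_str.strip()
    unit = text[-1:]  # empty string if the input is empty
    if unit not in _UNIT_SECONDS:
        raise ValueError(f"unrecognized unit in window size: {window_str!r}")
    try:
        amount = float(text[:-1])
    except ValueError:
        raise ValueError(f"cannot parse window size: {window_str!r}") from None
    return amount * _UNIT_SECONDS[unit]


five_minutes = parse_window("5m")
one_day = parse_window("1d")
```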

penaltyblog.matchflow.steps.source.dispatch(step) Iterator[Dict[Any, Any]][source]#
penaltyblog.matchflow.steps.source.from_concat(step) Iterator[Dict[Any, Any]][source]#

Create a Flow from a list of plans.

Parameters:

step (dict) – A dictionary containing the plans to concatenate.

Returns:

An iterator over the records produced by the concatenated plans.

Return type:

Iterator[dict]

penaltyblog.matchflow.steps.source.from_folder(step) Iterator[Dict[Any, Any]][source]#

Create a Flow from a folder of JSON or JSONL files.

Parameters:

step (dict) –

A dictionary containing:
  • path (str) – The path to the folder containing the records.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow streaming matching files.

Return type:

Iterator[dict]

penaltyblog.matchflow.steps.source.from_glob(step) Iterator[Dict[Any, Any]][source]#

Create a Flow from a glob pattern.

Parameters:

step (dict) –

A dictionary containing:
  • pattern (str) – Glob pattern (e.g., "data/**/*.json").

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow streaming matching files.

Return type:

Iterator[dict]

penaltyblog.matchflow.steps.source.from_json(step) Iterator[Dict[Any, Any]][source]#

Create a Flow from a JSON file.

Parameters:

step (dict) –

A dictionary containing:
  • path (str) – The path to the JSON file.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow streaming matching files.

Return type:

Iterator[dict]

penaltyblog.matchflow.steps.source.from_jsonl(step) Iterator[Dict[Any, Any]][source]#

Create a Flow from a JSONL file.

Parameters:

step (dict) –

A dictionary containing:
  • path (str) – The path to the JSONL file.

  • storage_options (dict, optional) – Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"}. For GCS: {"token": "path/to/token.json"}. For Azure: {"account_name": "name", "account_key": "key"}.

Returns:

A new Flow streaming matching files.

Return type:

Iterator[dict]

penaltyblog.matchflow.steps.source.json_load(f)[source]#
penaltyblog.matchflow.steps.source.json_loads(b)[source]#
penaltyblog.matchflow.steps.transform.apply_assign(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Assign new fields to each record.

Parameters:
  • records – A stream of records to assign fields to.

  • step – Configuration dict containing the fields to assign.

Returns:

A stream of records with assigned fields.

penaltyblog.matchflow.steps.transform.apply_distinct(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#
penaltyblog.matchflow.steps.transform.apply_drop(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Drop specified fields from each record.

Parameters:
  • records – A stream of records to drop fields from.

  • step – Configuration dict containing the keys to drop.

Returns:

A stream of records with dropped fields.

penaltyblog.matchflow.steps.transform.apply_dropna(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Drop records with missing values.

Parameters:
  • records – A stream of records to drop missing values from.

  • step – Configuration dict containing the fields to drop missing values from.

Returns:

A stream of records with dropped records.

penaltyblog.matchflow.steps.transform.apply_explode(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Explode records based on a list of fields.

Parameters:
  • records – A stream of records to explode.

  • step – Configuration dict containing the fields to explode.

Returns:

A stream of exploded records.

penaltyblog.matchflow.steps.transform.apply_filter(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Filter records based on a predicate.

Parameters:
  • records – A stream of records to filter.

  • step – Configuration dict containing the predicate to apply.

Returns:

A stream of filtered records.

penaltyblog.matchflow.steps.transform.apply_flatten(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Flatten nested dictionaries into a single-level dictionary using dot notation.

Parameters:
  • records – A stream of records to flatten.

  • step – Configuration dict containing the keys to flatten.

Returns:

A stream of flattened records.

penaltyblog.matchflow.steps.transform.apply_fused(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Apply a fused sequence of map/assign/filter operations.

Parameters:
  • records – A stream of records to apply fused operations to.

  • step – Configuration dict with an ‘ops’ list and potentially embedded steps.

Returns:

A stream of records with the fused operations applied.

penaltyblog.matchflow.steps.transform.apply_join(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Join records based on a list of keys. Dispatcher function that selects the appropriate join strategy.

Parameters:
  • records – A stream of records to join.

  • step – Configuration dict containing the keys to join by.

Returns:

A stream of joined records.

penaltyblog.matchflow.steps.transform.apply_limit(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Limit the number of records.

Parameters:
  • records – A stream of records to limit.

  • step – Configuration dict containing the count to limit by.

Returns:

A stream of limited records.

penaltyblog.matchflow.steps.transform.apply_map(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Apply a function to each record.

Parameters:
  • records – A stream of records to apply the function to.

  • step – Configuration dict containing the function to apply.

Returns:

A stream of mapped records.

penaltyblog.matchflow.steps.transform.apply_pivot(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Pivot records based on a list of index fields.

Parameters:
  • records – A stream of records to pivot.

  • step – Configuration dict containing the index fields to pivot by.

Returns:

A stream of pivoted records.

penaltyblog.matchflow.steps.transform.apply_rename(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Rename fields in each record.

Parameters:
  • records – A stream of records to rename fields in.

  • step – Configuration dict containing the mapping of old to new field names.

Returns:

A stream of records with renamed fields.

penaltyblog.matchflow.steps.transform.apply_sample_fraction(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Sample a fraction of the records.

Parameters:
  • records – A stream of records to sample a fraction from.

  • step – Configuration dict containing the fraction to sample.

Returns:

A stream of sampled records.

penaltyblog.matchflow.steps.transform.apply_sample_n(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Sample a fixed number of records.

Parameters:
  • records – A stream of records to sample from.

  • step – Configuration dict with ‘n’ and optional ‘seed’.

Returns:

A stream of sampled records.

penaltyblog.matchflow.steps.transform.apply_select(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Select specific fields from each record.

Parameters:
  • records – A stream of records to select fields from.

  • step – Configuration dict containing the fields to select.

Returns:

A stream of records with selected fields.

penaltyblog.matchflow.steps.transform.apply_sort(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Sort records based on a list of keys.

Parameters:
  • records – A stream of records to sort.

  • step – Configuration dict containing the keys to sort by.

Returns:

A stream of sorted records.

penaltyblog.matchflow.steps.transform.apply_split_array(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Split an array field into separate fields by index.

Parameters:
  • records – A stream of records to split arrays from.

  • step – Configuration dict containing the field to split.

Returns:

A stream of split records.

penaltyblog.matchflow.steps.transform.apply_summary(records: Iterator[Dict[str, Any]], step: dict) Iterator[Dict[str, Any]][source]#

Apply a summary function to the records.

Parameters:
  • records – A stream of records to apply the summary function to.

  • step – Configuration dict containing the summary function to apply.

Returns:

A stream of summary results.

penaltyblog.matchflow.steps.utils.fast_get_field(record: dict, parts: list[str]) Any[source]#

Retrieve a nested value from a mapping or list using a pre-split path.

This is a slightly optimized helper for looking up nested values when the caller has already split a dotted path into parts. Each element of parts is used to traverse dictionaries by key. If the current value is a list and the part is a numeric string it will be treated as an integer index. If any step cannot be resolved the function returns None.

Parameters:
  • record (dict) – The mapping to search. May contain nested dicts/lists.

  • parts (list[str]) – Pre-split path components (for example ["a", "0", "b"]).

Returns:

The found value, or None if the path does not exist or an index is out of range.

Return type:

Any

Example

>>> fast_get_field({"a": [{"b": 1}]}, ["a", "0", "b"])
1
penaltyblog.matchflow.steps.utils.flatten_dict(d: dict, parent_key: str = '', sep: str = '.') dict[source]#

Flatten a nested dictionary into a single-level dictionary using dot notation.

Parameters:
  • d (dict) – The dictionary to flatten.

  • parent_key (str) – The prefix for nested keys.

  • sep (str) – The separator to use (default is “.”).

Returns:

Flattened dictionary with dot-notated keys.

Return type:

dict
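Recursive flattening with a key prefix can be sketched as below. This is an illustrative version, not the library's code: `flatten` is a hypothetical name, and leaving lists untouched (rather than expanding them by index) is an assumption.

```python
def flatten(d: dict, parent_key: str = "", sep: str = ".") -> dict:
    """Hypothetical flattener: nested dict keys become 'outer.inner' keys."""
    flat = {}
    for key, value in d.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, dict):
            # Recurse, carrying the accumulated prefix down one level.
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value  # assumption: lists are kept as-is
    return flat


event = {"player": {"name": "Saka", "id": 7}, "location": [60, 40]}
flat_event = flatten(event)
underscored = flatten({"a": {"b": {"c": 1}}}, sep="_")
```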

penaltyblog.matchflow.steps.utils.get_field(record: dict, path: str | List[str]) Any[source]#

Retrieve a nested value from a mapping or list using dot-notation.

The path may either be a dotted string (for example "a.0.b") or a pre-split list of parts (["a", "0", "b"]). Dictionary keys are looked up by name. If a traversal step yields a list and the corresponding part is a numeric string, it will be used as a list index. Any lookup failure returns None rather than raising.

Parameters:
  • record (dict) – The mapping to search. May contain nested dicts/lists.

  • path (Union[str, List[str]]) – Dotted path or list of path components.

Returns:

The found value, or None if the path does not exist or an index is invalid.

Return type:

Any

Example

>>> get_field({"loc": [50, 40]}, "loc.0")
50
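The traversal rules described above (dict keys by name, numeric parts as list indices, None on any failure) can be sketched in plain Python. `lookup` is a hypothetical name and this is an illustration of the documented behaviour, not the library's implementation.

```python
from typing import Any, List, Union


def lookup(record: dict, path: Union[str, List[str]]) -> Any:
    """Hypothetical nested lookup accepting 'a.0.b' or ['a', '0', 'b']."""
    parts = path.split(".") if isinstance(path, str) else list(path)
    current: Any = record
    for part in parts:
        if isinstance(current, dict):
            if part not in current:
                return None
            current = current[part]
        elif isinstance(current, list) and part.isdigit():
            position = int(part)
            if position >= len(current):
                return None  # index out of range
            current = current[position]
        else:
            return None  # cannot descend into a scalar
    return current


first_coordinate = lookup({"loc": [50, 40]}, "loc.0")
nested = lookup({"a": [{"b": 1}]}, ["a", "0", "b"])
```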
penaltyblog.matchflow.steps.utils.get_index(field: str, index: int) Callable[[dict], Any][source]#

Returns a function that extracts the `index`th value from a list field (dot path).

Example

>>> f = get_index("location", 0)
>>> f({"location": [50, 40]})  # → 50

If the field or index is missing, returns None.

penaltyblog.matchflow.steps.utils.reservoir_sample(iterable, k, seed=None) list[source]#

Sample a fixed number of records from an iterable using the reservoir sampling algorithm.

Parameters:
  • iterable (iterable) – The iterable to sample from.

  • k (int) – The number of records to sample.

  • seed (int, optional) – The seed for the random number generator.

Returns:

The sampled records.

Return type:

list
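Reservoir sampling keeps a uniform sample of k items from a stream of unknown length while holding only k items in memory. The sketch below uses the classic Algorithm R; whether the library uses this exact variant is not stated, and `reservoir` is a hypothetical name.

```python
import random
from typing import Iterable, List, Optional


def reservoir(iterable: Iterable, k: int, seed: Optional[int] = None) -> List:
    """Algorithm R: uniform sample of k items from a single pass."""
    rng = random.Random(seed)
    sample: List = []
    for i, item in enumerate(iterable):
        if i < k:
            sample.append(item)  # fill the reservoir with the first k items
        else:
            j = rng.randint(0, i)  # inclusive on both ends
            if j < k:
                sample[j] = item  # replace with probability k / (i + 1)
    return sample


picked = reservoir(range(1000), 5, seed=42)
everything = reservoir(range(3), 5, seed=1)
```

When the input holds fewer than k items, the sketch returns all of them in their original order.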

penaltyblog.matchflow.steps.utils.schema(records: list[dict], sample_size: int = 50) dict[source]#

Get the schema of a list of records.

Parameters:
  • records (list[dict]) – The list of records to get the schema from.

  • sample_size (int) – The number of records to sample.

Returns:

The schema of the records.

Return type:

dict

penaltyblog.matchflow.steps.utils.set_nested_field(record: dict, path: str, value: Any) None[source]#

Set a nested field in a record using dot notation.

Parameters:
  • record (dict) – The record to modify.

  • path (str) – The path to the field (dot notation).

  • value (Any) – The value to set.

penaltyblog.matchflow.steps.utils.unify_types(types: set[type]) type[source]#

Unify a set of types into a single type.

Parameters:

types (set[type]) – The set of types to unify.

Returns:

The unified type.

Return type:

type