Source code for penaltyblog.matchflow.steps.source

import os
from typing import TYPE_CHECKING, Any, Dict, Iterator, cast

if TYPE_CHECKING:
    from ..flow import Flow

import fsspec

from .source_opta import from_opta
from .source_statsbomb import from_statsbomb

try:
    import orjson as _json_lib_orjson

    BINARY = True

    def json_load(f):
        return _json_lib_orjson.loads(f.read())

    def json_loads(b):
        return _json_lib_orjson.loads(b)

except ImportError:
    import json as _json_lib_std

    BINARY = False

[docs] def json_load(f): return _json_lib_std.load(f)
[docs] def json_loads(b): return _json_lib_std.loads(b.decode("utf-8"))
def _handle_missing_dependency(path: str) -> None: """ Check if required cloud storage dependencies are installed and provide helpful error messages. Args: path (str): The path being accessed Raises: ImportError: If required dependency is missing """ protocol_mapping = { "s3://": "s3fs", "gs://": "gcsfs", "gcs://": "gcsfs", "azure://": "adlfs", "abfs://": "adlfs", "abfss://": "adlfs", } for protocol, package in protocol_mapping.items(): if path.startswith(protocol): try: __import__(package) except ImportError: raise ImportError( f"To access {protocol} paths, install {package}: pip install {package}" ) from None break
[docs] def dispatch(step) -> Iterator[Dict[Any, Any]]: op = step["op"] if op == "from_folder": return from_folder(step) elif op == "from_materialized": return iter(step["records"]) elif op == "from_json": return from_json(step) elif op == "from_jsonl": return from_jsonl(step) elif op == "from_statsbomb": return from_statsbomb(step) elif op == "from_opta": return from_opta(step) elif op == "from_glob": return from_glob(step) elif op == "from_concat": return from_concat(step) else: raise ValueError(f"Unsupported source op: {step['op']}")
[docs] def from_folder(step) -> Iterator[Dict[Any, Any]]: """ Create a Flow from a folder of JSON or JSONL files. Args: step (dict): A dictionary containing: - path (str): The path to the folder containing the records. - storage_options (dict, optional): Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"} For GCS: {"token": "path/to/token.json"} For Azure: {"account_name": "name", "account_key": "key"} Returns: Iterator[dict]: A new Flow streaming matching files. """ folder = step["path"] storage_options = step.get("storage_options", {}) file_exts = (".json", ".jsonl") # now explicitly includes .jsonl # Check dependencies for cloud storage _handle_missing_dependency(folder) # Use fsspec to list files in the directory fs = fsspec.filesystem( fsspec.utils.infer_storage_options(folder, **storage_options)["protocol"], **storage_options, ) for filename in fs.ls(folder, detail=False): basename = os.path.basename(filename) if not basename.endswith(file_exts): continue # Create child steps with storage_options passed through child_step = {"path": filename} if storage_options: child_step["storage_options"] = storage_options if basename.endswith(".jsonl"): yield from from_jsonl(child_step) elif basename.endswith(".json"): yield from from_json(child_step)
[docs] def from_json(step) -> Iterator[Dict[Any, Any]]: """ Create a Flow from a JSON file. Args: step (dict): A dictionary containing: - path (str): The path to the JSON file. - storage_options (dict, optional): Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"} For GCS: {"token": "path/to/token.json"} For Azure: {"account_name": "name", "account_key": "key"} Returns: Iterator[dict]: A new Flow streaming matching files. """ path = step["path"] storage_options = step.get("storage_options", {}) # Check dependencies for cloud storage _handle_missing_dependency(path) mode = "rb" if BINARY else "r" with fsspec.open(path, mode, **storage_options) as f: data = json_load(f) if isinstance(data, list): yield from data elif isinstance(data, dict): yield data else: raise ValueError("JSON file must contain either a list or a single dict")
[docs] def from_jsonl(step) -> Iterator[Dict[Any, Any]]: """ Create a Flow from a JSONL file. Args: step (dict): A dictionary containing: - path (str): The path to the JSONL file. - storage_options (dict, optional): Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"} For GCS: {"token": "path/to/token.json"} For Azure: {"account_name": "name", "account_key": "key"} Returns: Iterator[dict]: A new Flow streaming matching files. """ path = step["path"] storage_options = step.get("storage_options", {}) # Check dependencies for cloud storage _handle_missing_dependency(path) mode = "rb" if BINARY else "r" with fsspec.open(path, mode, **storage_options) as f: for line in f: if not line.strip(): continue yield json_loads(line)
[docs] def from_glob(step) -> Iterator[Dict[Any, Any]]: """ Create a Flow from a glob pattern. Args: step (dict): A dictionary containing: - pattern (str): Glob pattern (e.g., "data/**/*.json"). - storage_options (dict, optional): Additional options for cloud storage backends. For S3: {"key": "access_key", "secret": "secret_key", "endpoint_url": "url"} For GCS: {"token": "path/to/token.json"} For Azure: {"account_name": "name", "account_key": "key"} Returns: Iterator[dict]: A new Flow streaming matching files. """ pattern = step["pattern"] storage_options = step.get("storage_options", {}) # Check dependencies for cloud storage _handle_missing_dependency(pattern) # Use fsspec glob try: storage_options_for_fs = storage_options.copy() protocol = fsspec.utils.infer_storage_options( pattern, **storage_options_for_fs )["protocol"] fs = fsspec.filesystem(protocol, **storage_options_for_fs) except Exception as e: raise ValueError( f"Failed to create filesystem for pattern '{pattern}': {e}" ) from e # Get the list of files matching the pattern try: file_paths = fs.glob(pattern) except Exception as e: raise ValueError(f"Failed to glob pattern '{pattern}': {e}") from e for path in file_paths: # Skip directories if fs.isdir(path): continue # Handle path reconstruction for cloud storage if protocol != "file": if not path.startswith(f"{protocol}://"): # Extract the bucket and base path from the pattern pattern_parts = pattern.replace(f"{protocol}://", "").split("/", 1) bucket = pattern_parts[0] # If the path already starts with the bucket name, just prepend the protocol if path.startswith(f"{bucket}/"): path = f"{protocol}://{path}" else: # Otherwise, reconstruct using the full base path from the pattern if "/" in pattern: base_path = pattern.rsplit("/", 1)[0] else: base_path = pattern clean_path = path.lstrip("/") path = f"{base_path}/{clean_path}" else: # For local files, ensure we have absolute paths if not os.path.isabs(path): path = os.path.abspath(path) # Create child steps with storage_options passed through child_step = {"path": path} if storage_options: child_step["storage_options"] = storage_options if path.endswith(".jsonl"): yield from from_jsonl(child_step) elif path.endswith(".json"): yield from from_json(child_step)
[docs] def from_concat(step) -> Iterator[Dict[Any, Any]]: """ Create a Flow from a list of plans. Args: step (dict): A dictionary containing the plans to concatenate. Returns: Iterator[dict]: A new Flow streaming matching files. """ from ..executor import FlowExecutor for plan in step["plans"]: yield from FlowExecutor(plan).execute()