Working with Files: Input & Output#
Flow makes it easy to load, stream, and save structured JSON data from a variety of sources. Whether you’re pulling from disk, an API, or a folder of .jsonl files, Flow provides a consistent, lazy interface for building pipelines.
📥 Loading Data into Flow#
Use Flow.from_* methods to create a new Flow from Python objects or files.
🧠 From Python Data: .from_records(...)#
from penaltyblog.matchflow import Flow
data = [{"id": 1, "value": "A"}, {"id": 2, "value": "B"}]
flow = Flow.from_records(data)
Also works with single dicts or generators:
flow = Flow.from_records({"id": 3, "value": "C"})
def gen():
    for i in range(3):
        yield {"id": i}

flow = Flow.from_records(gen())
Warning
If you mutate records (e.g. with .assign()), Flow modifies them in place. Use .copy() or deepcopy() to protect your originals.
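The warning above can be illustrated without Flow at all. The sketch below uses a plain loop as a stand-in for an in-place step such as .assign() (an assumption made purely for illustration) and shows copy.deepcopy keeping the original records untouched, including nested values.

```python
import copy

originals = [
    {"id": 1, "tags": ["shot"]},
    {"id": 2, "tags": ["pass"]},
]

# deepcopy duplicates nested lists and dicts as well as the top-level dicts.
safe_records = copy.deepcopy(originals)

# Stand-in for an in-place pipeline step: mutate only the copies.
for record in safe_records:
    record["tags"].append("reviewed")
    record["checked"] = True

print(originals[0])      # unchanged: no "checked" key, tags untouched
print(safe_records[0])   # carries the modifications
```

A shallow dict.copy() would protect top-level keys but still share nested values such as the tags lists, so deepcopy is the safer choice for nested records.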
📄 From JSON Lines (JSONL) File: .from_jsonl(...)#
flow = Flow.from_jsonl("data/events.jsonl")
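For readers new to the format: a JSONL file holds one JSON object per line, which is what makes lazy reading possible. The following is a minimal plain-Python sketch of that idea, not matchflow's actual implementation; iter_jsonl is a hypothetical helper name.

```python
import json
import os
import tempfile
from typing import Iterator


def iter_jsonl(path: str) -> Iterator[dict]:
    """Yield one record per non-empty line, without building a full list."""
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:
                yield json.loads(line)


# Write a tiny example file so the sketch is runnable end to end.
tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, "events.jsonl")
with open(path, "w", encoding="utf-8") as handle:
    handle.write('{"id": 1, "value": "A"}\n{"id": 2, "value": "B"}\n')

records = iter_jsonl(path)   # nothing is parsed yet
first = next(records)        # parses only the first line
rest = list(records)         # parses the remaining lines
```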
📂 From Folder of JSON Files: .from_folder(...)#
flow = Flow.from_folder("data/events/")
Reads all .json and .jsonl files in a directory.
Each .json file must contain either a single dict or a list of dicts.
Files are streamed one at a time, which keeps bulk ingestion efficient.
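The rules above (all .json and .jsonl files, a dict or a list of dicts per .json file, one file at a time) can be sketched in plain Python. This is an illustration under assumptions rather than the library's code: iter_folder is a hypothetical name, and the alphabetical file order is a choice made here, not something the documentation states.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterator


def iter_folder(folder: str) -> Iterator[dict]:
    """Yield records from every .json and .jsonl file, one file at a time."""
    for path in sorted(Path(folder).iterdir()):
        if path.suffix == ".jsonl":
            with path.open(encoding="utf-8") as handle:
                for line in handle:
                    if line.strip():
                        yield json.loads(line)
        elif path.suffix == ".json":
            with path.open(encoding="utf-8") as handle:
                content = json.load(handle)
            # A .json file holds either a single dict or a list of dicts.
            yield from ([content] if isinstance(content, dict) else content)


# Build a small folder to run the sketch against.
folder = Path(tempfile.mkdtemp())
(folder / "a.json").write_text('{"id": 1}', encoding="utf-8")
(folder / "b.json").write_text('[{"id": 2}, {"id": 3}]', encoding="utf-8")
(folder / "c.jsonl").write_text('{"id": 4}\n{"id": 5}\n', encoding="utf-8")

ids = [record["id"] for record in iter_folder(str(folder))]
```

Because the function is a generator, only one file is open at any moment, regardless of how many files the folder contains.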
✨ From Glob Pattern: .from_glob(...)#
flow = Flow.from_glob("data/**/*.json")
Searches recursively using glob.glob. It behaves like .from_folder, but gives you more flexibility when matching paths and subfolders.
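To see which files a recursive pattern would pick up, you can try the pattern with Python's own glob module first. The sketch below builds a throwaway directory tree (the file names are made up for illustration) and compares a recursive match with a non-recursive one.

```python
import glob
import os
import tempfile

# Create a small nested tree: root/top.json, root/2024/mid.json, ...
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "2024", "week1"))
for relative in ("top.json", "2024/mid.json", "2024/week1/deep.json", "2024/notes.txt"):
    with open(os.path.join(root, *relative.split("/")), "w", encoding="utf-8") as handle:
        handle.write("{}")

pattern = os.path.join(root, "**", "*.json")

# With recursive=True, "**" matches zero or more directory levels.
matches = sorted(os.path.relpath(p, root) for p in glob.glob(pattern, recursive=True))

# Without it, "**" behaves like "*" and matches exactly one directory level.
shallow = glob.glob(pattern)
```

Here matches contains all three .json files at every depth, while shallow contains only the file one level down; notes.txt is excluded in both cases.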
🧾 From JSON File (Single Object or Array): .from_json(...)#
flow = Flow.from_json("data/game.json")
Accepts either a single object (read as one record) or a list of objects (read as multiple records).
Note
This reads the entire file into memory. Use .from_jsonl() for streaming large datasets.
Working with Cloud Storage (S3, GCS, Azure)#
All file-based creation methods (from_json, from_jsonl, from_folder, from_glob) can read directly from cloud storage by providing the appropriate URI and storage_options.
To do this, you’ll need to install the necessary dependencies for your cloud provider:
Amazon S3: pip install penaltyblog[aws]
Google Cloud Storage: pip install penaltyblog[gcp]
Azure Data Lake / Blob Storage: pip install penaltyblog[azure]
The storage_options parameter is an optional dictionary containing your credentials if you are not storing them as environment variables.
import penaltyblog as pb

# Amazon S3: pass the access key pair explicitly
s3_options = {
    "key": "YOUR_AWS_ACCESS_KEY_ID",
    "secret": "YOUR_AWS_SECRET_ACCESS_KEY",
}
flow = pb.Flow.from_json("s3://my-bucket/data.json", storage_options=s3_options)

# Google Cloud Storage: point to a credentials file
gcs_options = {"token": "path/to/your/gcs_credentials.json"}
flow = pb.Flow.from_jsonl("gs://my-gcs-bucket/data.jsonl", storage_options=gcs_options)

# Azure Data Lake / Blob Storage: pass the account name and key
azure_options = {
    "account_name": "YOUR_STORAGE_ACCOUNT_NAME",
    "account_key": "YOUR_STORAGE_ACCOUNT_KEY",
}
flow = pb.Flow.from_folder("abfs://container/data/", storage_options=azure_options)
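If a pipeline reads from several providers, one option is to pick the credentials dictionary from the URI scheme. The helper below is a hypothetical convenience, not part of penaltyblog; it reuses the placeholder keys from the examples above and relies only on the standard library.

```python
from typing import Optional
from urllib.parse import urlparse

# Placeholder credentials; the keys mirror the examples above.
OPTIONS_BY_SCHEME = {
    "s3": {
        "key": "YOUR_AWS_ACCESS_KEY_ID",
        "secret": "YOUR_AWS_SECRET_ACCESS_KEY",
    },
    "gs": {"token": "path/to/your/gcs_credentials.json"},
    "abfs": {
        "account_name": "YOUR_STORAGE_ACCOUNT_NAME",
        "account_key": "YOUR_STORAGE_ACCOUNT_KEY",
    },
}


def storage_options_for(uri: str) -> Optional[dict]:
    """Return the credentials dict for a URI's scheme, or None for local paths."""
    scheme = urlparse(uri).scheme
    return OPTIONS_BY_SCHEME.get(scheme)


s3_opts = storage_options_for("s3://my-bucket/data.json")
local_opts = storage_options_for("data/events.jsonl")  # no scheme, so None
```

The result could then be passed as storage_options to a from_* call. Whether those methods accept None for local paths is an assumption worth checking against the library before relying on it.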
💾 Saving Data from a Flow#
Once your pipeline is complete, use .to_*() methods to export the result.
.to_jsonl(path)#
Write one record per line:
flow.to_jsonl("output/events.jsonl")
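As a rough picture of what "one record per line" means on disk, here is a plain-Python sketch that writes an iterable of dicts as JSONL. It is not the library's implementation, and write_jsonl is a hypothetical name.

```python
import json
import os
import tempfile
from typing import Iterable


def write_jsonl(records: Iterable[dict], path: str) -> int:
    """Write each record as one JSON line; return how many were written."""
    count = 0
    with open(path, "w", encoding="utf-8") as handle:
        for record in records:  # consumes the iterable one record at a time
            handle.write(json.dumps(record) + "\n")
            count += 1
    return count


out_path = os.path.join(tempfile.mkdtemp(), "events.jsonl")
written = write_jsonl(({"id": i} for i in range(3)), out_path)

with open(out_path, encoding="utf-8") as handle:
    lines = handle.read().splitlines()
```

Because each record is serialized and written independently, this style of output never needs the full dataset in memory.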
.to_json(path)#
Write all records as a JSON array:
flow.to_json("summary.json", indent=4)
Note
This collects the entire stream before writing.
.to_json_files(folder, by="id")#
Write each record to its own .json file:
flow.to_json_files("out/", by="event_id")
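Each file is named after the record's value for the by field, for example “out/123.json” and “out/456.json”.
The value of that field must be a string, or something that can be converted into a valid filename.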
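The naming scheme can be sketched in plain Python as follows. This is an illustration under assumptions, not the library's code: write_json_files is a hypothetical helper, and the two sample events are made up.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterable, List


def write_json_files(records: Iterable[dict], folder: str, by: str) -> List[Path]:
    """Write each record to <folder>/<record[by]>.json and return the paths."""
    target = Path(folder)
    target.mkdir(parents=True, exist_ok=True)
    paths = []
    for record in records:
        # The f-string converts non-string values such as integers to text.
        path = target / f"{record[by]}.json"
        path.write_text(json.dumps(record), encoding="utf-8")
        paths.append(path)
    return paths


out_dir = tempfile.mkdtemp()
events = [{"event_id": 123, "type": "shot"}, {"event_id": 456, "type": "pass"}]
written = write_json_files(events, out_dir, by="event_id")
names = sorted(path.name for path in written)
```

Values containing path separators or other characters that are invalid in filenames would need sanitizing first, and duplicate values would overwrite each other in this sketch.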
.to_pandas()#
Convert the flow to a Pandas DataFrame:
df = flow.select("player_name", "shot_xg").to_pandas()
Note
Best used after filtering/flattening to avoid deeply nested fields.
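To show why flattening helps, the sketch below collapses nested dicts into dotted column names in plain Python. Flow may well provide its own way to flatten records; this flatten helper and the sample record are hypothetical and only illustrate the shape a DataFrame-friendly record takes.

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], parent: str = "", sep: str = ".") -> Dict[str, Any]:
    """Collapse nested dicts into one level, joining keys with `sep`."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


shot = {"player": {"name": "A. Striker", "id": 9}, "shot": {"xg": 0.31}}
flat_shot = flatten(shot)
```

Each key of flat_shot ("player.name", "player.id", "shot.xg") would become its own column, instead of a single column holding nested dict objects.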
✅ Summary#
Input Options#
| Source Format | Method | Streaming? | Notes |
|---|---|---|---|
| Python objects | .from_records(...) | ✅ | Lists, dicts, or generators |
| JSONL file | .from_jsonl(...) | ✅ | Efficient for large datasets |
| Single JSON file | .from_json(...) | ❌ | Loads entire file at once |
| Folder of files | .from_folder(...) | ✅ | Streams one file at a time |
| Glob pattern | .from_glob(...) | ✅ | Recursively matches files |
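Output Options#
| Output Format | Method | Notes |
|---|---|---|
| JSONL file | .to_jsonl(path) | Writes one record per line |
| Single JSON file | .to_json(path) | Writes a JSON array; collects the entire stream before writing |
| One JSON file per record | .to_json_files(folder, by="id") | Files are named after the by field |
| Pandas DataFrame | .to_pandas() | Best used after filtering/flattening |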
🧠 What’s Next?#
Now that you can load and save data, let’s look at inspecting, debugging, and explaining your flows using .head(), .keys(), .explain() and more.