Basic Pipelines: Transforming Your Data
Once you’ve loaded your data into a Flow, the next step is usually to clean, reshape, and enrich it.
Flow provides familiar methods like .filter(), .assign(), .select(), and .explode(), designed to work lazily over nested JSON records, with no flattening or DataFrame conversion required.
📦 Example Records
from penaltyblog.matchflow import Flow

sample_records = [
    {
        "event_id": 1,
        "match_id": 123,
        "period": 1,
        "timestamp": "00:01:30.500",
        "type_name": "Pass",
        "player_name": "Kevin De Bruyne",
        "location": [60.1, 40.3],
        "pass_recipient_name": "Erling Haaland",
        "pass_outcome_name": "Complete",
    },
    {
        "event_id": 2,
        "type_name": "Shot",
        "player_name": "Erling Haaland",
        "location": [85.5, 50.2],
        "shot_xg": 0.05,
        "shot_outcome_name": "Goal",
    },
    # More records...
]

flow = Flow.from_records(sample_records)
🎯 Selecting Fields with .select()
.select() keeps only the fields you name and drops the rest:
player_locations = flow.select("player_name", "location").head(1)
print(player_locations)
[{'player_name': 'Kevin De Bruyne', 'location': [60.1, 40.3]}]
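🧠 Accessing Nested Fields
Use dot notation to select a field inside nested dictionaries. The result keeps the original nesting:
example = [{"a": {"b": {"c": 1}}}]
nested = Flow.from_records(example).select("a.b.c")
print(nested.head(1))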
[{'a': {'b': {'c': 1}}}]
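The dotted-path behaviour can be pictured in plain Python. The sketch below uses hypothetical helpers, `get_path` and `select_path`, to follow a path like "a.b.c" through nested dictionaries and rebuild only that branch. It illustrates the idea and is not penaltyblog's implementation; dropping missing paths is this sketch's own choice.

```python
from typing import Any

_MISSING = object()  # sentinel meaning "path not found"


def get_path(record: dict, path: str) -> Any:
    """Follow a dotted path such as "a.b.c" through nested dicts."""
    current: Any = record
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


def select_path(record: dict, path: str) -> dict:
    """Return a new dict holding only `path`, keeping its nesting."""
    value = get_path(record, path)
    if value is _MISSING:
        return {}  # this sketch drops paths that do not exist
    keys = path.split(".")
    result: dict = {}
    node = result
    for key in keys[:-1]:
        node = node.setdefault(key, {})  # recreate each parent level
    node[keys[-1]] = value
    return result


print(select_path({"a": {"b": {"c": 1}, "x": 2}}, "a.b.c"))
# {'a': {'b': {'c': 1}}}
```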
🧹 Handling Dotted Keys
If your keys themselves contain dots (e.g. “player.info.name.full”), they can be confused with nested paths, because Flow uses dot notation to reach into nested fields. You have two options.
Option 1: Flatten the record
flow.flatten().select("player.info.name.full")
Option 2: Rename and assign
name_flow = (
    flow.rename(**{"player.info": "player_info"})
    .assign(name_full=lambda r: r["player_info"].get("name.full"))
    .select("name_full")
)
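To see why dotted keys are ambiguous, consider a plain-Python flattening helper (the hypothetical `flatten_record` below) that joins nested keys with dots. It assumes .flatten() produces dotted keys in a similar way, which is not confirmed here. A genuinely nested record and a record with a literal dotted key end up with the same flat key:

```python
def flatten_record(record: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into a single level, joining keys with dots."""
    flat = {}
    for key, value in record.items():
        full_key = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_record(value, full_key))  # recurse into children
        else:
            flat[full_key] = value
    return flat


nested = {"player": {"info": {"name.full": "Kevin De Bruyne"}}}
dotted = {"player.info": {"name.full": "Kevin De Bruyne"}}

# Both shapes collapse to the same flat key, so a dotted name on its own
# cannot tell a literal key apart from a nested path.
print(flatten_record(nested))  # {'player.info.name.full': 'Kevin De Bruyne'}
print(flatten_record(dotted))  # {'player.info.name.full': 'Kevin De Bruyne'}
```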
🔍 Filtering Records
Basic filter using a lambda
shots = flow.filter(lambda r: r.get("type_name") == "Shot")
print(shots.select("player_name", "shot_outcome_name").collect())
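Using predicate helpers
Predicate helpers such as where_equals() build common conditions without writing a lambda. When you pass several predicates to .filter(), a record is kept only if it satisfies all of them:
from penaltyblog.matchflow import where_equals

goals = flow.filter(
    where_equals("type_name", "Shot"),
    where_equals("shot_outcome_name", "Goal"),
    where_equals("player_name", "Erling Haaland"),
)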
print(goals.select("player_name", "shot_xg").collect())
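Under the hood, a predicate helper only needs to return a function that takes a record and returns True or False. The plain-Python sketch below uses hypothetical names (`make_where_equals`, `all_of`) to show that pattern, and assumes, as the where_equals example implies, that several predicates combine with logical AND:

```python
from typing import Any, Callable

Record = dict[str, Any]
Predicate = Callable[[Record], bool]


def make_where_equals(field: str, value: Any) -> Predicate:
    """Build a predicate that checks record[field] == value."""
    return lambda r: r.get(field) == value


def all_of(*predicates: Predicate) -> Predicate:
    """Combine predicates so a record must pass every one of them."""
    return lambda r: all(p(r) for p in predicates)


events = [
    {"type_name": "Pass", "player_name": "Kevin De Bruyne"},
    {"type_name": "Shot", "player_name": "Erling Haaland", "shot_outcome_name": "Goal"},
    {"type_name": "Shot", "player_name": "Erling Haaland", "shot_outcome_name": "Saved"},
]

is_haaland_goal = all_of(
    make_where_equals("type_name", "Shot"),
    make_where_equals("shot_outcome_name", "Goal"),
    make_where_equals("player_name", "Erling Haaland"),
)
goals = [e for e in events if is_haaland_goal(e)]
print(goals)  # only the record whose shot_outcome_name is "Goal"
```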
✍️ Assigning Fields with .assign()
half_flow = flow.assign(
    half=lambda r: "First" if r.get("period") == 1 else "Second"
)
print(half_flow.select("player_name", "half").head(1))
You can also overwrite fields:
uppercase_flow = flow.assign(
    player_name=lambda r: r.get("player_name", "").upper()
)
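A simple mental model for .assign() is: copy the record, then set each keyword to the result of calling its function on that record. The hypothetical `assign_fields` helper below sketches this in plain Python. It assumes every function sees the original record rather than one already updated by earlier keywords; penaltyblog's exact behaviour on that point is not described here.

```python
from typing import Any, Callable


def assign_fields(record: dict, **funcs: Callable[[dict], Any]) -> dict:
    """Return a copy of `record` with each keyword set to func(record).

    Existing keys are overwritten in place; new keys are appended.
    The input record itself is never modified.
    """
    updated = dict(record)
    for name, func in funcs.items():
        updated[name] = func(record)  # each function sees the original record
    return updated


event = {"player_name": "Kevin De Bruyne", "period": 1}
out = assign_fields(
    event,
    half=lambda r: "First" if r.get("period") == 1 else "Second",
    player_name=lambda r: r.get("player_name", "").upper(),
)
print(out)  # {'player_name': 'KEVIN DE BRUYNE', 'period': 1, 'half': 'First'}
print(event["player_name"])  # Kevin De Bruyne (original untouched)
```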
🔀 Renaming Fields with .rename()
Each keyword argument maps an existing field name to its new name (old_name="new_name"):
renamed = flow.rename(
    match_id="id",
    type_name="event_type",
).select("id", "event_type")
print(renamed.head(1))
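Because the direction of the mapping is easy to mix up, here is the same idea as a plain-Python sketch with a hypothetical `rename_fields` helper: each keyword names an existing key, and its value is the new key. Fields that are not mentioned pass through unchanged, and this sketch silently ignores keywords for keys that are absent.

```python
def rename_fields(record: dict, **mapping: str) -> dict:
    """Return a copy of `record` with keys renamed old -> new."""
    # mapping.get(key, key) leaves keys that are not being renamed as they are
    return {mapping.get(key, key): value for key, value in record.items()}


event = {"event_id": 1, "match_id": 123, "type_name": "Pass"}
print(rename_fields(event, match_id="id", type_name="event_type"))
# {'event_id': 1, 'id': 123, 'event_type': 'Pass'}
```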
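🎈 Exploding Lists with .explode()
.explode() turns each element of a list field into its own record. When you explode several fields at once, their elements are paired by position:
from pprint import pprint

example = [{
    "event_id": 30,
    "players": ["Player X", "Player Y"],
    "roles": ["Passer", "Receiver"],
}]
exploded = Flow.from_records(example).explode("players", "roles")
pprint(exploded.collect())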
[{'event_id': 30, 'players': 'Player X', 'roles': 'Passer'},
{'event_id': 30, 'players': 'Player Y', 'roles': 'Receiver'}]
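The pairing by position works much like Python's zip(). The hypothetical `explode_fields` helper below sketches it in plain Python. Raising an error when the lists differ in length is this sketch's own choice; how .explode() handles mismatched lengths is not covered here.

```python
from typing import Any


def explode_fields(record: dict, *fields: str) -> list[dict[str, Any]]:
    """Turn parallel list fields into one record per list position."""
    lists = [record[f] for f in fields]
    if len({len(values) for values in lists}) > 1:
        raise ValueError("exploded fields must have the same length")
    rows = []
    for items in zip(*lists):
        row = dict(record)              # keep the non-exploded fields
        row.update(zip(fields, items))  # replace each list with one element
        rows.append(row)
    return rows


example = {"event_id": 30, "players": ["Player X", "Player Y"], "roles": ["Passer", "Receiver"]}
for row in explode_fields(example, "players", "roles"):
    print(row)
# {'event_id': 30, 'players': 'Player X', 'roles': 'Passer'}
# {'event_id': 30, 'players': 'Player Y', 'roles': 'Receiver'}
```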
🎯 Splitting Arrays with .split_array()
.split_array() unpacks a list field into separately named fields, one per element:
split = flow.split_array("location", into=["x", "y"]).select("x", "y").head(1)
print(split)
[{'x': 60.1, 'y': 40.3}]
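A plain-Python sketch of the same idea, using a hypothetical `split_array_field` helper, pairs each target name with the list element at the same position. This version keeps the original list alongside the new fields, and zip() silently ignores extra elements or extra names; .split_array() may behave differently in those cases.

```python
from typing import Any, Sequence


def split_array_field(record: dict, field: str, into: Sequence[str]) -> dict[str, Any]:
    """Copy `record`, adding one named field per element of record[field]."""
    values = record.get(field) or []  # a missing field adds nothing
    out = dict(record)
    for name, value in zip(into, values):
        out[name] = value
    return out


print(split_array_field({"location": [60.1, 40.3]}, "location", into=["x", "y"]))
# {'location': [60.1, 40.3], 'x': 60.1, 'y': 40.3}
```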
🧮 Accessing Array Elements by Index
If a field is a list (like coordinates or player IDs), you can extract specific values using dot notation with a numeric index:
record = {"player": "Kevin De Bruyne", "location": [60.1, 40.3]}
flow = Flow.from_records([record])
To get just the X or Y value from location:
flow.select("location.0", "location.1").collect()
[{'location': {'0': 60.1, '1': 40.3}}]
Note
The numeric indexes are treated like nested keys internally, so “location.0” means “first element of location”.
If you want those values as top-level fields, just rename them:
xy = (
    flow.select("location.0", "location.1")
    .rename(**{
        "location.0": "x",
        "location.1": "y",
    })
)
print(xy.collect())
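[{'location': {}, 'x': 60.1, 'y': 40.3}]
The extracted values now sit at the top level as x and y, while the emptied location dictionary remains in the output.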
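Resolving a path that mixes dictionary keys and list positions can be sketched in plain Python as below. The `resolve` helper is hypothetical and returns the value directly instead of rebuilding the nested structure; it treats purely numeric path parts as list indexes and returns None for anything it cannot reach.

```python
from typing import Any


def resolve(record: Any, path: str) -> Any:
    """Follow a dotted path, treating numeric parts as list indexes."""
    current = record
    for part in path.split("."):
        if isinstance(current, list) and part.isdigit():
            index = int(part)
            current = current[index] if index < len(current) else None
        elif isinstance(current, dict):
            current = current.get(part)
        else:
            return None  # cannot descend into a scalar
        if current is None:
            return None
    return current


record = {"player": "Kevin De Bruyne", "location": [60.1, 40.3]}
print(resolve(record, "location.0"), resolve(record, "location.1"))  # 60.1 40.3
print(resolve(record, "location.5"))  # None
```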
✅ Summary
These methods form the building blocks of most Flow pipelines:
- .select() to pick fields
- .filter() to narrow your data
- .assign() to compute new fields
- .rename() to simplify field names
- .explode() to unpack lists
- .split_array() to handle coordinate fields
You chain these operations lazily and collect results only when you’re ready.
from penaltyblog.matchflow import Flow, where_equals

shots_by_xg = (
    Flow.from_records(sample_records)
    .filter(where_equals("type_name", "Shot"))
    .assign(xg_bin=lambda r: "High" if r.get("shot_xg", 0) > 0.1 else "Low")
    .select("player_name", "xg_bin")
)
shots_by_xg.show(3)
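The laziness described above can be pictured with Python generators: each step wraps the previous one, and no record is read until something consumes the chain. This is a plain-Python analogy with hypothetical step functions, not a description of how Flow is built internally. The `seen` list shows when the source is actually read:

```python
from itertools import islice
from typing import Any, Callable, Iterable, Iterator

Record = dict[str, Any]
seen: list[str] = []  # players whose records have been read from the source


def source() -> Iterator[Record]:
    """Yield sample records, noting each one as it is read."""
    for record in [
        {"type_name": "Pass", "player_name": "Kevin De Bruyne"},
        {"type_name": "Shot", "player_name": "Erling Haaland", "shot_xg": 0.05},
    ]:
        seen.append(record["player_name"])
        yield record


def filter_step(rows: Iterable[Record], pred: Callable[[Record], bool]) -> Iterator[Record]:
    return (r for r in rows if pred(r))


def assign_step(rows: Iterable[Record], name: str, func: Callable[[Record], Any]) -> Iterator[Record]:
    return ({**r, name: func(r)} for r in rows)


def select_step(rows: Iterable[Record], *fields: str) -> Iterator[Record]:
    return ({f: r[f] for f in fields if f in r} for r in rows)


pipeline = select_step(
    assign_step(
        filter_step(source(), lambda r: r.get("type_name") == "Shot"),
        "xg_bin",
        lambda r: "High" if r.get("shot_xg", 0) > 0.1 else "Low",
    ),
    "player_name",
    "xg_bin",
)
print(seen)  # [] because building the pipeline reads nothing
result = list(islice(pipeline, 3))  # consuming it pulls records through every step
print(result)  # [{'player_name': 'Erling Haaland', 'xg_bin': 'Low'}]
print(seen)  # ['Kevin De Bruyne', 'Erling Haaland']
```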
🚀 Next: Grouping and Summaries
In the next section, we’ll cover .group_by() and .summary() to compute aggregates, such as total xG per player or match.