Grouping and Aggregating Data#
After cleaning and transforming individual records, the next step in data analysis is often to summarize information across different groups.
For example:
Count the number of shots per team
Compute the average xG per player
Sum the number of passes per zone
Flow provides powerful tools for these group-based operations.
π Grouping Records: .group_by(...)#
Use .group_by(...) to define how records should be grouped. It takes one or more field names and returns a FlowGroup object - a pipeline for grouped records.
from penaltyblog.matchflow import Flow
sample_records = [
{"event_id": 1, "type_name": "Pass", "team_name": "Manchester City", "player_name": "Kevin De Bruyne"},
{"event_id": 2, "type_name": "Shot", "team_name": "Manchester City", "player_name": "Erling Haaland", "shot_xg": 0.05},
{"event_id": 3, "type_name": "Duel", "team_name": "Manchester City", "player_name": "Rodri"},
{"event_id": 4, "type_name": "Pass", "team_name": "Manchester City", "player_name": "Kevin De Bruyne"},
{"event_id": 5, "type_name": "Shot", "team_name": "Arsenal", "player_name": "Bukayo Saka", "shot_xg": 0.01},
]
flow = Flow.from_records(sample_records)
grouped = flow.group_by("team_name")
You can group by multiple keys:
flow.group_by("type_name", "player_name")
π Aggregating Groups with .summary(...)#
Once you have a group, use .summary(...) to compute one or more aggregations per group.
Example: Sum xG per team#
result = (
flow
.group_by("team_name")
.summary(total_xg=("shot_xg", "sum"))
)
print(result.collect())
Example: Shots and xG per player#
from penaltyblog.matchflow import where_equals
player_summary = (
flow
.filter(where_equals("type_name", "Shot"))
.group_by("player_name")
.summary(
total_xg=("shot_xg", "sum"),
number_of_shots="count"
)
)
print(player_summary.collect())
βοΈ Built-in Aggregation Functions#
Flow supports many built-in aggregators:
count,sum,mean,min,max,median,std,varfirst,last,mode,range,nuniqueall,any,prodCustom callables or lambdas
π§ͺ Custom Aggregation Example#
Want to calculate a custom stat, like shots on target %?
def shot_accuracy(rows):
outcomes = ["Goal", "Saved"]
shots = [r for r in rows if r.get("shot_outcome_name")]
if not shots:
return 0
return 100 * sum(r["shot_outcome_name"] in outcomes for r in shots) / len(shots)
Apply it:
result = (
flow
.filter(where_equals("type_name", "Shot"))
.group_by("player_name")
.summary(sot_percentage=shot_accuracy)
)
print(result.collect())
π Rolling and Time-Based Aggregations#
In addition to simple group summaries, Flow provides powerful tools to aggregate events across time windows. These are especially useful for event-based data like football matches, where youβd like to calculate rolling metrics or fixed interval summaries.
π Rolling Summaries: .rolling_summary(β¦)#
Rolling summaries compute an aggregation for each row, based on a moving window of previous rows. This is useful for things like:
xG over the previous 5 minutes
Cumulative passes in the last 10 events
Momentum metrics that change throughout a match
from datetime import timedelta
result = (
flow
.filter(where_equals("type_name", "Shot"))
.assign(
timestamp=lambda r: timedelta(
minutes=r["minute"],
seconds=r.get("second", 0)
)
)
.group_by("team_name")
.sort_by("timestamp") # Important: sort within groups!
.rolling_summary(
window="5m",
time_field="timestamp",
aggregators={
"xg": ("sum", "shot_xg"),
"shots": ("count", "shot_xg")
}
)
.select("team_name", "timestamp", "xg", "shots")
)
result.show()
The window can be either a time string (
"5m","30s","1h") or an integer (number of rows).Always
.sort_by()on your time field after grouping: this ensures the rolling window works as intended.The function emits one row per input row, where each aggregation is computed over the previous window of rows.
β±οΈ Fixed Time Buckets: .time_bucket(...)#
If you want non-overlapping, regular time intervals (e.g. total xG every 5 minutes), use .time_bucket(). This partitions your data into uniform windows and computes aggregates for each.
result = (
flow
.filter(where_equals("type_name", "Shot"))
.assign(
timestamp=lambda r: timedelta(
minutes=r["minute"],
seconds=r.get("second", 0)
)
)
.group_by("team_name")
.time_bucket(
freq="5m",
time_field="timestamp",
label="left", # bucket labeled at start of interval
aggregators={
"xg": ("sum", "shot_xg"),
"shots": ("count", "shot_xg")
}
)
.select("team_name", "bucket", "xg", "shots")
)
result.show()
freqdefines the bucket size (e.g."10s","5m").You donβt need to sort beforehand -
.time_bucket()handles sorting internally.The label argument controls whether the bucket timestamp refers to the start (
"left") or end ("right") of each window.The output field for the bucket timestamp defaults to βbucketβ, but can be renamed via
bucket_name="...".
π Aggregating the Whole Dataset#
You can call .summary() directly on a Flow to compute dataset-wide aggregates without grouping.
summary = (
flow
.filter(where_equals("type_name", "Shot"))
.summary(
total_xg=("shot_xg", "sum"),
avg_xg=("shot_xg", "mean"),
total_shots="count"
)
)
print(summary.head(1))
β Summary#
.group_by()groups records by field(s).summary()applies aggregations to each group (or full dataset)You can mix built-in aggregators with custom functions
Grouped flows return regular Flow objects - chain
.select(),.sort_by(), etc.
π Next Up: Joining Flows#
Learn how to join datasets together - like linking events with players or match metadata - using .join().