Aggregating
You have a crossmatched table and you want one row per object (counts, means, min/max magnitudes); you have a wide source table and you want to bin sources on the sky; you want the brightest 100 things in a footprint. All of those are aggregations.
This page covers ACID's fluent aggregation surface (group_by,
aggregate, the reduction shortcuts count/mean/…, a post-aggregate
where in the HAVING role, and sort) together with the acid.agg
constructors, plus the small set of rules that keep aggregation fast
at billion-row scale. The SQL escape hatch covers everything the
fluent verbs can do plus window functions; non-decomposable aggregates
are rejected on both surfaces (see Why no agg.median? below).
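The shortest aggregate
Group by a column and count with cat.group_by("band").aggregate(n=agg.count()),
which returns one row per band with the columns band and n.
Without a group_by call, as in cat.aggregate(n=agg.count()), the
aggregate is global: one output row, no grouping keys.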
The verbs in detail
Catalog.group_by(*keys) sets the grouping keys. Each key is a flat
column name, an aliased SQL expression ("floor(mag) AS mag_bin"), or
a function call. Group keys appear in the output as the leading
columns (keys first, Polars-style).
Catalog.aggregate(**named) runs the aggregation, naming each output
column by keyword. Each value is an acid.agg constructor:
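```python
import acid
from acid import agg

cat = acid.open("source")  # any Catalog with band and mag columns

(cat.group_by("band")
    .aggregate(
        n        = agg.count(),
        n_finite = agg.count("mag"),  # COUNT(mag): non-null count
        mean_mag = agg.mean("mag"),
        min_mag  = agg.min("mag"),
        max_mag  = agg.max("mag"),
        std_mag  = agg.std("mag"),    # population std (divisor n)
    ))
```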
The available acid.agg constructors are exactly the decomposable
aggregates (see Why no agg.median? below):
| Constructor | What it computes |
|---|---|
| agg.count() | COUNT(*): total rows in the group |
| agg.count("col") | COUNT(col): non-null count of col |
| agg.sum("col") | SUM(col) |
| agg.mean("col") | mean of col (SQL AVG; SUM / COUNT) |
| agg.min("col") | MIN(col) |
| agg.max("col") | MAX(col) |
| agg.std("col") | population standard deviation (divisor n) |
| agg.var("col") | population variance (divisor n) |
| agg.all("col") | true iff every row's col is true |
| agg.any("col") | true iff any row's col is true |
| agg.list("col", order_by=…) | collect each group's col into a per-group list<T> (see List-valued aggregates) |
agg.std / agg.var are population statistics (divisor n)
Both use the population formula: divisor n, not n − 1. The SQL
STDDEV / VARIANCE (and the STDDEV_POP / VARIANCE_POP
spellings) all map to the same population formula. If you need the
sample standard deviation (divisor n − 1, the more common
astronomy default), rescale: also collect the non-null count
agg.count("col") (not agg.count(), which also counts rows where col
is null), then compute s = sigma * sqrt(n / (n - 1)) for n > 1. Or take
the sample statistic in Polars / Astropy after the query. The reduction
shortcuts std() / var() are population too.
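A quick numerical check of that rescaling, in plain Python. The standard-library statistics module stands in for the query here: pstdev uses divisor n (like agg.std) and stdev uses n − 1. The helper name sample_std_from_population and the magnitudes are made up for illustration.

```python
import math
import statistics


def sample_std_from_population(sigma: float, n: int) -> float:
    """Rescale a population std (divisor n) to the sample std (divisor n - 1).

    n must be the non-null count the std was computed over.
    """
    if n < 2:
        raise ValueError("the sample std needs at least two values")
    return sigma * math.sqrt(n / (n - 1))


mags = [17.2, 17.9, 18.4, 16.8, 17.5]      # made-up magnitudes
sigma = statistics.pstdev(mags)            # population std, divisor n
s = sample_std_from_population(sigma, len(mags))
assert math.isclose(s, statistics.stdev(mags))  # matches the n - 1 estimator
```

The same factor n / (n − 1), without the square root, converts agg.var to the sample variance.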
There is no .having(): a post-aggregate .where(...) is HAVING.
After .aggregate(...), .where(...) filters the grouped result. It references
the output and group-key names from the aggregate(...) call, not the
underlying row-level columns:
```python
(cat.group_by("band")
    .aggregate(n=agg.count(), mean_mag=agg.mean("mag"))
    .where("n > 100"))   # HAVING: groups with more than 100 rows
```
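The split between a row-level cut before grouping (WHERE) and a group-level cut after it (HAVING) is standard SQL semantics. It is easy to see with Python's built-in sqlite3 module on a toy table; this is SQLite, not ACID, and the table and values are made up.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE src (band TEXT, mag REAL)")
con.executemany(
    "INSERT INTO src VALUES (?, ?)",
    [("g", 17.0), ("g", 19.5), ("r", 16.5), ("r", 17.5), ("r", 18.5), ("i", 20.0)],
)

rows = con.execute(
    """
    SELECT band, COUNT(*) AS n, AVG(mag) AS mean_mag
    FROM src
    WHERE mag < 19           -- row-level cut: applied before grouping
    GROUP BY band
    HAVING COUNT(*) > 1      -- group-level cut: applied to the grouped result
    """
).fetchall()
print(rows)  # [('r', 3, 17.5)]
```

Without the HAVING clause the result would also contain ('g', 1, 17.0): the g group survives grouping, but the WHERE cut left it a single row. The i group disappears entirely because its only row fails the WHERE cut.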
Reduction shortcuts
For a single aggregate you can skip the agg.* ceremony with the reduction
shortcuts: count, sum, mean, min, max, std, var. Global (no group_by),
they materialize and return a bare Python scalar. Grouped, they return a
chainable Catalog whose output column is named count, mean_<col>, and so
on, so a following .where(...) is still HAVING:
```python
acid.open("object").where("mag < 18").count()    # -> int (eager)
acid.open("object").mean("parallax")             # -> float
(acid.open("source").group_by("band").count()    # -> Catalog
    .where("count > 100"))
```
Use the explicit .aggregate(...) when you need several stats in one pass, or
want to name the outputs.
Catalog.sort(*keys, descending=False, nulls_last=False) orders the
output. Pair it with .limit(K) for top-K; a standalone sort with
no limit is rejected at compile time (see
Top-K — the canonical pattern below).
descending / nulls_last accept a scalar (applied to every key) or a
per-key sequence:
```python
(cat.group_by("band")
    .aggregate(n=agg.count(), mean_mag=agg.mean("mag"))
    .sort("n", descending=True)
    .limit(10))
```
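To make the scalar-versus-sequence rule concrete, here is a plain-Python sketch of a multi-key sort over a list of dicts, with None standing in for SQL NULL. It is not ACID's implementation: sort_rows and per_key are hypothetical helpers, and the sketch assumes nulls_last=False puts nulls first in either direction (the Polars convention).

```python
from typing import Any, Sequence, Union

Flag = Union[bool, Sequence[bool]]


def per_key(flag: Flag, n_keys: int) -> list[bool]:
    """Broadcast a scalar flag to every key, or validate a per-key sequence."""
    if isinstance(flag, bool):
        return [flag] * n_keys
    flags = list(flag)
    if len(flags) != n_keys:
        raise ValueError("need exactly one flag per sort key")
    return flags


def sort_rows(rows: list[dict[str, Any]], keys: Sequence[str],
              descending: Flag = False,
              nulls_last: Flag = False) -> list[dict[str, Any]]:
    """Multi-key sort of dict rows; None plays the role of SQL NULL."""
    desc = per_key(descending, len(keys))
    nlast = per_key(nulls_last, len(keys))
    out = list(rows)
    # list.sort is stable (also with reverse=True), so sorting by the least
    # significant key first and the most significant key last gives the
    # correct lexicographic order.
    for key, d, nl in reversed(list(zip(keys, desc, nlast))):
        # Rank 1 sorts after rank 0 ascending; reverse=True flips that, so
        # the null rank is chosen to leave nulls where nulls_last asks.
        null_rank = int(nl) if not d else int(not nl)
        out.sort(
            key=lambda r, k=key, nr=null_rank: (nr, 0) if r[k] is None else (1 - nr, r[k]),
            reverse=d,
        )
    return out


rows = [
    {"band": "r", "n": 1},
    {"band": "g", "n": 2},
    {"band": "r", "n": 3},
    {"band": "g", "n": None},
]
# band ascending, then n descending; nulls_last=False -> nulls first
ordered = sort_rows(rows, ["band", "n"], descending=[False, True])
print([(r["band"], r["n"]) for r in ordered])
# [('g', None), ('g', 2), ('r', 3), ('r', 1)]
```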
How aggregation runs (partial aggregation)
ACID's decomposable aggregates run in two phases:
- Phase 1 computes per-partition partials. For COUNT(*) that is one row per group per partition; for AVG(col) it is a (SUM, COUNT) pair per group per partition. Phase-1 partials are kept in memory when small and spilled to scratch when they exceed inmem_row_limit.
- Phase 2 combines the partials into the final per-group result.
This is why ACID does not have to write every row to disk to compute a
COUNT(*): the partial state scales with the number of groups times the
number of partitions, not with the number of rows. It is also why the
aggregate set is restricted (see Why no agg.median? below).
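The two phases can be sketched in plain Python. A (SUM, COUNT) pair is enough for a mean, but a variance needs one more running term, so this sketch keeps (count, mean, M2) per group per partition and merges partials with Chan et al.'s pairwise update. That is one standard choice, not necessarily the state ACID stores; the Partial class, the phase1 / phase2 helpers and the data are all hypothetical.

```python
import math
from dataclasses import dataclass


@dataclass
class Partial:
    """Per-group, per-partition state for count / mean / population variance."""
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0  # sum of squared deviations from the mean

    def add(self, x: float) -> None:
        # Welford's online update, used inside one partition (phase 1)
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)


def combine(a: Partial, b: Partial) -> Partial:
    """Merge two partials (phase 2) with the pairwise update."""
    n = a.n + b.n
    if n == 0:
        return Partial()
    delta = b.mean - a.mean
    mean = a.mean + delta * b.n / n
    m2 = a.m2 + b.m2 + delta * delta * a.n * b.n / n
    return Partial(n, mean, m2)


def phase1(partition: list[tuple[str, float]]) -> dict[str, Partial]:
    """One partial per group in this partition."""
    out: dict[str, Partial] = {}
    for key, x in partition:
        out.setdefault(key, Partial()).add(x)
    return out


def phase2(partials: list[dict[str, Partial]]) -> dict[str, tuple[int, float, float]]:
    """Combine partials into (count, mean, population std) per group."""
    merged: dict[str, Partial] = {}
    for part in partials:
        for key, p in part.items():
            merged[key] = combine(merged.get(key, Partial()), p)
    return {k: (p.n, p.mean, math.sqrt(p.m2 / p.n)) for k, p in merged.items()}


partitions = [
    [("g", 17.0), ("r", 16.5), ("g", 18.0)],
    [("r", 17.5), ("r", 18.5)],
    [("g", 19.0)],
]
result = phase2([phase1(p) for p in partitions])
# g and r each have n=3, population std sqrt(2/3); means 18.0 and 17.5
```

Note that phase 2 only ever sees one small Partial per group per partition, never the rows themselves.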
Why no agg.median?
You will not find agg.median, agg.mode, agg.percentile, or
COUNT(DISTINCT) on either surface. ACID rejects them at analyze time
with ValidationError:
| Rejected shape | Why |
|---|---|
| MEDIAN, MODE, percentiles | Non-decomposable: needs the full row set |
| COUNT(DISTINCT col) | Non-decomposable: needs every distinct value |
| SELECT DISTINCT ... | Same: would require global materialization |
| Bare GROUP BY (no aggregates) | Same as DISTINCT |
| Unbounded ORDER BY (no LIMIT) | Full global sort: would materialize all rows |
The reason is the same in every case: each of these needs every row (or every distinct value) from every partition in one place, which on a billion-row catalog means spilling everything to disk and reducing centrally. ACID's earlier "full-materialize fallback" path was removed precisely so that silent performance cliffs ("the query just stalled") become visible failures ("the query refused; here is why").
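A toy illustration of what "non-decomposable" means, in plain Python with made-up magnitudes. Count and mean rebuild exactly from per-partition partials, but the median of the per-partition medians is not the overall median, so an exact median needs every value in one place.

```python
import statistics

# Two partitions of the same group (made-up magnitudes).
part_a = [15.0, 16.0, 17.0]
part_b = [18.0, 19.0, 20.0, 21.0, 22.0, 23.0]

# Count and mean combine exactly from small per-partition partials...
n = len(part_a) + len(part_b)
mean = (sum(part_a) + sum(part_b)) / n
assert mean == statistics.fmean(part_a + part_b)

# ...but a small per-partition summary such as the partition median
# cannot recover the overall median in general.
true_median = statistics.median(part_a + part_b)
median_of_medians = statistics.median(
    [statistics.median(part_a), statistics.median(part_b)]
)
print(true_median, median_of_medians)  # 19.0 18.25
```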
If you need a median, materialize and compute it in pandas or Polars after the query:
```python
import polars as pl

df = (cat.where("band = 'r'")
         .select("source_id, mag")
         .to_polars())
median_r = df.select(pl.col("mag").median()).item()
```
For a per-group median, group in Polars after the fluent path:
```python
df = cat.select("source_id, band, mag").to_polars()
medians = df.group_by("band").agg(pl.col("mag").median())
```
This trades scale for capability: fine for a few million rows, not for the whole sky.
Top-K — the canonical pattern
sort(*keys).limit(K) (fluent) or ORDER BY ... LIMIT K (SQL) is
pushed down to each partition: every partition keeps its own top-K,
and phase 2 picks the global top-K from the union. Each partition is
still scanned once, but the state kept and combined scales with K times
the number of partitions, not with total rows.
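```python
# obs: an observation Catalog with field and mag columns
brightest = (obs
    .group_by("field")
    .aggregate(n=agg.count(), brightest=agg.min("mag"))
    .sort("brightest")   # ascending: smallest magnitude = brightest
    .limit(100))
```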
The same shape works without grouping. For "the 100 brightest sources in the whole catalog", sort the rows directly: obs.sort("mag").limit(100).
This is the right shape for any "top K" question. A standalone sort
with no limit is rejected; if you actually need every row sorted,
write to a HATS catalog with .save(...) and sort the result locally.
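Why the per-partition pushdown gives the exact answer: any value in the global top-K is also in its own partition's top-K, so the union of the partition top-Ks (at most K times the number of partitions values) always contains the global top-K. A plain-Python sketch with heapq; topk_pushdown is a hypothetical helper and the magnitudes are random, not real data.

```python
import heapq
import random


def topk_pushdown(partitions: list[list[float]], k: int) -> list[float]:
    """Global k smallest values via per-partition top-k, then one merge."""
    # Phase 1: each partition keeps only its own k smallest (brightest) mags.
    partials = [heapq.nsmallest(k, part) for part in partitions]
    # Phase 2: pick the global top-k from the union of the partials.
    return heapq.nsmallest(k, (m for part in partials for m in part))


rng = random.Random(42)
partitions = [[rng.uniform(12.0, 24.0) for _ in range(10_000)] for _ in range(8)]
k = 100
# Same result as fully sorting all 80,000 values, without doing so.
assert topk_pushdown(partitions, k) == sorted(m for p in partitions for m in p)[:k]
```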
List-valued aggregates
Unlike MEDIAN, collecting a group's values into a list is
decomposable (partition partials hold sub-lists and phase 2 concatenates
them), so ACID supports it. This is the single-catalog light-curve
fold: group a source table by object ID and collect each measurement
column into a per-object, time-ordered list.
Two forms give the same result. agg.list(col, order_by=…) inside
aggregate(...) names each list explicitly:
```python
from acid import agg

per_object = (acid.open("diaSource")
    .group_by("diaObjectId")
    .aggregate(
        mjd  = agg.list("midpointMjdTai", order_by="midpointMjdTai"),
        flux = agg.list("psfFlux", order_by="midpointMjdTai"),
    ))
# one row per object; mjd and flux are list<T>, co-sorted by mjd.
```
order_by= sorts the elements within each list consistently across
every agg.list sharing the key, so element i of flux comes from the
same source row as element i of mjd. descending=True reverses the order.
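A plain-Python sketch of the fold and of why the lists stay aligned: each partition emits unordered sub-lists that keep the order key next to the values, and phase 2 concatenates them and sorts once by that shared key. This is an illustration, not ACID's code; the helpers and rows are made up, with column roles borrowed from the example above.

```python
from collections import defaultdict

# Made-up diaSource-like rows (diaObjectId, midpointMjdTai, psfFlux),
# split across two partitions in arbitrary order.
partitions = [
    [(1, 60003.2, 11.0), (2, 60001.0, 5.0), (1, 60001.5, 10.0)],
    [(1, 60002.1, 12.0), (2, 60000.4, 4.0)],
]


def phase1(rows):
    """Per-partition sub-lists of (order key, value) pairs per object."""
    sub = defaultdict(list)
    for obj, mjd, flux in rows:
        sub[obj].append((mjd, flux))
    return sub


def phase2(partials):
    """Concatenate sub-lists, then sort once by the shared order key."""
    merged = defaultdict(list)
    for sub in partials:
        for obj, pairs in sub.items():
            merged[obj].extend(pairs)
    out = {}
    for obj, pairs in merged.items():
        pairs.sort(key=lambda p: p[0])
        # Both lists come from the same sorted pairs, so element i matches.
        out[obj] = {"mjd": [m for m, _ in pairs], "flux": [f for _, f in pairs]}
    return out


light_curves = phase2([phase1(p) for p in partitions])
print(light_curves[1])
# {'mjd': [60001.5, 60002.1, 60003.2], 'flux': [10.0, 12.0, 11.0]}
```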
collect_lists(*cols, order_by=…) is the convenience form: after a
group_by, it folds every column (or a named subset) except the
group key(s) and the HEALPix index into per-group lists named after
their source column:
```python
# fold every other column into a per-object list:
lc = (acid.open("diaSource")
    .group_by("diaObjectId")
    .collect_lists(order_by="midpointMjdTai"))

# or name a subset; only those columns are read from parquet (pushdown):
lc = (acid.open("diaSource")
    .group_by("diaObjectId")
    .collect_lists("midpointMjdTai", "psfFlux", order_by="midpointMjdTai"))
```
collect_lists is single-catalog only. For a joined frame, name
the lists explicitly with aggregate(...=agg.list(...)), or use a
nested join,
the join-time version of the same fold.
localized=True — the partition-local fast path
By default a list fold is cross-partition (phase-1 partials plus a
phase-2 combine), which is correct for any layout. If you know every
row sharing a key lives within one HEALPix partition's neighborhood
(the HATS nested-association layout, where a source inherits its parent
object's pixel), group_by(..., localized=True) runs the fold
partition-local (phase 1 only, no combine), which is faster:
```python
per_object = (acid.open("diaSource")
    .group_by("diaObjectId", localized=True)
    .aggregate(mjd=agg.list("midpointMjdTai", order_by="midpointMjdTai")))
```
localized=True is an assertion, correct only when an object's
extent fits within the catalog's margin radius. A wrong assertion
splits a group that spans a partition boundary into several output rows
with partial lists, so leave it off (the cross-partition default)
unless you know the layout. It requires a _healpix_29 column and a
configured margin cache (each missing piece is a compile-time error),
allows only agg.list aggregates and plain-column keys, and rejects a
post-aggregate sort/limit (those need the cross-partition combine).
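The failure mode of a wrong localized=True assertion, sketched in plain Python: a fold that never combines across partitions emits one row per (partition, object), so an object that straddles a boundary comes out as several rows with partial lists. The integer partition labels, helpers and rows are made up; this is not how ACID assigns HEALPix partitions.

```python
from collections import defaultdict

# Made-up (partition, object_id, mjd) rows. Object 7 straddles a partition
# boundary, which is exactly what localized=True asserts cannot happen.
rows = [(0, 7, 60001.0), (0, 7, 60002.0), (1, 7, 60003.0), (1, 8, 60001.5)]


def fold_cross_partition(rows):
    """Default behaviour: combine across partitions, one row per object."""
    lists = defaultdict(list)
    for _part, obj, mjd in rows:
        lists[obj].append(mjd)
    return {obj: sorted(v) for obj, v in lists.items()}


def fold_partition_local(rows):
    """Partition-local fold: group within each partition, never combine."""
    lists = defaultdict(list)
    for part, obj, mjd in rows:
        lists[(part, obj)].append(mjd)
    return [(obj, sorted(v)) for (_part, obj), v in lists.items()]


print(fold_cross_partition(rows))
# {7: [60001.0, 60002.0, 60003.0], 8: [60001.5]}
print(fold_partition_local(rows))
# [(7, [60001.0, 60002.0]), (7, [60003.0]), (8, [60001.5])]
```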
localized was briefly named copartitioned
In an earlier alpha this flag was copartitioned= (and before that
colocated=). It is now localized=; the old names are gone, with no
compatibility shim. If you see copartitioned= or colocated= in an old
notebook, rename it to localized=.
acid.sql gotcha — GROUP BY and SELECT aliases
In the SQL escape hatch, you cannot group by a SELECT alias of a computed expression. The analyzer resolves the GROUP BY term to a physical column name, and a SELECT alias of an expression is not a column. Concretely:
```python
# Broken: 'bin' is a SELECT alias of an expression, not a column.
acid.sql.query("""
SELECT CAST(mag AS BIGINT) AS bin, COUNT(*) AS n
FROM obs
GROUP BY bin
""")
# -> ValidationError: unknown column 'bin'
```
The fix on the SQL side is to group by the expression itself:
```python
acid.sql.query("""
SELECT CAST(mag AS BIGINT) AS bin, COUNT(*) AS n
FROM obs
GROUP BY CAST(mag AS BIGINT)
""")
```
The fluent surface has no such limit: group_by("CAST(mag AS BIGINT) AS bin")
recognizes the alias, groups by the underlying expression, and names
the output column bin for you. If you need aliased expression keys,
prefer the fluent path.
A worked science example: count Gaia sources in 1° sky bins
A common diagnostic: how many Gaia sources land in each 1° × 1° bin
of RA/Dec? Use floor(...) AS ... aliases in group_by:
```python
from acid import agg

per_bin = (acid.open("gaia_dr3")
    .where("phot_g_mean_mag < 18")
    .group_by(
        "floor(ra) AS ra_bin",
        "floor(dec) AS dec_bin",
    )
    .aggregate(n=agg.count(), mean_mag=agg.mean("phot_g_mean_mag")))
df = per_bin.to_polars()
```
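What floor(...) binning does, checked in plain Python on a few made-up positions (the standard library only, not ACID). floor puts each coordinate in the half-open bin [k, k + 1), so negative declinations land in the bin below zero rather than being truncated toward zero.

```python
import math
from collections import Counter

# Made-up (ra, dec) positions in degrees.
positions = [(10.2, -0.5), (10.9, -0.1), (10.5, 0.3), (359.99, 45.0), (0.01, 45.7)]

# floor(-0.5) == -1, so dec = -0.5 goes to dec_bin -1; int(-0.5) would give 0.
counts = Counter((math.floor(ra), math.floor(dec)) for ra, dec in positions)
print(sorted(counts.items()))
# [((0, 45), 1), ((10, -1), 2), ((10, 0), 1), ((359, 45), 1)]
```

Two things to keep in mind when reading such a map: RA wraps, so bins 359 and 0 are neighbours on the sky, and a 1° × 1° RA/Dec cell covers roughly cos(dec) square degrees, so raw counts shrink toward the poles even at constant source density.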
For per-object aggregates after a crossmatch — e.g. number of
forced-source detections per Rubin object — chain crossmatch and
group_by:
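```python
import astropy.units as u
import polars as pl

matched = (acid.open("object")
    .crossmatch(acid.open("forced_source"), radius=0.5 * u.arcsec))
per_object = (matched
    .group_by("objectId")
    .aggregate(
        n_obs     = agg.count(),
        mean_mag  = agg.mean("mag_b"),  # _b suffix from collision
        first_mjd = agg.min("mjd_b"),   # first epoch
        last_mjd  = agg.max("mjd_b"),   # last epoch
    )
    .where("n_obs > 1"))
df = per_object.to_polars()
# time span of each light curve, computed after the query
df = df.with_columns((pl.col("last_mjd") - pl.col("first_mjd")).alias("span_mjd"))
```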
The _b suffix appears because the right-side mag / mjd columns
collided with same-named columns on the left and got the default
_<alias> suffix. See crossmatching catalogs
for how that suffix is chosen.
Saving an aggregate as a catalog
Catalog.save(path) of a global aggregate (no group_by keys)
produces a one-partition HATS catalog with the single output row. A
group_by aggregate produces a one-partition HATS catalog with one row
per group. Either way, the saved catalog is itself a normal Catalog
you can register and crossmatch against later. This is worth knowing
because a follow-on crossmatch against a single-partition catalog still
runs correctly, with no special-casing needed on the user side.
```python
gaia_counts = (acid.open("gaia_dr3")
    .group_by("floor(ra) AS ra_bin", "floor(dec) AS dec_bin")
    .aggregate(n=agg.count())
    .save("/data/gaia_counts", name="gaia_counts"))
# `gaia_counts` is a normal Catalog; "gaia_counts" is also resolvable by name.
```
What acid.sql adds on top
The fluent surface covers the decomposable cases. acid.sql.query(...) adds:
- Window functions (OVER (...)), available only on the SQL side. A query that combines them with the rejected shapes above is still rejected.
- More flexible HAVING expressions on output column names.
- Subqueries / CTEs in the FROM and JOIN-RHS positions (with a restricted shape; see SQL escape hatch).
See Verbs or SQL — which should I use? in the SQL page for the decision aid.
See also
- Crossmatching catalogs: the most common input to an aggregation; count matched rows per anchor.
- Filtering rows: where cuts before grouping; a post-aggregate where applies after (the HAVING role).
- SQL escape hatch: windowing, non-decomposable aggregates (rejected with a clear error), more flexible joins.
- Working with results & exporting: what comes out and how to write it to disk.