Aggregating

You have a crossmatched table and you want one row per object (counts, means, min/max magnitudes); you have a wide source table and you want to bin sources on the sky; you want the brightest 100 things in a footprint. All of those are aggregations.

This page covers ACID's fluent aggregation surface: group_by, aggregate, the reduction shortcuts (count/mean/…), a post-aggregate where (the HAVING role), sort, and the acid.agg constructors. It also covers the small set of rules that keep aggregation fast at billion-row scale. The SQL escape hatch covers everything the fluent verbs can do plus a few shapes the fluent surface lacks (window functions, subqueries); non-decomposable aggregates are rejected on both surfaces. See the SQL escape hatch page for details.

The shortest aggregate

Group by a column and count:

count_per_band.py
import acid
from acid import agg

acid.init("catalogs.yaml", workers=8)
obs = acid.open("forced_source")

per_band = (obs
            .group_by("band")
            .aggregate(n=agg.count()))

df = per_band.to_polars()

The equivalent query through the SQL escape hatch:

import acid

acid.init("catalogs.yaml", workers=8)
r = acid.sql.query("""
    SELECT band, COUNT(*) AS n
    FROM   forced_source
    GROUP BY band
""")
df = r.to_polars()

Without a group_by call the aggregate is global — one output row, no grouping keys:

n_total = obs.aggregate(n=agg.count()).to_polars()   # shape: (1, 1)

The verbs in detail

Catalog.group_by(*keys) sets the grouping keys. Each key is a flat column name, an aliased SQL expression ("floor(mag) AS mag_bin"), or a function call. Group keys appear in the output — keys first, Polars-style.

Catalog.aggregate(**named) runs the aggregation, naming each output column by keyword. Each value is an acid.agg constructor:

from acid import agg

(cat.group_by("band")
    .aggregate(
        n         = agg.count(),
        n_finite  = agg.count("mag"),     # COUNT(mag) — non-null count
        mean_mag  = agg.mean("mag"),
        min_mag   = agg.min("mag"),
        max_mag   = agg.max("mag"),
        std_mag   = agg.std("mag"),
    ))

The available acid.agg constructors are exactly the decomposable aggregates (see Why no agg.median? below):

| Constructor | What it computes |
|---|---|
| agg.count() | COUNT(*): total rows in the group |
| agg.count("col") | COUNT(col): non-null count of col |
| agg.sum("col") | SUM(col) |
| agg.mean("col") | mean of col (SQL AVG; SUM / COUNT) |
| agg.min("col") | MIN(col) |
| agg.max("col") | MAX(col) |
| agg.std("col") | population standard deviation (divisor n) |
| agg.var("col") | population variance (divisor n) |
| agg.all("col") | true iff every row's col is true |
| agg.any("col") | true iff any row's col is true |
| agg.list("col", order_by=…) | collect each group's col into a per-group list<T> (see List-valued aggregates) |

agg.std / agg.var are population statistics (divisor n)

Both use the population formula: divisor n, not n − 1. The SQL STDDEV / VARIANCE (and the STDDEV_POP / VARIANCE_POP spellings) all map to the same population formula. If you need the sample standard deviation (divisor n − 1, the more common astronomy default), rescale: also collect the row count n with agg.count() (use agg.count("col") if col can contain nulls, so that n matches the rows the statistic actually saw), then s = sigma * sqrt(n / (n - 1)). The rescaling is undefined for groups with n = 1. Alternatively, take the sample statistic in Polars / Astropy after the query. The reduction shortcuts std() / var() are population statistics too.
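
As a quick sanity check of the rescaling, here is a minimal sketch in plain Python (standard library only; it does not call ACID). The magnitudes are made-up illustrative values, sigma stands in for what agg.std would return for the group, and n for the matching count:

```python
import math
import statistics

# Hypothetical magnitudes for one group (illustrative values only).
mags = [17.2, 17.5, 16.9, 17.8, 17.1]

n = len(mags)                      # stands in for the group's row count
sigma = statistics.pstdev(mags)    # population std (divisor n), like agg.std

# Rescale the population statistic to the sample statistic (divisor n - 1).
s = sigma * math.sqrt(n / (n - 1))

# statistics.stdev uses divisor n - 1, so the two values should agree.
sample_std = statistics.stdev(mags)
print(math.isclose(s, sample_std))
```

The sample value is always slightly larger than the population value, and the difference shrinks as n grows.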

There is no .having() — a post-aggregate .where(...) is HAVING. After .aggregate(...), .where(...) filters the grouped result, referencing the output / group-key names from the aggregate(...) call (not the underlying row-level columns):

(cat.group_by("band")
    .aggregate(n=agg.count(), mean_mag=agg.mean("mag"))
    .where("n > 100"))            # HAVING: groups with more than 100 rows

Reduction shortcuts

For a single aggregate you can skip the agg.* ceremony with the reduction shortcuts — count, sum, mean, min, max, std, var. Global (no group_by) they materialize and return a bare Python scalar; grouped they return a chainable Catalog (column count / mean_<col> / …), so a following .where(...) is still HAVING:

acid.open("object").where("mag < 18").count()       # -> int (eager)
acid.open("object").mean("parallax")                 # -> float

(acid.open("source").group_by("band").count()        # -> Catalog
    .where("count > 100"))

Use the explicit .aggregate(...) when you need several stats in one pass, or want to name the outputs.

Catalog.sort(*keys, descending=False, nulls_last=False) orders the output. Pair it with .limit(K) for top-K; a standalone sort with no limit is rejected at compile time (see Top-K — the canonical pattern below). descending / nulls_last accept a scalar (applied to every key) or a per-key sequence:

(cat.group_by("band")
    .aggregate(n=agg.count(), mean_mag=agg.mean("mag"))
    .sort("n", descending=True)
    .limit(10))

How aggregation runs (partial aggregation)

ACID's decomposable aggregates run in two phases:

  • Phase 1 computes per-partition partials. For COUNT(*) that is one row per group per partition; for AVG(col) it is a (SUM, COUNT) pair per group per partition. Phase-1 partials are kept in memory when small (spilled to scratch when they exceed inmem_row_limit).
  • Phase 2 combines the partials into the final per-group result.

This is why ACID does not have to write every row to disk to compute a COUNT(*) — the partial path scales with the number of groups, not the number of rows. It is also why the aggregate set is restricted (see Why no agg.median? below).
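
The two phases can be illustrated with a small standalone sketch. This is plain Python, not ACID's implementation; the partition contents and the phase1 / phase2 helper names are hypothetical and chosen only to mirror the description above:

```python
from collections import defaultdict

# Hypothetical rows, already split into two partitions: (band, mag).
partitions = [
    [("g", 18.0), ("r", 17.0), ("g", 19.0)],
    [("r", 16.0), ("g", 20.0)],
]

def phase1(rows):
    """Per-partition partials: one [sum, count] pair per group."""
    partial = defaultdict(lambda: [0.0, 0])
    for band, mag in rows:
        partial[band][0] += mag
        partial[band][1] += 1
    return dict(partial)

def phase2(partials):
    """Combine partials across partitions, then finalize COUNT and AVG."""
    total = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for band, (s, c) in partial.items():
            total[band][0] += s
            total[band][1] += c
    return {band: {"n": c, "mean_mag": s / c} for band, (s, c) in total.items()}

result = phase2([phase1(rows) for rows in partitions])
print(result)
```

Note that phase 2 only ever sees one small record per group per partition, which is why the cost follows the number of groups rather than the number of rows.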

Why no agg.median?

You will not find agg.median, agg.mode, agg.percentile, or COUNT(DISTINCT) on either surface. ACID rejects them at analyze time with ValidationError:

| Rejected shape | Why |
|---|---|
| MEDIAN, MODE, percentiles | Non-decomposable: needs the full row set |
| COUNT(DISTINCT col) | Non-decomposable: needs every distinct value |
| SELECT DISTINCT ... | Same: would require global materialization |
| Bare GROUP BY (no aggregates) | Same as DISTINCT |
| Unbounded ORDER BY (no LIMIT) | Full global sort: would materialize all rows |

The reason is the same in every case: each of these needs to see every row of every partition at once, which on a billion-row catalog means spilling everything to disk and reducing centrally. ACID's earlier "full-materialize fallback" path was removed precisely because removing it turns silent performance cliffs ("the query just stalled") into visible failures ("the query refused; here is why").
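
To see concretely why a mean decomposes and a median does not, consider this small standalone sketch (plain Python with made-up values, independent of ACID). Per-partition (sum, count) partials recombine to the exact global mean, while combining per-partition medians gives a different number from the true median:

```python
import statistics

# Hypothetical values split across two partitions of unequal size.
part_a = [1.0, 2.0, 3.0, 4.0, 100.0]
part_b = [5.0, 6.0]
all_rows = part_a + part_b

# Mean: (sum, count) partials recombine to the exact global answer.
partials = [(sum(p), len(p)) for p in (part_a, part_b)]
mean_from_partials = sum(s for s, _ in partials) / sum(c for _, c in partials)
global_mean = statistics.mean(all_rows)

# Median: a median of per-partition medians is not the global median.
median_of_medians = statistics.median(
    [statistics.median(part_a), statistics.median(part_b)]
)
true_median = statistics.median(all_rows)

print(mean_from_partials, global_mean)   # these agree
print(median_of_medians, true_median)    # these differ
```

No fixed-size per-partition summary fixes this in general: an exact median needs the full sorted row set.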

If you need a median, materialize and compute it in pandas or Polars after the query:

import polars as pl

df = (cat.where("band = 'r'")
         .select("source_id, mag")
         .to_polars())

median_r = df.select(pl.col("mag").median()).item()

For a per-group median, group in Polars after the fluent path:

df = (cat.select("source_id, band, mag").to_polars())
medians = df.group_by("band").agg(pl.col("mag").median())

This trades scale for capability — fine for a few million rows; not fine for the whole sky.

Top-K — the canonical pattern

sort(*keys).limit(K) (fluent) or ORDER BY ... LIMIT K (SQL) is pushed down to each partition: every partition keeps its own top-K, phase 2 picks the global top-K from the union. The cost scales with K and the number of partitions, not with total rows.

brightest_per_field.py
brightest = (obs
             .group_by("field")
             .aggregate(n=agg.count(), brightest=agg.min("mag"))
             .sort("brightest")
             .limit(100))

The same shape works without grouping — "the 100 brightest sources in the whole catalog":

top100 = (obs
          .select("source_id, ra, dec, mag")
          .sort("mag")
          .limit(100))

This is the right shape for any "top K" question. A standalone sort with no limit is rejected; if you actually need every row sorted, write to a HATS catalog with .save(...) and sort the result locally.
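
The pushdown described in this section can be sketched in a few lines of standalone Python. This is an illustration of the idea rather than ACID's code; the rows and the by_mag helper are hypothetical:

```python
import heapq

K = 3

# Hypothetical (source_id, mag) rows in three partitions; smaller mag = brighter.
partitions = [
    [(1, 19.5), (2, 14.2), (3, 17.0), (4, 21.1)],
    [(5, 15.3), (6, 13.9), (7, 18.8)],
    [(8, 16.4), (9, 20.0)],
]

def by_mag(row):
    return row[1]

# Phase 1: each partition keeps only its own K brightest rows.
local_top = [heapq.nsmallest(K, rows, key=by_mag) for rows in partitions]

# Phase 2: pick the global top-K from the union of the per-partition winners.
candidates = [row for top in local_top for row in top]
top_k = heapq.nsmallest(K, candidates, key=by_mag)

print(top_k)
```

Phase 2 never sees more than K × (number of partitions) candidates, which is where the cost claim above comes from. The result is exact because any row in the global top-K must also be in its own partition's top-K.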

List-valued aggregates

Unlike MEDIAN, collecting a group's values into a list is decomposable (partition partials hold sub-lists, phase 2 concatenates them), so ACID supports it. This is the single-catalog light-curve fold: group a source table by object ID, collect each measurement column into a per-object, time-ordered list.

Two surfaces, same result. agg.list(col, order_by=…) inside aggregate(...) names each list explicitly:

lightcurve_fold.py
import acid
from acid import agg

per_object = (acid.open("diaSource")
              .group_by("diaObjectId")
              .aggregate(
                  mjd  = agg.list("midpointMjdTai", order_by="midpointMjdTai"),
                  flux = agg.list("psfFlux",        order_by="midpointMjdTai"),
              ))
# one row per object; mjd and flux are list<T>, co-sorted by mjd.

order_by= sorts the elements within each list consistently across every agg.list sharing the key — so element i of flux is the same source as element i of mjd. descending=True reverses it.
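
One way to picture the co-sorting guarantee is the standalone sketch below (plain Python, not ACID's implementation; the rows and the phase1 / phase2 helpers are hypothetical). Keeping each measurement paired with its sort key until the final sort is what keeps element i of every list aligned:

```python
from collections import defaultdict

# Hypothetical (diaObjectId, midpointMjdTai, psfFlux) rows in two partitions,
# deliberately out of time order.
partitions = [
    [(1, 60003.0, 30.0), (2, 60001.0, 5.0), (1, 60001.0, 10.0)],
    [(1, 60002.0, 20.0), (2, 60000.0, 4.0)],
]

def phase1(rows):
    """Per-partition partial: a sub-list of (mjd, flux) pairs per object."""
    partial = defaultdict(list)
    for obj, mjd, flux in rows:
        partial[obj].append((mjd, flux))
    return partial

def phase2(partials):
    """Concatenate sub-lists, then sort each object's pairs once by mjd."""
    merged = defaultdict(list)
    for partial in partials:
        for obj, pairs in partial.items():
            merged[obj].extend(pairs)
    out = {}
    for obj, pairs in merged.items():
        pairs.sort(key=lambda p: p[0])   # the shared order_by key
        out[obj] = {
            "mjd": [mjd for mjd, _ in pairs],
            "flux": [flux for _, flux in pairs],
        }
    return out

per_object = phase2([phase1(rows) for rows in partitions])
print(per_object[1])
```

Sorting each column independently would only stay aligned if the sort key were carried along, which is the role order_by= plays when several agg.list calls share it.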

collect_lists(*cols, order_by=…) is the convenience form: after a group_by, it folds every column (or a named subset) except the group key(s) and the HEALPix index into per-group lists named after their source column:

# fold every other column into a per-object list:
lc = (acid.open("diaSource")
        .group_by("diaObjectId")
        .collect_lists(order_by="midpointMjdTai"))

# or name a subset — only those are read from parquet (pushdown):
lc = (acid.open("diaSource")
        .group_by("diaObjectId")
        .collect_lists("midpointMjdTai", "psfFlux", order_by="midpointMjdTai"))

collect_lists is single-catalog only — for a joined frame, name the lists explicitly with aggregate(...=agg.list(...)), or use a nested join, which is the join-time version of the same fold.

localized=True — the partition-local fast path

By default a list fold is cross-partition (phase-1 partials + phase-2 combine), which is correct for any layout. If you know every row sharing a key lives within one HEALPix partition's neighborhood — the HATS nested-association layout, where a source inherits its parent object's pixel — group_by(..., localized=True) runs the fold partition-local (phase-1 only, no combine), which is faster:

per_object = (acid.open("diaSource")
              .group_by("diaObjectId", localized=True)
              .aggregate(mjd=agg.list("midpointMjdTai", order_by="midpointMjdTai")))

localized=True is an assertion, correct only when an object's extent fits within the catalog's margin radius. A wrong assertion splits a group that spans a partition boundary into several output rows with partial lists — so leave it off (the cross-partition default) unless you know the layout. It requires a _healpix_29 column and a configured margin cache (each missing piece is a compile-time error), allows only agg.list aggregates and plain-column keys, and rejects a post-aggregate sort/limit (those need the cross-partition combine).
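
The failure mode of a wrong assertion is easy to reproduce in a standalone sketch. The code below is plain Python with hypothetical rows and a hypothetical fold helper; it ignores the margin cache entirely and only contrasts "no combine step" with "combine across partitions":

```python
from collections import defaultdict

# Hypothetical (diaObjectId, mjd) rows. Object 7 straddles a partition boundary.
partitions = [
    [(7, 60000.0), (7, 60001.0), (8, 60000.5)],
    [(7, 60002.0)],
]

def fold(rows):
    """Collect each object's mjd values into a time-ordered list."""
    lists = defaultdict(list)
    for obj, mjd in rows:
        lists[obj].append(mjd)
    return {obj: sorted(values) for obj, values in lists.items()}

# Partition-local fold with no combine step: every partition emits its own rows.
local_rows = [
    (obj, mjds) for rows in partitions for obj, mjds in fold(rows).items()
]

# Cross-partition fold (the default behaviour): one row per object.
combined = fold([row for rows in partitions for row in rows])

print(local_rows)   # object 7 appears twice, each time with a partial list
print(combined)     # object 7 appears once, with all three epochs
```

Nothing in the partition-local output signals that object 7 was split, which is why the flag should only be set when the layout is known.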

localized was briefly named copartitioned

In an earlier alpha this flag was copartitioned= (and before that colocated=). It is now localized= — the old names are gone, no shim. If you see copartitioned= in an old notebook, rename it.

acid.sql gotcha — GROUP BY and SELECT aliases

In the SQL escape hatch, you cannot group by a SELECT alias of a computed expression. The analyzer resolves the GROUP BY term to a physical column name, and a SELECT alias of an expression is not a column. Concretely:

# Broken: 'bin' is a SELECT alias of an expression, not a column.
acid.sql.query("""
    SELECT CAST(mag AS BIGINT) AS bin, COUNT(*) AS n
    FROM   obs
    GROUP BY bin
""")
# -> ValidationError: unknown column 'bin'

The fix on the SQL side is to group by the expression itself:

acid.sql.query("""
    SELECT CAST(mag AS BIGINT) AS bin, COUNT(*) AS n
    FROM   obs
    GROUP BY CAST(mag AS BIGINT)
""")

The fluent surface has no such limit — group_by("CAST(mag AS BIGINT) AS bin") recognizes the alias, groups by the underlying expression, and names the output column bin for you. If you find yourself needing aliased expression keys, prefer the fluent path.

A worked science example: count Gaia sources in 1° sky bins

A common diagnostic — how many Gaia sources land in each 1° × 1° bin of RA/Dec? Use floor(...) AS ... aliases in group_by:

sky_bins.py
import acid
from acid import agg

per_bin = (acid.open("gaia_dr3")
             .where("phot_g_mean_mag < 18")
             .group_by(
                 "floor(ra) AS ra_bin",
                 "floor(dec) AS dec_bin",
             )
             .aggregate(n=agg.count(), mean_mag=agg.mean("phot_g_mean_mag")))

df = per_bin.to_polars()
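
The choice of floor(...) matters for bins that straddle zero declination. The standalone sketch below (plain Python with made-up positions, not ACID) compares floor with truncation toward zero, which is what Python's int() does and what some integer casts do as well:

```python
import math
from collections import Counter

# Hypothetical (ra, dec) positions in degrees.
positions = [(10.2, -0.5), (10.9, -0.1), (10.5, 0.3), (359.99, 45.0)]

# floor() rounds toward negative infinity, so dec = -0.5 lands in bin -1.
counts = Counter((math.floor(ra), math.floor(dec)) for ra, dec in positions)

# Truncation toward zero would fold dec in (-1, 0) and [0, 1) into one bin 0,
# making the equatorial row of bins twice as tall as every other row.
truncated = Counter((int(ra), int(dec)) for ra, dec in positions)

print(counts)
print(truncated)
```

Also keep in mind that 1° × 1° bins in RA/Dec are not equal-area: they shrink toward the poles, so raw counts at different declinations are not directly comparable as densities.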

For per-object aggregates after a crossmatch — e.g. number of forced-source detections per Rubin object — chain crossmatch and group_by:

forced_per_object.py
import astropy.units as u

import acid
from acid import agg

matched = (acid.open("object")
             .crossmatch(acid.open("forced_source"), radius=0.5 * u.arcsec))

per_object = (matched
              .group_by("objectId")
              .aggregate(
                  n_obs    = agg.count(),
                  mean_mag = agg.mean("mag_b"),    # _b suffix from collision
                  last_mjd = agg.max("mjd_b"),     # latest epoch; add agg.min("mjd_b") to derive a time span
              )
              .where("n_obs > 1"))

df = per_object.to_polars()

The _b suffix appears because the right-side mag / mjd columns collided with same-named columns on the left and got the default _<alias> suffix. See crossmatching catalogs for how that suffix is chosen.

Saving an aggregate as a catalog

Catalog.save(path) of a global aggregate (no group_by keys) produces a one-partition HATS catalog with the single output row. A group_by aggregate produces a one-partition HATS catalog with one row per group. Either way, the saved catalog is itself a normal Catalog you can register and crossmatch against later. A follow-on crossmatch on a single-partition catalog still runs correctly, so no special-casing is needed on the user side.

gaia_counts = (acid.open("gaia_dr3")
                 .group_by("floor(ra) AS ra_bin", "floor(dec) AS dec_bin")
                 .aggregate(n=agg.count())
                 .save("/data/gaia_counts", name="gaia_counts"))

# `gaia_counts` is a normal Catalog; "gaia_counts" is also resolvable by name.

What acid.sql adds on top

The fluent surface covers the decomposable cases. acid.sql.query(...) adds:

  • Window functions (OVER (...)), available only on the SQL side. A query that combines them with any of the rejected shapes above is still rejected.
  • More flexible HAVING expressions on output column names.
  • Subqueries / CTEs in the FROM and JOIN-RHS positions (with the restricted shape — see SQL escape hatch).

See Verbs or SQL — which should I use? in the SQL page for the decision aid.

See also