Python functions on partitions
Sometimes the calculation you need doesn't fit a SQL expression — a
period fit, an SED template match, a calibration, a custom variability
statistic. acid lets you run your own NumPy / SciPy / Astropy code as
a first-class step in a fluent chain, executed per partition with the
engine's parallelism and projection pushdown.
Two verbs:
- with_columns(name, fn, …): add one or more new columns computed by a Python function. The rest of the frame passes through unchanged. This is the common one.
- map_partitions(fn, …): replace each partition's whole frame with fn(df) -> pl.DataFrame. Use it when the operation changes the rows themselves rather than only adding columns.
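with_columns: add computed columns
```python
import acid
import numpy as np  # in the default mode, fn receives np.ndarray inputs

acid.init("/data/hats", workers=8)

def snr(flux, flux_err):
    # Vectorized: operates on whole arrays, not single values.
    return flux / flux_err

result = (acid.open("object")
          .with_columns("snr", snr,
                        columns=["flux", "flux_err"],  # read only these two
                        schema="f8")                   # float64 output
          .where("snr > 5")                            # the new column is usable downstream
          .to_astropy())
```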
Three things are always required, since there is no signature inference. columns= and schema= can be given at the call, as here, or attached once with @acid.function (below):
- name: the output column name. A str for the single-column form (fn returns one array-like), or a list[str] for the multi-column form (fn returns a dict, tuple, or pl.DataFrame).
- columns=: the input columns fn reads. This drives projection pushdown across the function boundary: a UDF declaring columns=["flux", "flux_err"] reads only those two columns from Parquet, even from a 1000-column catalog. Verify with .explain().
- schema=: the output dtype(s): a NumPy-style string ("f8", "i8"), a {name: dtype} dict (multi-column), or a pa.Schema. The declared schema makes the output names and types known at compile time, so .columns and downstream verbs compose with zero engine I/O.
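For the multi-column form, fn returns one array per output name. The sketch below uses a hypothetical flux_to_mag UDF that returns a dict keyed by the output names, one of the return shapes listed above. It assumes fluxes in nJy, for which the AB zero point is 31.4. The commented wiring also assumes that the {name: dtype} schema dict accepts the same NumPy-style dtype strings as the single-column form. A UDF is a plain function of arrays, so you can check it on NumPy arrays before wiring it into a chain.

```python
import numpy as np

def flux_to_mag(flux, flux_err):
    """Hypothetical multi-column UDF: AB magnitude and its error from flux in nJy."""
    # Assumes positive fluxes in nJy; non-positive values yield nan/-inf with a warning.
    mag = -2.5 * np.log10(flux) + 31.4            # AB zero point for nJy
    mag_err = 2.5 / np.log(10) * flux_err / flux  # first-order error propagation
    return {"mag": mag, "mag_err": mag_err}       # one array per output name

# Hypothetical wiring, using the arguments described above (not executed here):
# cat.with_columns(["mag", "mag_err"], flux_to_mag,
#                  columns=["flux", "flux_err"],
#                  schema={"mag": "f8", "mag_err": "f8"})

# Check the function locally on plain arrays, without the engine:
out = flux_to_mag(np.array([1e4, 1e6]), np.array([1e3, 1e4]))
```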
Input modes: NumPy or Polars
mode="numpy" (the default) hands each input column to fn as an
np.ndarray. mode="polars" hands it a pl.Series instead. This is useful when
your inputs are list columns (e.g. from a nested join) and you want
to drive them with Polars' .list.* namespace.
@acid.function: declare metadata once
Passing columns= / schema= at every call gets repetitive. The
@acid.function decorator attaches them to the function itself:
```python
import acid

@acid.function(columns=["flux", "flux_err"], schema="f8")
def snr(flux, flux_err):
    return flux / flux_err

# columns=/schema= now ride the function, so there is no need to repeat them:
cat.with_columns("snr", snr)
```
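You can sketch the idea of metadata riding the function in a few lines of plain Python. This is not acid's implementation. The function decorator, the _udf_meta attribute and resolve_spec are hypothetical names. Letting call-site arguments take precedence over decorator values is an assumption of this sketch, not documented acid behavior.

```python
def function(columns, schema):
    """Hypothetical decorator: attach columns/schema metadata to fn itself."""
    def attach(fn):
        fn._udf_meta = {"columns": list(columns), "schema": schema}
        return fn
    return attach

def resolve_spec(fn, columns=None, schema=None):
    """Hypothetical lookup: explicit arguments first, then decorator metadata."""
    meta = getattr(fn, "_udf_meta", {})
    columns = columns if columns is not None else meta.get("columns")
    schema = schema if schema is not None else meta.get("schema")
    if columns is None or schema is None:
        # No signature inspection: missing metadata is an error, not a guess.
        raise TypeError(f"{fn.__name__}: columns= and schema= are required")
    return columns, schema

@function(columns=["flux", "flux_err"], schema="f8")
def snr(flux, flux_err):
    return flux / flux_err

print(resolve_spec(snr))  # (['flux', 'flux_err'], 'f8')
```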
Stateful UDFs: a heavy resource, built once per worker
When your function needs an expensive resource — a template library, a
WCS, a trained ML model — building it on every call is wasteful, and
shipping a built instance to every worker is worse (it would ride the
pickle into every task). Decorate a class with @acid.function to
get a deferred-construction factory: __init__ runs once per worker
process and the instance is cached there; it never travels in the
task payload.
```python
import acid

@acid.function(columns=["wavelength", "flux"], schema="int64")
class match_sed:
    def __init__(self, path="/data/templates.fits"):
        # Heavy: runs once per worker process, never in the parent.
        # read_template_library / best_template_index stand for your own helpers.
        self.templates = read_template_library(path)

    def __call__(self, wavelength, flux):
        # Reuses the cached templates on every call in this worker.
        return best_template_index(wavelength, flux, self.templates)

# Calling the factory captures (cls, args) into a pickle-safe handle;
# the library loads once per worker on first use, not in the parent.
cat.with_columns("template_id", match_sed("/data/templates.fits"))
```
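You can sketch deferred construction in plain Python. The handle stores only the class and its arguments. The instance is built on the first call and cached in a module-level dict, so each process builds it at most once. DeferredFactory, deferred and _INSTANCES are hypothetical names, and acid's own handle may work differently. The sketch also assumes that the constructor arguments are hashable, because they form the cache key.

```python
_INSTANCES = {}  # per-process cache: one built instance per (class, args, kwargs)

class DeferredFactory:
    """Hypothetical pickle-friendly handle: holds the recipe, never the instance."""

    def __init__(self, cls, args=(), kwargs=None):
        self.cls = cls
        self.args = tuple(args)
        self.kwargs = dict(kwargs or {})

    def _instance(self):
        key = (self.cls, self.args, tuple(sorted(self.kwargs.items())))
        if key not in _INSTANCES:
            # The expensive __init__ runs here, in whichever process calls first.
            _INSTANCES[key] = self.cls(*self.args, **self.kwargs)
        return _INSTANCES[key]

    def __call__(self, *columns):
        return self._instance()(*columns)

def deferred(cls):
    """Hypothetical class decorator: calling the result captures (cls, args) only."""
    def factory(*args, **kwargs):
        return DeferredFactory(cls, args, kwargs)
    return factory
```

Building the handle, as in match_sed("/data/templates.fits"), costs nothing. The handle's attributes are just a class reference and its arguments, so it pickles whenever those do. Every later call in the same process reuses the cached instance.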
map_partitions: replace the whole frame
When the operation changes the rows (a resampling, a per-partition
model that emits different rows), use map_partitions. The function
receives the partition's pl.DataFrame and returns its replacement;
schema= (the output schema) is required, and columns= is an optional
projection-narrowing hint:
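```python
import polars as pl

def hourly_means(df: pl.DataFrame) -> pl.DataFrame:
    # group_by_dynamic needs a temporal or integer index, so floor the float
    # MJD (in days) to the start of its 1-hour bin and group on that instead.
    hour_start = ((pl.col("mjd") * 24).floor() / 24).alias("mjd")
    return (df.with_columns(hour_start)
              .group_by("mjd")
              .agg(pl.col("flux").mean().alias("flux_hourly"))
              .sort("mjd"))

cat.map_partitions(
    hourly_means,
    columns=["mjd", "flux"],  # optional projection hint
    schema={"mjd": pl.Float64, "flux_hourly": pl.Float64},
)
```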
Where these can go in a chain
- with_columns runs after the spine (post-join). A crossmatch/join after a with_columns is rejected: crossmatch first, then add columns.
- After a nested join or aggregate, with_columns computes over the per-object list columns (the list-in → scalar-out light-curve shape; see Light curves).
- map_partitions replaces the frame, so it changes row identity. A crossmatch/join after it is rejected, and so is using it after an aggregate. If you need to join the output, .save() the result and re-open it.
- Neither verb may overwrite ra, dec, or _healpix_29, because those columns define the partition layout. Use a different output name.
Honest limits
- columns= and schema= are required (at the call or via @acid.function); there is no signature inspection. This is deliberate: the declared schema is what lets downstream verbs compose without running your function.
- A with_columns inside a join operand (a.crossmatch(b.with_columns(...))) is honored. A with_columns that produces a column later used as a crossmatch coordinate is not honored, because the coordinates are fixed at open time.
- cloudpickle ships your function (closures, lambdas, decorated classes) to the workers. The cloudpickled spec rides the plan once per query, never per task. A genuinely unpicklable object (an open file handle, a live DB connection) must be built on the worker, inside the function or in a stateful class's __init__ (the pattern above).
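The unpicklable case is easy to see with the standard library. This sketch uses stdlib pickle as a stand-in for cloudpickle. It shows only that a live sqlite3 connection cannot be serialized, while the small recipe needed to rebuild it can. is_picklable is a hypothetical helper.

```python
import pickle
import sqlite3

def is_picklable(obj) -> bool:
    """True if stdlib pickle can serialize obj (a stand-in for cloudpickle here)."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True

conn = sqlite3.connect(":memory:")             # a live connection, tied to this process
print(is_picklable(conn))                      # False
print(is_picklable({"db_path": ":memory:"}))   # True: the recipe to rebuild it travels fine
conn.close()
```

In a UDF, this means storing only the path and calling sqlite3.connect on the worker, for example in a stateful class's __init__.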
See also
- Light curves for a list of targets — the canonical list-in → scalar-out use (epoch counts, per-object stats).
- Aggregating — the built-in decomposable aggregates; reach for a UDF when you need something not on that list.
- Writing SQL (the escape hatch) — when the calculation is expressible in SQL.