
Python functions on partitions

Sometimes the calculation you need doesn't fit a SQL expression — a period fit, an SED template match, a calibration, a custom variability statistic. acid lets you run your own NumPy / SciPy / Astropy code as a first-class step in a fluent chain, executed per partition with the engine's parallelism and projection pushdown.

Two verbs:

  • with_columns(name, fn, …) — add one or more new columns computed by a Python function. The rest of the frame passes through unchanged. This is the common one.
  • map_partitions(fn, …) — replace each partition's whole frame with fn(df) -> pl.DataFrame. Use it when the operation changes the rows themselves, not just adds columns.

with_columns — add computed columns

snr.py
import acid
import numpy as np

acid.init("/data/hats", workers=8)

def snr(flux, flux_err):
    return flux / flux_err

result = (acid.open("object")
          .with_columns("snr", snr,
                        columns=["flux", "flux_err"],
                        schema="f8")
          .where("snr > 5")
          .to_astropy())

Three things are always required (no signature inference):

  • name — the output column name. A str for the single-column form (fn returns one array-like), or a list[str] for the multi-column form (fn returns a dict / tuple / pl.DataFrame).
  • columns= — the input columns fn reads. This drives projection pushdown across the function boundary: a UDF declaring columns=["flux", "flux_err"] reads only those two columns from parquet, even from a 1000-column catalog. Verify with .explain().
  • schema= — the output dtype(s): a NumPy-style string ("f8", "i8"), a {name: dtype} dict (multi-column), or a pa.Schema. The declared schema makes the output names and types known at compile time, so .columns and downstream verbs compose with zero engine I/O.
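As an illustration of the multi-column form, the sketch below defines a function that returns a dict of two arrays and checks it on plain NumPy inputs. The function name mag_and_err, the zero-point-free magnitude formula, and the commented-out call are assumptions for illustration; the call shape follows the description above but has not been verified against the library.

```python
import numpy as np


def mag_and_err(flux, flux_err):
    """Return two output columns at once as a {name: array} dict."""
    mag = -2.5 * np.log10(flux)                        # no zero point applied
    mag_err = (2.5 / np.log(10)) * flux_err / flux     # first-order error propagation
    return {"mag": mag, "mag_err": mag_err}


# Standalone check on plain arrays (no engine involved).
flux = np.array([1.0, 10.0, 100.0])
flux_err = np.array([0.1, 0.5, 1.0])
out = mag_and_err(flux, flux_err)

# Hypothetical call, assuming the list[str] name / dict schema form described above:
# cat.with_columns(["mag", "mag_err"], mag_and_err,
#                  columns=["flux", "flux_err"],
#                  schema={"mag": "f8", "mag_err": "f8"})
```

Keeping the dict keys identical to the declared names and schema keys means the three declarations can be read side by side.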

Input modes — NumPy or polars

mode="numpy" (the default) hands each input column to fn as a np.ndarray. mode="polars" hands a pl.Series instead — useful when your inputs are list columns (e.g. from a nested join) and you want to drive them with Polars' .list.* namespace:

# mode="numpy" (the default)
def n_epochs(mjd):                 # mjd is an object-array of per-object lists
    import numpy as np
    return np.array([len(x) for x in mjd], dtype="i8")

cat.with_columns("n_epochs", n_epochs, columns=["mjd"], schema="i8")

# mode="polars"
def n_epochs(mjd):                 # mjd is a pl.Series of List
    return mjd.list.len()

cat.with_columns("n_epochs", n_epochs,
                 columns=["mjd"], schema="i8", mode="polars")
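The same list-in, scalar-out shape works for any per-object statistic. The sketch below computes a peak-to-peak amplitude in NumPy mode and can be run on its own. The function name amplitude, the mag column, and the commented-out call are hypothetical, and the ragged object array is built by hand on the assumption that it resembles what the engine passes in.

```python
import numpy as np


def amplitude(mag):
    """List-in, scalar-out: peak-to-peak magnitude per object (NaN if empty)."""
    return np.array(
        [np.max(x) - np.min(x) if len(x) else np.nan for x in mag],
        dtype="f8",
    )


# Hand-built ragged object array: one list of magnitudes per object.
mag = np.empty(3, dtype=object)
mag[0] = [18.2, 18.5, 18.1]
mag[1] = [20.0]
mag[2] = []

amp = amplitude(mag)

# Hypothetical call:
# cat.with_columns("amplitude", amplitude, columns=["mag"], schema="f8")
```

Returning an array with an explicit dtype keeps the result aligned with the declared schema even when some objects have no epochs.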

@acid.function — declare metadata once

Passing columns= / schema= at every call gets repetitive. The @acid.function decorator attaches them to the function itself:

reusable_udf.py
import acid

@acid.function(columns=["mag", "err"], schema="f8")
def snr(mag, err):
    return mag / err

# columns=/schema= now ride the function — no need to repeat them:
cat.with_columns("snr", snr)

Stateful UDFs — a heavy resource, built once per worker

When your function needs an expensive resource — a template library, a WCS, a trained ML model — building it on every call is wasteful, and shipping a built instance to every worker is worse (it would ride the pickle into every task). Decorate a class with @acid.function to get a deferred-construction factory: __init__ runs once per worker process and the instance is cached there; it never travels in the task payload.

sed_match.py
import acid

@acid.function(columns=["wavelength", "flux"], schema="int64")
class match_sed:
    def __init__(self, path="/data/templates.fits"):
        self.templates = read_template_library(path)    # heavy; runs on the worker

    def __call__(self, wavelength, flux):
        return best_template_index(wavelength, flux, self.templates)

# Calling the factory captures (cls, args) into a pickle-safe handle;
# the library loads once per worker on first use, not in the parent.
cat.with_columns("template_id", match_sed("/data/templates.fits"))
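The deferred-construction idea can be sketched in plain Python. This is not acid's implementation: DeferredHandle, _INSTANCES, and Scale are hypothetical names, and a module-level dict stands in for per-worker state.

```python
_INSTANCES = {}  # per-process cache, standing in for per-worker state


class DeferredHandle:
    """Holds only (cls, args); the real instance is built on first call."""

    def __init__(self, cls, args):
        self.cls = cls
        self.args = args

    def __call__(self, *columns):
        key = (self.cls, self.args)
        if key not in _INSTANCES:          # first use in this process
            _INSTANCES[key] = self.cls(*self.args)
        return _INSTANCES[key](*columns)


class Scale:
    """Toy stand-in for a heavy resource; counts how often it is built."""

    builds = 0

    def __init__(self, factor):
        Scale.builds += 1
        self.factor = factor

    def __call__(self, values):
        return [v * self.factor for v in values]


handle = DeferredHandle(Scale, (2.0,))
builds_before = Scale.builds       # creating the handle builds nothing
first = handle([1.0, 2.0])         # builds Scale once, then calls it
second = handle([3.0])             # reuses the cached instance
builds_after = Scale.builds
```

Because the handle carries only a class reference and its constructor arguments, it stays small regardless of how large the constructed resource is.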

map_partitions — replace the whole frame

When the operation changes the rows (a resampling, a per-partition model that emits different rows), use map_partitions. The function receives the partition's pl.DataFrame and returns its replacement. schema= (the output schema) is required; columns= is an optional projection-narrowing hint:

resample.py
import polars as pl

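def hourly_means(df: pl.DataFrame) -> pl.DataFrame:
    # mjd is Float64 days, so bin arithmetically:
    # floor(mjd * 24) / 24 is the start of each one-hour bin.
    hour_start = ((pl.col("mjd") * 24).floor() / 24).alias("mjd")
    return (df.group_by(hour_start, maintain_order=True)
              .agg(pl.col("flux").mean().alias("flux_hourly")))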

cat.map_partitions(
    hourly_means,
    schema={"mjd": pl.Float64, "flux_hourly": pl.Float64},
)

Where these can go in a chain

  • with_columns runs after the spine (post-join). A crossmatch / join after a with_columns is rejected — crossmatch first, then add columns. After a nested join or aggregate, with_columns computes over the per-object list columns (the list-in → scalar-out light-curve shape; see Light curves).
  • map_partitions replaces the frame, so it changes row identity: a crossmatch / join after it is rejected, and so is using it after an aggregate. .save() the result and re-open it if you need to join the output.
  • Neither may overwrite ra / dec / _healpix_29 — those define the partition layout; use a different output name.

Honest limits

  • For with_columns, columns= and schema= are required (map_partitions requires only schema=); there is no signature inspection. This is deliberate: the declared schema is what lets downstream verbs compose without running your function.
  • A with_columns inside a join operand (a.crossmatch(b.with_columns(...))) is honored, but a with_columns that produces a column then used as a crossmatch coordinate is not — the coordinates are fixed at open time.
  • cloudpickle ships your function (closures, lambdas, decorated classes) to the workers; the cloudpickled spec rides the plan once per query, never per task. A genuinely un-pickleable object (an open file handle, a live DB connection) must be built inside the function; the stateful-class pattern above does exactly that.

See also