Advanced pipelines
Step reference, custom steps, multi-CSV joins, error modes, and dataset grouping
Step reference
All steps live in lcmd_db.core.lib.importers.steps.
Fetch
| Step | Purpose |
|---|---|
| http(url=, filename=) | Download one file over HTTP |
| github(repo=, ref=, filename=) | Download one file from a GitHub repo by ref |
| materials_cloud(record_id=, files=) | Download one or more files from a Materials Cloud record |
| local(paths=) | Copy files or directories from the local filesystem (testing, vendored data) |
| unarchive_step(filename=) | Manually extract an archive (auto-extract is on by default) |
Parse
| Step | Purpose |
|---|---|
| read_csv(path=) | Read one CSV into ctx.rows |
| read_csvs_merged(specs=, on=, how=) | Read several CSVs, select / rename columns per file, optional row markers, then join |
| attach_structures(pattern=, column=, required=) | Resolve a per-row structure file path |
| transform(fn) | Apply an arbitrary (ctx, df) -> df function; escape hatch for polars surgery (see proparg.py, tm_gsspin_plus.py) |
| with_rows(fn) | Replace ctx.rows from scratch; use when no CSV exists |
Derive
| Step / Field | Purpose |
|---|---|
| derive_chemistry(smiles_col=) | The full SMILES-derived suite. Use this whenever you have SMILES |
| derive(*fields) | Compose a custom subset of derived fields |
| MolecularFormula() | Falls back to XYZ when SMILES is missing. Returns None silently if both fail |
| MolecularWeight() | Falls back to XYZ. Default required=True raises if neither is available; pass required=False to allow nulls |
| CanonicalSmiles(), InchiKey(), Selfies(), FormalCharge(), WildcardCount(), NonWildcardHeavyAtomCount() | Single-purpose computations |
Load
| Step | Purpose |
|---|---|
| load_molecules(smiles_col=, ...) | Insert molecules in one bulk transaction |
| load_reactions(participants=, ...) | Insert reactions + their participants |
| load_fragments(type_slug=, smiles_col=, ...) | Insert one fragment pool |
Multi-CSV joins
read_csvs_merged handles the common case where a dataset spans several CSVs
keyed by a shared identifier. Each CSVSpec controls one input.
from lcmd_db.core.lib.importers.steps import CSVSpec, read_csvs_merged
parse = read_csvs_merged(
    specs=[
        CSVSpec(path="main.csv"),
        CSVSpec(
            path="extra.csv",
            columns=["id", "extra_col"],  # avoid pulling junk columns
            rename={"extra_col": "renamed_col"},  # resolve name collisions
        ),
        CSVSpec(
            path="srs.csv",
            marker="_in_srs",  # boolean column flagging this CSV's rows
        ),
    ],
    on="id",
    how="left",  # outer is uncommon; left preserves the main CSV's rows
)

The marker column is the canonical way to drive
get_participants_fn in reactions: it's
purpose-built and avoids null-checking substantive columns.
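To make the marker semantics concrete, here is a minimal standard-library sketch of a left join that adds a boolean marker column. It does not use read_csvs_merged or Polars. The helper left_join_with_marker and the sample data are hypothetical, and it assumes unmatched rows get False rather than null, which the real step may handle differently.

```python
import csv
import io

# Hypothetical sample inputs standing in for main.csv and srs.csv.
MAIN_CSV = "id,smiles\n1,CCO\n2,CC\n3,C\n"
SRS_CSV = "id,yield\n1,0.9\n3,0.4\n"


def read_rows(text):
    """Parse CSV text into a list of row dicts."""
    return list(csv.DictReader(io.StringIO(text)))


def left_join_with_marker(left, right, on, marker):
    """Left-join `right` onto `left`, flagging matched rows in `marker`."""
    right_by_key = {row[on]: row for row in right}
    right_cols = [c for c in (right[0].keys() if right else []) if c != on]
    joined = []
    for row in left:
        match = right_by_key.get(row[on])
        merged = dict(row)
        for col in right_cols:
            # Unmatched rows keep the column but with a null value.
            merged[col] = match[col] if match else None
        # Assumption: the marker is False (not null) when there is no match.
        merged[marker] = match is not None
        joined.append(merged)
    return joined


rows = left_join_with_marker(
    read_rows(MAIN_CSV), read_rows(SRS_CSV), on="id", marker="_in_srs"
)
in_srs = [row["id"] for row in rows if row["_in_srs"]]
print(in_srs)
```

Code that consumes the joined rows can then branch on the _in_srs flag directly instead of inferring membership from whether yield happens to be null.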
The ImportContext
Every step receives a single ImportContext. The fields you'll touch:
| Field | What it is |
|---|---|
| ctx.rows | The current Polars frame. None before read_csv, set thereafter |
| ctx.data_dir | Per-subset cache directory (see data_dir) |
| ctx.limit | The --limit N value, or None for a full import |
| ctx.subset | The Subset definition currently running |
| ctx.user | User credited as creator of imported rows |
| ctx.logger | Standard logger with the subset name baked in |
| ctx.report_error(stage, msg, *, row_index=None, field=None, exception=...) | Records a structured error; respects the subset's on_error mode |
Writing a custom step
The pipeline is just composed Step[InState, OutState] values. When the
built-ins don't fit, write your own:
import polars as pl
from lcmd_db.core.lib.importers import HasRows, ImportContext, ParseError, Step
def select_unique(column: str) -> Step[HasRows, HasRows]:
    """Replace ctx.rows with the unique values of one column, renamed to 'smiles'."""

    async def _step(ctx: ImportContext) -> ImportContext:
        assert ctx.rows is not None
        if column not in ctx.rows.columns:
            raise ParseError(f"Column {column!r} not found")
        ctx.rows = (
            ctx.rows.select(pl.col(column))
            .drop_nulls()
            .unique()
            .rename({column: "smiles"})
        )
        if ctx.limit is not None:
            ctx.rows = ctx.rows.head(ctx.limit)
        return ctx

    return Step(_step)

A few rules the type system enforces:
- Step[Empty, …] runs first (no upstream state), Step[HasRows, …] requires ctx.rows to exist
- >> only composes when the right-hand input matches the left-hand output
- An async step that hits the disk should defer with asyncio.to_thread; the runner overlaps I/O across pipelines
Reference example: _select_unique_column and _flp_pool in
apps/backend/lcmd_db/registry/subsets/flp/__init__.py. Same pattern, in the
wild.
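The composition and threading rules can be tried out without the library. The sketch below is a hypothetical stand-in, not lcmd_db's implementation: Ctx and this toy Step class are invented for illustration, and the generic Step[InState, OutState] parameters are omitted, so it shows only the runtime behaviour of >> and asyncio.to_thread, not the static checks.

```python
from __future__ import annotations

import asyncio
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Awaitable, Callable


@dataclass
class Ctx:
    """Hypothetical stand-in for ImportContext with only the fields used here."""

    path: Path
    rows: list[str] | None = None
    limit: int | None = None


class Step:
    """Hypothetical wrapper: holds one async function and composes with >>."""

    def __init__(self, fn: Callable[[Ctx], Awaitable[Ctx]]) -> None:
        self.fn = fn

    def __rshift__(self, other: Step) -> Step:
        async def _chained(ctx: Ctx) -> Ctx:
            # Run the left-hand step first, then feed its context to the right.
            return await other.fn(await self.fn(ctx))

        return Step(_chained)

    async def __call__(self, ctx: Ctx) -> Ctx:
        return await self.fn(ctx)


def read_lines() -> Step:
    async def _step(ctx: Ctx) -> Ctx:
        # The blocking file read runs in a worker thread, keeping the event loop free.
        text = await asyncio.to_thread(ctx.path.read_text)
        ctx.rows = text.splitlines()
        return ctx

    return Step(_step)


def unique_limited() -> Step:
    async def _step(ctx: Ctx) -> Ctx:
        assert ctx.rows is not None
        ctx.rows = list(dict.fromkeys(ctx.rows))  # de-duplicate, keep first-seen order
        if ctx.limit is not None:
            ctx.rows = ctx.rows[: ctx.limit]
        return ctx

    return Step(_step)


def run_demo() -> list[str]:
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "smiles.txt"
        path.write_text("CCO\nCC\nCCO\nC\n")
        pipeline = read_lines() >> unique_limited()
        ctx = asyncio.run(pipeline(Ctx(path=path, limit=2)))
        return ctx.rows or []


result = run_demo()
print(result)
```

In this toy version nothing stops unique_limited() from being placed first; the assert would fail at runtime. The typed Step described above is what moves that mistake to type-check time.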
Error modes
By default, per-row failures are collected — bad rows are dropped, the rest
ship. Switch to fail-fast with on_error="raise":
my_subset = Subset(
    name="MySubset",
    ...,
    on_error="raise",  # default: "collect"
)

Use raise when:
- You want CI to fail loudly on a bad row
- The dataset is small enough that any drop is significant
- You're iterating on the pipeline and want the first failure as the signal
Use collect (the default) when:
- The dataset is large and partial success is useful
- A few rows have known issues but you want the rest
Either way, every error is logged with row index and stage. The
ProcessingResult returned by import_subset carries the activity log; the
command also prints a per-stage summary at the end.
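The difference between the two modes can be sketched in plain Python. This is an illustration under stated assumptions, not the library's code: Report, RowError, and parse_weights are hypothetical names, and the real ctx.report_error takes more arguments and may raise a different exception type.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class RowError:
    stage: str
    row_index: int
    message: str


@dataclass
class Report:
    """Hypothetical error sink that mimics the collect / raise switch."""

    on_error: str = "collect"
    errors: list[RowError] = field(default_factory=list)

    def report_error(self, stage: str, message: str, *, row_index: int) -> None:
        # Every error is recorded with its stage and row index in both modes.
        self.errors.append(RowError(stage, row_index, message))
        if self.on_error == "raise":
            # Fail-fast: the first bad row aborts the run.
            raise ValueError(f"[{stage}] row {row_index}: {message}")


def parse_weights(raw: list[str], report: Report) -> list[float]:
    """Convert strings to floats, dropping rows that fail to parse."""
    kept = []
    for i, value in enumerate(raw):
        try:
            kept.append(float(value))
        except ValueError:
            report.report_error("parse", f"not a number: {value!r}", row_index=i)
    return kept


raw = ["1.5", "oops", "2.0"]

# collect: the bad row is dropped and the rest ship.
collected = Report(on_error="collect")
kept = parse_weights(raw, collected)
print(kept, len(collected.errors))

# raise: the same input aborts at the first failure.
try:
    parse_weights(raw, Report(on_error="raise"))
    raised = False
except ValueError:
    raised = True
print(raised)
```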
Grouping subsets into a dataset
Several subsets that share citations and metadata can be grouped into a
dataset. A dataset is a declarative Dataset dataclass instance — the same
functional shape as a Subset. It lists its members by referencing their
Subset specs directly, so there is no database query and the grouping is
explicit. name, description, and subsets are required; citations,
keywords, and metadata are optional (metadata defaults to
{"source": "dataset_registry"}):
from lcmd_db.apps.datasets.lib.citations.registry import Author, Citation
from lcmd_db.core.lib.datasets import Dataset
from lcmd_db.registry.subsets.my_subset import my_subset
my_dataset = Dataset(
    name="MyDataset",
    description="A short paragraph about the dataset.",
    subsets=(my_subset,),
    citations=[
        Citation(
            citation_key="smith2024example",
            type="article-journal",
            title="The paper title",
            authors=[Author(family="Smith", given="Jane")],
            year=2024,
            doi="10.1234/example",
        ),
    ],
    keywords=["example", "tutorial"],
    metadata={"is_default": True},
)

Then register it (this is a separate registry from the subset one):
from lcmd_db.core.lib.datasets import dataset_registry as registry
from .my_dataset import my_dataset
registry.register("my_dataset", my_dataset)

The subsets tuple holds the subset specs themselves, so a typo'd import fails
at startup rather than silently dropping a member, and the declared order
becomes the subset order within the dataset. Datasets are materialized by
just manage create_dataset --all. Reference example:
apps/backend/lcmd_db/registry/datasets/oscar.py.