Advanced pipelines
Step reference, custom steps, multi-CSV joins, error modes, and dataset grouping
Step reference
All steps live in lcmd_db.core.lib.importers.steps.
Fetch
| Step | Purpose |
|---|---|
| `http(url=, filename=)` | Download one file over HTTP |
| `github(repo=, ref=, filename=)` | Download one file from a GitHub repo by ref |
| `materials_cloud(record_id=, files=)` | Download one or more files from a Materials Cloud record |
| `local(paths=)` | Copy files or directories from the local filesystem (testing, vendored data) |
| `unarchive_step(filename=)` | Manually extract an archive (auto-extract is on by default) |
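With auto-extraction on, a fetched archive is unpacked without an explicit `unarchive_step`. The following is a minimal standard-library sketch of what such a post-fetch hook could look like. `maybe_unarchive` and its `auto_extract` flag are hypothetical names, not the importer's API:

```python
import shutil
from pathlib import Path

# Every suffix shutil knows how to unpack (.zip, .tar, .tar.gz, .tgz, ...).
ARCHIVE_SUFFIXES: tuple[str, ...] = tuple(
    ext for _name, exts, _desc in shutil.get_unpack_formats() for ext in exts
)


def maybe_unarchive(path: Path, dest: Path, *, auto_extract: bool = True) -> bool:
    """Hypothetical hook: unpack `path` into `dest` if it looks like an archive.

    Returns True when something was extracted; plain files are left untouched.
    """
    if not auto_extract or not path.name.endswith(ARCHIVE_SUFFIXES):
        return False
    dest.mkdir(parents=True, exist_ok=True)
    shutil.unpack_archive(path, dest)
    return True
```

Keying on the filename suffix keeps the check cheap. A real implementation might sniff file contents instead.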
Parse
| Step | Purpose |
|---|---|
| `read_csv(path=)` | Read one CSV into `ctx.rows` |
| `read_csvs_merged(specs=, on=, how=)` | Read several CSVs, select/rename columns per file, add optional row markers, then join |
| `attach_structures(pattern=, column=, required=)` | Resolve a per-row structure file path |
| `transform(fn)` | Apply an arbitrary `(ctx, df) -> df` function; the escape hatch for Polars surgery (see `proparg.py`, `tm_gsspin_plus.py`) |
| `with_rows(fn)` | Replace `ctx.rows` from scratch; use when no CSV exists |
Derive
| Step / Field | Purpose |
|---|---|
| `derive_chemistry(smiles_col=)` | The full SMILES-derived suite. Use this whenever you have SMILES |
| `derive(*fields)` | Compose a custom subset of derived fields |
| `MolecularFormula()` | Falls back to XYZ when SMILES is missing. Returns `None` silently if both fail |
| `MolecularWeight()` | Falls back to XYZ. The default `required=True` raises if neither is available; pass `required=False` to allow nulls |
| `CanonicalSmiles()`, `InchiKey()`, `Selfies()`, `FormalCharge()`, `WildcardCount()`, `NonWildcardHeavyAtomCount()` | Single-purpose computations |
Load
| Step | Purpose |
|---|---|
| `load_molecules(smiles_col=, ...)` | Insert molecules in one bulk transaction |
| `load_reactions(participants=, ...)` | Insert reactions and their participants |
| `load_fragments(type_slug=, smiles_col=, ...)` | Insert one fragment pool |
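"One bulk transaction" means a batch either lands completely or not at all. The sketch below shows that all-or-nothing behaviour, with the standard-library `sqlite3` module standing in for the project's database layer. The `molecule` table, its columns, and both helper functions are hypothetical.

```python
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical table; UNIQUE on smiles lets a duplicate fail mid-batch.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS molecule ("
        " id INTEGER PRIMARY KEY, smiles TEXT NOT NULL UNIQUE, formula TEXT)"
    )


def insert_molecules(conn: sqlite3.Connection, rows: list[dict[str, str]]) -> int:
    """Insert every row in a single transaction; any failure rolls back the batch."""
    with conn:  # commits on success, rolls back if executemany raises
        conn.executemany(
            "INSERT INTO molecule (smiles, formula) VALUES (:smiles, :formula)",
            rows,
        )
    return len(rows)
```

If the second row of a batch violates a constraint, the first row of that batch is rolled back as well. Rows committed by earlier batches are unaffected.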
Multi-CSV joins
read_csvs_merged handles the common case where a dataset spans several CSVs
keyed by a shared identifier. Each CSVSpec controls one input.
```python
from lcmd_db.core.lib.importers.steps import CSVSpec, read_csvs_merged

parse = read_csvs_merged(
    specs=[
        CSVSpec(path="main.csv"),
        CSVSpec(
            path="extra.csv",
            columns=["id", "extra_col"],  # avoid pulling junk columns
            rename={"extra_col": "renamed_col"},  # resolve name collisions
        ),
        CSVSpec(
            path="srs.csv",
            marker="_in_srs",  # boolean column flagging this CSV's rows
        ),
    ],
    on="id",
    how="left",  # outer is uncommon; left preserves the main CSV's rows
)
```

The marker column is the canonical way to drive `get_participants_fn` in reactions. It's purpose-built and avoids null-checking substantive columns.
The ImportContext
Every step receives a single ImportContext. The fields you'll touch:
| Field | What it is |
|---|---|
| `ctx.rows` | The current Polars frame. `None` before `read_csv`, set thereafter |
| `ctx.data_dir` | Per-subset cache directory (see data_dir) |
| `ctx.limit` | The `--limit N` value, or `None` for a full import |
| `ctx.subset` | The `Subset` definition currently running |
| `ctx.user` | User credited as creator of imported rows |
| `ctx.logger` | Standard logger with the subset name baked in |
| `ctx.report_error(stage, msg, *, row_index=None, field=None, exception=...)` | Records a structured error; respects the subset's `on_error` mode |
Writing a custom step
The pipeline is just composed Step[InState, OutState] values. When the
built-ins don't fit, write your own:
```python
import polars as pl

from lcmd_db.core.lib.importers import HasRows, ImportContext, ParseError, Step


def select_unique(column: str) -> Step[HasRows, HasRows]:
    """Replace ctx.rows with the unique values of one column, renamed to 'smiles'."""

    async def _step(ctx: ImportContext) -> ImportContext:
        assert ctx.rows is not None  # guaranteed by the HasRows input state
        if column not in ctx.rows.columns:
            raise ParseError(f"Column {column!r} not found")
        ctx.rows = (
            ctx.rows.select(pl.col(column))
            .drop_nulls()
            .unique(maintain_order=True)  # stable order so --limit picks the same rows
            .rename({column: "smiles"})
        )
        # Honour --limit so quick test imports stay quick
        if ctx.limit is not None:
            ctx.rows = ctx.rows.head(ctx.limit)
        return ctx

    return Step(_step)
```

A few rules to keep in mind (the type system enforces the first two):

- `Step[Empty, …]` runs first (no upstream state); `Step[HasRows, …]` requires `ctx.rows` to exist
- `>>` only composes when the right-hand input matches the left-hand output
- An `async` step that hits the disk should defer with `asyncio.to_thread`, because the runner overlaps I/O across pipelines
Reference example: `_select_unique_column` and `_flp_pool` in
`apps/backend/lcmd_db/registry/subsets/flp/__init__.py`. Same pattern, in the
wild.
Error modes
By default, per-row failures are collected: bad rows are dropped and the rest
ship. Switch to fail-fast with `on_error="raise"`:
```python
my_subset = Subset(
    name="MySubset",
    ...,
    on_error="raise",  # default: "collect"
)
```

Use `raise` when:
- You want CI to fail loudly on a bad row
- The dataset is small enough that any drop is significant
- You're iterating on the pipeline and want the first failure as the signal
Use `collect` (the default) when:
- The dataset is large and partial success is useful
- A few rows have known issues but you want the rest
Either way, every error is logged with row index and stage. The
ProcessingResult returned by import_subset carries the activity log; the
command also prints a per-stage summary at the end.
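Here is a minimal model of the two modes and the per-stage summary. It assumes errors are tracked as (stage, row index, message) records. `apply_per_row`, `summarize` and `RowError` are hypothetical helpers, not part of the importer. Row indices in this sketch are relative to each stage's input, whereas the real activity log may track original positions.

```python
import logging
from collections import Counter
from dataclasses import dataclass
from typing import Callable, Literal, TypeVar

T = TypeVar("T")
U = TypeVar("U")
log = logging.getLogger("import")


@dataclass
class RowError:
    stage: str
    row_index: int
    message: str


def apply_per_row(
    rows: list[T],
    fn: Callable[[T], U],
    *,
    stage: str,
    on_error: Literal["collect", "raise"],
    errors: list[RowError],
) -> list[U]:
    """Run fn on every row; drop failures ("collect") or re-raise the first ("raise")."""
    kept: list[U] = []
    for i, row in enumerate(rows):
        try:
            kept.append(fn(row))
        except Exception as exc:
            # Logged in both modes, with stage and row index
            log.warning("stage=%s row=%d: %s", stage, i, exc)
            if on_error == "raise":
                raise
            errors.append(RowError(stage, i, str(exc)))
    return kept


def summarize(errors: list[RowError]) -> dict[str, int]:
    """Per-stage error counts, in the spirit of the end-of-import summary."""
    return dict(Counter(e.stage for e in errors))
```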
Grouping subsets into a dataset
Several subsets that share citations and metadata can be grouped into a
dataset. Subclass `BaseDataset`: `name`, `description`, and `get_subsets`
are abstract; `citations`, `keywords`, and `metadata` have empty defaults you
override as needed:
```python
from typing import Any

from lcmd_db.apps.datasets.lib.citations.registry import Author, Citation
from lcmd_db.apps.subsets.models import Subset
from lcmd_db.core.lib.datasets import BaseDataset


class MyDataset(BaseDataset):
    @property
    def name(self) -> str:
        return "MyDataset"

    @property
    def description(self) -> str:
        return "A short paragraph about the dataset."

    @property
    def citations(self) -> list[Citation]:
        return [
            Citation(
                citation_key="smith2024example",
                type="article-journal",
                title="The paper title",
                authors=[Author(family="Smith", given="Jane")],
                year=2024,
                doi="10.1234/example",
            ),
        ]

    @property
    def keywords(self) -> list[str]:
        return ["example", "tutorial"]

    @property
    def metadata(self) -> dict[str, Any]:
        return {"is_default": True}

    def get_subsets(self) -> list[str]:
        # Evaluated against the live DB: every subset whose name contains "MySubset"
        return list(
            Subset.objects.filter(name__icontains="MySubset").values_list("name", flat=True)
        )
```

Then register it (this is a separate registry from the subset one):
```python
from lcmd_db.core.lib.datasets import registry

from .my_dataset import MyDataset

registry.register_dataset("my_dataset", MyDataset)
```

`get_subsets` runs against the live DB, so name patterns and explicit lists both
work. Datasets are materialized by `just manage create_dataset --all`.
Reference example: `apps/backend/lcmd_db/registry/datasets/oscar.py`.
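The abstract/default split and the separate dataset registry can both be modelled with `abc`. This is a hypothetical, self-contained sketch. `ToyBaseDataset`, `register_dataset` and `materialize_all` stand in for `BaseDataset`, `registry.register_dataset` and `create_dataset --all`. A plain list replaces the `Subset` query: `name__icontains` is a case-insensitive substring match, which the sketch mimics with `.lower()`.

```python
from abc import ABC, abstractmethod
from typing import Any


class ToyBaseDataset(ABC):
    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def description(self) -> str: ...

    @abstractmethod
    def get_subsets(self) -> list[str]: ...

    # Optional hooks: empty defaults, override as needed
    @property
    def citations(self) -> list[Any]:
        return []

    @property
    def keywords(self) -> list[str]:
        return []

    @property
    def metadata(self) -> dict[str, Any]:
        return {}


# Kept apart from any subset registry: slug -> dataset class
DATASETS: dict[str, type[ToyBaseDataset]] = {}


def register_dataset(slug: str, cls: type[ToyBaseDataset]) -> None:
    if slug in DATASETS:
        raise ValueError(f"dataset {slug!r} already registered")
    DATASETS[slug] = cls


# Stand-in for the subset names currently in the database
SUBSET_NAMES = ["MySubset-train", "mysubset-test", "OtherSubset"]


class MyDataset(ToyBaseDataset):
    @property
    def name(self) -> str:
        return "MyDataset"

    @property
    def description(self) -> str:
        return "A short paragraph about the dataset."

    def get_subsets(self) -> list[str]:
        # Mimics name__icontains="MySubset": case-insensitive substring match
        return [n for n in SUBSET_NAMES if "mysubset" in n.lower()]


def materialize_all() -> list[tuple[str, str, list[str]]]:
    """Instantiate every registered dataset and resolve its subsets at call time."""
    out = []
    for slug, cls in DATASETS.items():
        ds = cls()
        out.append((slug, ds.name, ds.get_subsets()))
    return out


register_dataset("my_dataset", MyDataset)
```

Subsets are resolved only when `materialize_all` runs, so subsets added after registration are still picked up.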