LCMD db logoLCMD[db]

Advanced pipelines

Step reference, custom steps, multi-CSV joins, error modes, and dataset grouping

Step reference

All steps live in lcmd_db.core.lib.importers.steps.

Fetch

StepPurpose
http(url=, filename=)Download one file over HTTP
github(repo=, ref=, filename=)Download one file from a GitHub repo by ref
materials_cloud(record_id=, files=)Download one or more files from a Materials Cloud record
local(paths=)Copy files or directories from the local filesystem (testing, vendored data)
unarchive_step(filename=)Manually extract an archive (auto-extract is on by default)

Parse

StepPurpose
read_csv(path=)Read one CSV into ctx.rows
read_csvs_merged(specs=, on=, how=)Read several CSVs, select / rename columns per file, optional row markers, then join
attach_structures(pattern=, column=, required=)Resolve a per-row structure file path
transform(fn)Apply an arbitrary (ctx, df) -> df function — escape hatch for polars surgery (see proparg.py, tm_gsspin_plus.py)
with_rows(fn)Replace ctx.rows from scratch — use when no CSV exists

Derive

Step / FieldPurpose
derive_chemistry(smiles_col=)The full SMILES-derived suite. Use this whenever you have SMILES
derive(*fields)Compose a custom subset of derived fields
MolecularFormula()Falls back to XYZ when SMILES is missing. Returns None silently if both fail
MolecularWeight()Falls back to XYZ. Default required=True raises if neither is available — pass required=False to allow nulls
CanonicalSmiles(), InchiKey(), Selfies(), FormalCharge(), WildcardCount(), NonWildcardHeavyAtomCount()Single-purpose computations

Load

StepPurpose
load_molecules(smiles_col=, ...)Insert molecules in one bulk transaction
load_reactions(participants=, ...)Insert reactions + their participants
load_fragments(type_slug=, smiles_col=, ...)Insert one fragment pool

Multi-CSV joins

read_csvs_merged handles the common case where a dataset spans several CSVs keyed by a shared identifier. Each CSVSpec controls one input.

from lcmd_db.core.lib.importers.steps import CSVSpec, read_csvs_merged

parse = read_csvs_merged(
    specs=[
        CSVSpec(path="main.csv"),
        CSVSpec(
            path="extra.csv",
            columns=["id", "extra_col"],            # avoid pulling junk columns
            rename={"extra_col": "renamed_col"},    # resolve name collisions
        ),
        CSVSpec(
            path="srs.csv",
            marker="_in_srs",                        # boolean column flagging this CSV's rows
        ),
    ],
    on="id",
    how="left",     # outer is uncommon; left preserves the main CSV's rows
)

The marker column is the canonical way to drive get_participants_fn in reactions — it's purpose-built and avoids null-checking substantive columns.

The ImportContext

Every step receives a single ImportContext. The fields you'll touch:

FieldWhat it is
ctx.rowsThe current Polars frame. None before read_csv, set thereafter
ctx.data_dirPer-subset cache directory (see data_dir)
ctx.limitThe --limit N value, or None for a full import
ctx.subsetThe Subset definition currently running
ctx.userUser credited as creator of imported rows
ctx.loggerStandard logger with the subset name baked in
ctx.report_error(stage, msg, *, row_index=None, field=None, exception=...)Records a structured error; respects the subset's on_error mode

Writing a custom step

The pipeline is just composed Step[InState, OutState] values. When the built-ins don't fit, write your own:

import polars as pl

from lcmd_db.core.lib.importers import HasRows, ImportContext, ParseError, Step

def select_unique(column: str) -> Step[HasRows, HasRows]:
    """Replace ctx.rows with the unique values of one column, renamed to 'smiles'."""
    async def _step(ctx: ImportContext) -> ImportContext:
        assert ctx.rows is not None
        if column not in ctx.rows.columns:
            raise ParseError(f"Column {column!r} not found")
        ctx.rows = (
            ctx.rows.select(pl.col(column))
            .drop_nulls()
            .unique()
            .rename({column: "smiles"})
        )
        if ctx.limit is not None:
            ctx.rows = ctx.rows.head(ctx.limit)
        return ctx

    return Step(_step)

A few rules the type system enforces:

  • Step[Empty, …] runs first (no upstream state), Step[HasRows, …] requires ctx.rows to exist
  • >> only composes when the right-hand input matches the left-hand output
  • An async step that hits the disk should defer with asyncio.to_thread — the runner overlaps I/O across pipelines

Reference example: _select_unique_column and _flp_pool in apps/backend/lcmd_db/registry/subsets/flp/__init__.py. Same pattern, in the wild.

Error modes

By default, per-row failures are collected — bad rows are dropped, the rest ship. Switch to fail-fast with on_error="raise":

my_subset = Subset(
    name="MySubset",
    ...,
    on_error="raise",   # default: "collect"
)

Use raise when:

  • You want CI to fail loudly on a bad row
  • The dataset is small enough that any drop is significant
  • You're iterating on the pipeline and want the first failure as the signal

Use collect (the default) when:

  • The dataset is large and partial success is useful
  • A few rows have known issues but you want the rest

Either way, every error is logged with row index and stage. The ProcessingResult returned by import_subset carries the activity log; the command also prints a per-stage summary at the end.

Grouping subsets into a dataset

Several subsets that share citations and metadata can be grouped into a dataset. Subclass BaseDatasetname, description, and get_subsets are abstract; citations, keywords, and metadata have empty defaults you override as needed:

apps/backend/lcmd_db/registry/datasets/my_dataset.py
from typing import Any

from lcmd_db.apps.datasets.lib.citations.registry import Author, Citation
from lcmd_db.apps.subsets.models import Subset
from lcmd_db.core.lib.datasets import BaseDataset


class MyDataset(BaseDataset):
    @property
    def name(self) -> str:
        return "MyDataset"

    @property
    def description(self) -> str:
        return "A short paragraph about the dataset."

    @property
    def citations(self) -> list[Citation]:
        return [
            Citation(
                citation_key="smith2024example",
                type="article-journal",
                title="The paper title",
                authors=[Author(family="Smith", given="Jane")],
                year=2024,
                doi="10.1234/example",
            ),
        ]

    @property
    def keywords(self) -> list[str]:
        return ["example", "tutorial"]

    @property
    def metadata(self) -> dict[str, Any]:
        return {"is_default": True}

    def get_subsets(self) -> list[str]:
        return list(
            Subset.objects.filter(name__icontains="MySubset").values_list("name", flat=True)
        )

Then register it (this is a separate registry from the subset one):

apps/backend/lcmd_db/registry/datasets/__init__.py
from lcmd_db.core.lib.datasets import registry

from .my_dataset import MyDataset

registry.register_dataset("my_dataset", MyDataset)

get_subsets runs against the live DB — name patterns or explicit lists both work. Datasets are materialized by just manage create_dataset --all. Reference example: apps/backend/lcmd_db/registry/datasets/oscar.py.

On this page