Performance Notes
Memory and wall-time considerations for the ingestion pipeline. Documents the patterns we adopted to keep peak memory bounded on wide-and-tall entities (Stellantis Jeep US inventory, ~250k rows × ~70 columns at national scope), the measurements that proved them out, and a list of deferred enhancements for future runs.
Benchmarking
Local benchmarks live in benchmarks/ at the repo root. See
benchmarks/README.md for the full harness, but at a glance:
```shell
# Fast iteration scenario (NYC + 50 mi radius, ~90 s)
benchmarks/run.sh stellantis_jeep_inventory_fast <state-label>

# Canonical national scenario (~17 min)
benchmarks/run.sh stellantis_jeep_inventory <state-label>

# Compare two summary.json files
python benchmarks/compare.py BASELINE.json CANDIDATE.json
```
Each stage is graded on two independent metrics, peak memory and
wall-clock duration. Each metric has a yellow and a red threshold
(defaults: 6 / 8 GB and 10 / 20 min). The container cap is informational
only; grades drive pass/fail. A STAGE_TIMEOUT_S watchdog SIGKILLs runaway
stages.
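The two-threshold grading can be sketched as follows. This is a minimal illustration, not the harness code. It assumes three things: a stage's overall grade is the worse of its memory grade and its duration grade, thresholds are inclusive, and the names Grade and grade_stage are hypothetical. The defaults mirror the 6 / 8 GB and 10 / 20 min values above.

```python
from enum import IntEnum


class Grade(IntEnum):
    # Ordered so that max() picks the worse grade.
    GREEN = 0
    YELLOW = 1
    RED = 2


def _grade(value: float, yellow: float, red: float) -> Grade:
    """Grade one metric against its yellow / red thresholds (inclusive, assumed)."""
    if value >= red:
        return Grade.RED
    if value >= yellow:
        return Grade.YELLOW
    return Grade.GREEN


def grade_stage(
    peak_mem_gb: float,
    wall_min: float,
    mem_thresholds: tuple[float, float] = (6.0, 8.0),
    time_thresholds: tuple[float, float] = (10.0, 20.0),
) -> Grade:
    """Hypothetical: the stage grade is the worse of the two independent grades."""
    mem = _grade(peak_mem_gb, *mem_thresholds)
    dur = _grade(wall_min, *time_thresholds)
    return max(mem, dur)


if __name__ == "__main__":
    # Memory is green (5.2 < 6) but duration is yellow (10 <= 12 < 20).
    print(grade_stage(5.2, 12.0).name)  # YELLOW
```

Keeping the grades in an IntEnum means the "worse of two" rule is just max(). A container cap, if tracked, would be reported alongside the grades and would not feed into them.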
Numbers stamped from a benchmark run live in the PR description that
shipped the change, not in committed JSON files — they're tightly coupled
to the host machine and day-to-day API volume, so a checked-in
"baseline" decays quickly. The git SHA + branch are embedded in every
saved summary.json for forensic lookup.
Watch out for variable upstream API throughput
The Stellantis public inventory endpoint shows uneven response time during evening US peak hours. Symptom: per-page fetch time climbs from ~0.3 s/page early in a run to ~0.6 s/page mid-run, and the largest models (1000+ pages) can take 9-10 minutes each instead of the ~3-4 minutes seen on faster days. No 4xx/5xx responses — the requests just slow down. This is wall-clock noise, not a code regression, but it can trip the duration watchdog (default 30 min/stage) on national runs during peak load.
Initially we suspected per-IP rate limiting, but a VPN comparison ruled that out: a fresh VPN egress IP saw the same slow rates immediately, with no warm-up period. The slowness is server-side, not client-targeted — most likely API saturation from consumer dealer-locator traffic during evening shopping hours. Time of day correlates more strongly than any source-IP behavior we observed.
Mitigations:
- Run during off-peak hours (US morning / midday) for the cleanest duration measurements. Memory measurements are unaffected by API throughput, so they're durable regardless of run time.
- Bump STAGE_TIMEOUT_S to 2700-3600 s on national runs during peak hours so the watchdog doesn't kill an otherwise-healthy run.
- The inventory client rotates its User-Agent per fetch as general hygiene, matching the pattern in ai_audi.sources.scs (see _random_headers in ai_stellantis.sources.inventory). It doesn't fix the throughput issue, but it doesn't hurt.
- Production runs on Dagster Cloud Serverless execute within a configurable cron window; schedule them outside US evening hours.
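The watchdog behaviour can be approximated with the standard library. When a timeout is given, subprocess.run kills the child (SIGKILL on POSIX) once the limit is exceeded. This is a minimal sketch under several assumptions. Each stage runs as a child process. STAGE_TIMEOUT_S is read from the environment, with the 30-minute default mentioned above. run_stage and its status strings are hypothetical, not the harness's actual entry point.

```python
import os
import subprocess
import sys
from typing import Optional

DEFAULT_STAGE_TIMEOUT_S = 1800.0  # 30 min/stage, the default quoted above


def stage_timeout_s() -> float:
    """Per-stage limit in seconds; bump to 2700-3600 for peak-hour national runs."""
    return float(os.environ.get("STAGE_TIMEOUT_S", DEFAULT_STAGE_TIMEOUT_S))


def run_stage(cmd: list[str], timeout_s: Optional[float] = None) -> str:
    """Run one stage as a child process; return "ok", "failed", or "killed"."""
    limit = stage_timeout_s() if timeout_s is None else timeout_s
    try:
        # On timeout, subprocess.run kills the child (SIGKILL on POSIX),
        # waits for it to exit, then raises TimeoutExpired.
        proc = subprocess.run(cmd, timeout=limit)
    except subprocess.TimeoutExpired:
        return "killed"
    return "ok" if proc.returncode == 0 else "failed"


if __name__ == "__main__":
    # A stage that sleeps past a 1 s limit gets killed.
    print(run_stage([sys.executable, "-c", "import time; time.sleep(5)"], timeout_s=1))
```

Reading the limit from the environment at call time means a peak-hour national run can raise it without any code change.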
Memory profile per tier
The dominant pile-ups in the pre-fix state, ranked by peak contribution:
- Transformed-tier snapshot hashing. _snapshot_series previously stringified every column and MD5-ed the concatenation. On a 70-column inventory entity, that materialized a per-row string roughly the size of the whole row, twice (once for the snapshot column, once during concat_str). Replaced with pl.struct(*cols).hash(): a single Polars pass (in Rust) over Arrow buffers that handles List/Struct columns natively and never materializes a per-row string.
- Per-row Polars frame construction in transformed tier. Each raw row (one inventory page of 50 vehicles) produced its own pl.from_dicts call. For Jeep at national scope, ~5000 raw pages produced ~5000 tiny DataFrames in batch_dfs. Coalesced to one frame per Arrow batch (collect_batches default ~1000 rows). batch_dfs length drops to single digits, and each pl.from_dicts call is amortized over far more rows.
- Raw-tier list[dict] intermediate. The [make_raw_row(...) for ...] + pl.from_dicts(rows, ...) pattern held the row-dict list AND the resulting Polars frame simultaneously during construction. make_raw_row_frame (in ai_core.schema) builds the frame from columnar inputs. Constants like partition_date, request_url and fetched_at are broadcast into a Series of length n instead of being repeated in n separate dicts.
- serialize_json_fields returning new dicts. Doubled transient memory by allocating dict(row) per row before mutating. Now mutates in place; the caller is the only owner.
- Unbounded _extract_field_diffs aggregation. The check ran a .unique() aggregation over every diff column on every conflicting eid. On wide entities with many conflicts, that allocation could explode. Capped at _MAX_DISC = 50 eids; diffs beyond that are not reported in metadata anyway.
- Per-row Polars filter in consolidated field policies. _apply_cross_source_policies ran src_df.filter(...) once per row to look up the today-source's matching record. Replaced with a pre-built dict index keyed by a (key_field, ...) tuple: O(1) lookup per row, one index per source.
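The last two fixes share a pattern: build a lookup once instead of scanning per row, and stop collecting once you have enough to report. Below is a stdlib-only sketch under stated assumptions:

- Source rows are plain dicts.
- The composite key fields are passed in (the real key tuple is elided above as (key_field, ...)).
- build_index, lookup_today_row and collect_discrepancies are hypothetical names, not the ai_core functions.
- MAX_DISC mirrors the _MAX_DISC = 50 cap.

```python
from typing import Any, Iterable, Optional

Row = dict[str, Any]
MAX_DISC = 50  # mirrors _MAX_DISC: report at most this many conflicting eids


def build_index(src_rows: Iterable[Row], key_fields: tuple[str, ...]) -> dict[tuple, Row]:
    """Index one source's rows by a composite key, once per source.

    Replaces a per-row filter (an O(n) scan each time) with an O(1) dict lookup.
    If a key repeats, the last row wins here; the real policy may differ.
    """
    return {tuple(row[f] for f in key_fields): row for row in src_rows}


def lookup_today_row(
    index: dict[tuple, Row], row: Row, key_fields: tuple[str, ...]
) -> Optional[Row]:
    """Find the matching source record for `row`, or None if absent."""
    return index.get(tuple(row[f] for f in key_fields))


def collect_discrepancies(
    conflicts: Iterable[tuple[str, Row]], cap: int = MAX_DISC
) -> dict[str, Row]:
    """Keep field diffs for at most `cap` distinct eids, then stop consuming."""
    out: dict[str, Row] = {}
    for eid, diff in conflicts:
        if len(out) >= cap:
            break
        out.setdefault(eid, diff)  # first diff seen per eid is kept
    return out
```

Because collect_discrepancies breaks out of the loop, a lazy generator of conflicts is never fully materialized once the cap is reached.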
Streaming patterns
Raw tier — per-sub-resource yield
StellantisInventoryRawComponent.fetch_with_upstream yields one
RawFetchResult per model year code instead of accumulating all pages
across all models into one frame. The IO manager
(_AiDbIOManager.handle_output) writes each yielded chunk via
table_writer separately — overwrite on the first chunk, append on
subsequent. Single Iceberg snapshot per partition; bounded peak memory
per chunk regardless of total fan-out.
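The yield-per-sub-resource contract can be sketched without Dagster or Iceberg. In this sketch, fetch_with_upstream_sketch and write_chunks are hypothetical stand-ins for the component and the IO manager, fetch_pages is an injected callable, and the "table" is an in-memory list. What matters is the shape. The producer yields one chunk per model year code. The consumer overwrites on the first chunk and appends on the rest, so only one chunk needs to be alive at a time.

```python
from typing import Any, Callable, Iterable, Iterator

Row = dict[str, Any]


def fetch_with_upstream_sketch(
    model_year_codes: Iterable[str],
    fetch_pages: Callable[[str], Iterable[Row]],
) -> Iterator[list[Row]]:
    """Yield one chunk per sub-resource instead of accumulating everything."""
    for code in model_year_codes:
        # Only this code's rows are materialized. The previous chunk can be
        # freed once the consumer has written it.
        yield list(fetch_pages(code))


def write_chunks(chunks: Iterable[list[Row]], table: list[Row]) -> int:
    """Overwrite on the first chunk, append on every later one; return chunk count."""
    n = 0
    for n, chunk in enumerate(chunks, start=1):
        if n == 1:
            table.clear()  # overwrite: drop the partition's previous contents
        table.extend(chunk)  # append this chunk
    return n
```

Because the producer is a generator, nothing is fetched until the consumer asks for the next chunk. Peak memory therefore tracks the largest single chunk rather than the total fan-out.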
This pattern is not promoted to ai_core — it's Stellantis-public-
inventory-specific. If another OEM hits the same scale, factor it out
into a StreamingRawSourceComponent base class in ai_core then.
Transformed tier — chunked write
TransformedEntityComponent yields a generator of pl.DataFrame slices
via _iter_chunks(df, _WRITE_CHUNK_SIZE) (default 50k rows, override
with TRANSFORMED_WRITE_CHUNK_SIZE env var). The IO manager writes each
chunk separately. The full transformed frame stays alive while chunks
iterate, but pyiceberg's encoder state is bounded per-chunk.
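A minimal _iter_chunks-style helper with the environment override might look like the sketch below. It works over a plain Python sequence rather than a pl.DataFrame; with Polars, df.iter_slices(n_rows=...) or df.slice(offset, length) would play the same role. The function names here are hypothetical. Only the TRANSFORMED_WRITE_CHUNK_SIZE variable and the 50k default come from the text above.

```python
import os
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")
DEFAULT_WRITE_CHUNK_SIZE = 50_000


def write_chunk_size() -> int:
    """Chunk size from TRANSFORMED_WRITE_CHUNK_SIZE, falling back to 50k rows."""
    size = int(os.environ.get("TRANSFORMED_WRITE_CHUNK_SIZE", DEFAULT_WRITE_CHUNK_SIZE))
    if size <= 0:
        raise ValueError("TRANSFORMED_WRITE_CHUNK_SIZE must be positive")
    return size


def iter_chunks(rows: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` rows; the last may be shorter."""
    for start in range(0, len(rows), size):
        yield rows[start : start + size]
```

As the text notes, this bounds the writer's per-call state, not the size of the source frame, which stays alive for the whole loop.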
Universal vs OEM-specific changes
Universal (auto-applied to every OEM via ai_core)
- _snapshot_series uses pl.struct(*cols).hash() (transformed tier).
- _extract_field_diffs is capped at _MAX_DISC eids (transformed tier).
- serialize_json_fields mutates in place (transformed tier).
- Per-Arrow-batch coalescing in transformed_asset (transformed tier).
- _apply_cross_source_policies pre-indexes source rows for O(1) lookup (consolidated tier).
- make_raw_row_frame helper available in ai_core.schema.
OEM-specific (must be explicitly migrated)
- make_raw_row_frame adoption: only Stellantis inventory + dealers are migrated. Other Stellantis raw components, and all of ai_audi, ai_mercedes, ai_mb and ai_nissan, still use the [make_raw_row(...) for ...] + pl.from_dicts(...) pattern. Migration is mechanical for sequential producers (catalog-style: build a list of records, convert to columnar lists, call make_raw_row_frame). Threaded producers (e.g. ai_audi's scs.py) are different. Either their per-row fetched_at semantics must be flattened, or make_raw_row_frame must be extended to accept a per-row list.
- Stellantis dc_transformed.py shares the per-row Polars frame construction anti-pattern with ai_core's old transformed_asset. It iterates partition_raw.iter_rows(named=True) directly with no Arrow batch boundary, so the same coalescing fix needs manual chunking (e.g. flush to a frame every N rows). Not yet migrated.
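The columnar construction described above can be sketched with plain dict-of-lists columns. The sketch includes the per-row fetched_at extension that threaded producers would need. It rests on several assumptions:

- raw_row_columns and its parameters are hypothetical, not make_raw_row_frame's real signature.
- The payload column name response_body is illustrative.
- The real helper builds Polars Series rather than Python lists.

```python
from datetime import datetime
from typing import Any, Sequence, Union

Columns = dict[str, list[Any]]


def raw_row_columns(
    bodies: Sequence[str],
    partition_date: str,
    request_url: str,
    fetched_at: Union[datetime, Sequence[datetime]],
) -> Columns:
    """Build raw-tier columns from columnar inputs instead of n row dicts.

    Constants are broadcast to length n once. fetched_at may be a single
    timestamp (sequential producers) or one value per row (threaded producers).
    """
    n = len(bodies)
    if isinstance(fetched_at, datetime):
        fetched_col = [fetched_at] * n
    else:
        fetched_col = list(fetched_at)
        if len(fetched_col) != n:
            raise ValueError(f"expected {n} fetched_at values, got {len(fetched_col)}")
    return {
        "partition_date": [partition_date] * n,
        "request_url": [request_url] * n,
        "fetched_at": fetched_col,
        "response_body": list(bodies),
    }
```

Accepting either a scalar or a sequence for fetched_at is one way to cover both producer styles without a second helper. Validating the length guards against a threaded producer handing in a mismatched list.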
Deferred enhancements
The list of changes evaluated but not landed, with rationale.
Streaming raw-tier read in transformed_asset
partition_lazy.collect_batches() in TransformedEntityComponent
iterates Arrow batches — but the underlying Polars LazyFrame.collect_batches()
materializes the full result into a list of batches before yielding,
so the entire raw partition (~5 GB at national Jeep scope) is held
during the loop. Switching to the Polars streaming engine
(scan_iceberg(...).filter(...).collect(engine="streaming") in current
Polars; collect(streaming=True) in older releases) could process the
filter without materializing the full partition. Projected savings: ~3-5 GB
on transformed at national. Deferred because Polars streaming has
rough edges around Iceberg scans and the iteration cycle (17 min per
national run) makes debugging slow. Reconsider if memory headroom
becomes critical.
Polars-expression rewrite of _apply_cross_source_policies
The function still does df.to_dicts() → Python loop → pl.from_dicts(...).
Pre-indexing source rows (Phase 5) eliminates the worst hotspot but the
to_dicts/from_dicts round-trip remains, allocating a full Python dict
list and a fresh DataFrame on output. A full rewrite as Polars
expressions over the merged frame would eliminate this. Deferred
because the consolidated tier isn't in the current benchmark scenario,
no production hotspot is documented, and the rewrite is non-trivial
(handles multi-source ranking, today/historic distinction, per-row
policy decisions).
Vectorize Pydantic extraction as Polars expressions
This is the highest-leverage remaining optimization and the one most likely to draw skepticism from anyone with production Dagster experience. Today every OEM source module's extractor looks like this:
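```python
def extract_vehicles(response_body, request_params):
    # response_body is assumed to be the already-decoded JSON payload (a dict).
    data = response_body or {}
    raw = data.get("vehicles") or []
    # One model_validate + one model_dump per vehicle, in a Python loop.
    return [PublicInventoryEntity.model_validate(v).model_dump() for v in raw]
```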
For 250k vehicles per Jeep partition this is 250k model_validate
calls + 250k model_dump calls in a Python hot loop. Pydantic v2 on a
50-field model with multiple validation_alias / AliasPath mappings
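and field_validator / model_validator hooks allocates substantial
transient state per record. Pydantic v2 compiles its core validator once
per model, but every Python-level validator hook still runs in the
interpreter for each record. Each record also gets a model instance and
a fresh output dict from model_dump. Hundreds of millions of object
allocations to do work that's structurally vectorizable.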
Conservative estimate of a vectorized rewrite: 5-10× faster wall time on the transformed tier, 40-60% lower transient memory.
What the per-record work is actually doing (all vectorizable in Polars):
- Field renaming (camelCase → snake_case)
- Type coercion (string → float, string → int)
- Defaults / null substitution
- Computed columns (make_cosy_image_urls, _strip_app_char, etc.)
- Nested-object flattening (price.msrp, specs.attributes → flat columns)
The naive replacement is ugly: dozens of pl.col(...).alias(...),
pl.col(...).cast(...), pl.when(...).then(...).otherwise(...)
expressions per entity, repeated across every OEM source module. We
lose the readability of a single Pydantic class as the entity schema.
Proposed approach — keep the OO declaration, compile it to a Polars
pipeline. The Pydantic class stays as the canonical schema source of
truth (we already extract pydantic_to_polars_schema(...) from it), but
its per-record validation is replaced with a vectorized compiler that
walks the class's field metadata once at registration time and produces
a reusable Polars expression chain. Sketch:
```python
from typing import ClassVar


class PublicInventoryEntity(InventoryPublicFields):
    # Field types: same as today; consumed by pydantic_to_polars_schema.

    # Per-field renames declared as class metadata (today's
    # validation_alias) but compiled to .rename() at extraction time.
    _aliases: ClassVar[dict[str, str]] = {
        "lower_level_package": "llp",
        "model_year_code": "assumedModelYearCode",
        # ...
    }

    # Nested-path extractions (today's AliasPath) compiled to
    # struct.field(...) chains.
    _paths: ClassVar[dict[str, list[str]]] = {
        "upper_level_package": ["mackevisionData", "ulp"],
        # ...
    }

    # Pre-flatten transforms (today's @model_validator(mode="before"))
    # declared as named transform classes so they can be replayed in
    # a Polars context.
    _transforms: ClassVar[list[Transform]] = [
        FlattenStruct("price", into=["msrp", "destination", "net_price"]),
        SpecsToColumns("specs.attributes", mapping=_SPEC_NAME_MAP),
        # ...
    ]

    # Per-field coercers: same set we already have, but consumed by
    # the compiler instead of running per-instance.
    _coerce: ClassVar[dict[str, Coercer]] = {
        "msrp": Float,
        "horsepower": Float,
        "vin": StripStr,
        # ...
    }
```
Then a compile_extractor(EntityClass) -> Callable[[list[dict]], pl.DataFrame]
walks the metadata once and produces a function that does the entire
extraction over a batch of dicts via Polars expressions — the same
declarative readability as Pydantic, vectorized execution underneath.
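To make the compile-once idea concrete, here is a pure-Python stand-in for compile_extractor. It is a sketch, not the proposed implementation, and it makes several simplifications:

- It handles only _aliases, _paths and _coerce-style metadata.
- It loops column-at-a-time where the real version would emit Polars expressions such as pl.col(...).alias(...), .struct.field(...) and .cast(...).
- The Demo entity is hypothetical.

The point is that the metadata is walked once, at compile time, and the returned function does no per-record schema lookups.

```python
from typing import Any, Callable, ClassVar

Row = dict[str, Any]
Columns = dict[str, list[Any]]


def _get_path(record: Row, path: list[str]) -> Any:
    """Follow a nested path, returning None if any level is missing."""
    value: Any = record
    for key in path:
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value


def compile_extractor(entity: type) -> Callable[[list[Row]], Columns]:
    """Walk the entity's metadata once and return a batch extractor."""
    # Resolve every output field to a source path up front (compile time).
    paths: dict[str, list[str]] = {
        out_name: [src_name]
        for out_name, src_name in getattr(entity, "_aliases", {}).items()
    }
    paths.update(getattr(entity, "_paths", {}))
    coercers: dict[str, Callable[[Any], Any]] = dict(getattr(entity, "_coerce", {}))

    def extract(batch: list[Row]) -> Columns:
        columns: Columns = {}
        for out_name, path in paths.items():
            # One pass per column over the batch; a Polars expression in the real design.
            col = [_get_path(rec, path) for rec in batch]
            coerce = coercers.get(out_name)
            if coerce is not None:
                col = [None if v is None else coerce(v) for v in col]
            columns[out_name] = col
        return columns

    return extract


class Demo:
    # Hypothetical entity used only to exercise the sketch.
    _aliases: ClassVar[dict[str, str]] = {"model_year_code": "assumedModelYearCode"}
    _paths: ClassVar[dict[str, list[str]]] = {
        "upper_level_package": ["mackevisionData", "ulp"],
        "msrp": ["price", "msrp"],
    }
    _coerce: ClassVar[dict[str, Callable[[Any], Any]]] = {"msrp": float}
```

Missing keys and missing nested levels become None rather than raising, which mirrors null substitution. A real compiler would also need a story for validation errors that Pydantic currently raises per record.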
The migration is non-trivial because:
- ~10 OEM source modules need their extractors updated.
- Some model_validator(mode="after") hooks do non-trivial work (cosy image URL generation, ccode-to-model-code derivation) that needs careful translation.
- Tests for entity validation will need to validate against the vectorized output.
Deferred until either (a) memory headroom on Serverless becomes a sustained problem despite the changes already landed, or (b) wall-time on transformed tier becomes a bottleneck. We have not seen transformed duration go red in measurements so far. The architectural pattern (declaration on the entity class, vectorized execution at extract time) preserves the OO readability that makes the codebase approachable — it's not "rewrite as raw Polars" but "compile the Pydantic-style declaration to Polars."
dc_transformed coalescing migration
Apply the same per-Arrow-batch coalescing as ai_core/transformed.py
to Stellantis dc_transformed.py. Needs manual chunking since the file
iterates rows directly without Arrow batch boundaries. Deferred
because DC inventory isn't exercised in the current benchmark scenario.
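The manual chunking that dc_transformed.py would need can be sketched as a buffer that flushes every N rows. The sketch makes three assumptions:

- Rows arrive as dicts, as they do from iter_rows(named=True).
- flush_every and rows_to_columns are hypothetical names.
- Each flush builds a dict of columns where the real code would call pl.from_dicts once per buffer.

```python
from typing import Any, Iterable, Iterator

Row = dict[str, Any]
Columns = dict[str, list[Any]]


def rows_to_columns(rows: list[Row]) -> Columns:
    """Stand-in for pl.from_dicts: one columnar frame per buffer, not per row."""
    keys: dict[str, None] = {}  # ordered set of column names across the buffer
    for row in rows:
        for k in row:
            keys.setdefault(k)
    return {k: [row.get(k) for row in rows] for k in keys}


def flush_every(rows: Iterable[Row], n: int) -> Iterator[Columns]:
    """Buffer rows and emit one frame per n rows, plus a final partial frame."""
    if n <= 0:
        raise ValueError("n must be positive")
    buffer: list[Row] = []
    for row in rows:
        buffer.append(row)
        if len(buffer) >= n:
            yield rows_to_columns(buffer)
            buffer = []
    if buffer:
        yield rows_to_columns(buffer)
```

Choosing N around the ~1000-row Arrow batch size used in ai_core would give a comparable number of frames, but that value is an assumption to tune, not a measured optimum.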
Streaming RecordBatchReader IO manager path
For raw-tier components fanning out across many sub-resources,
delivering a pa.RecordBatchReader to the IO manager would let
pyiceberg tx.append() per batch within one transaction (single Iceberg
snapshot, bounded encoder state). Deferred — current per-chunk
streaming via _AiDbIOManager.handle_output already keeps memory
bounded; the additional gain is marginal for the schemas we have today.
Move intensive operations off Dagster Cloud Serverless
If data volume keeps growing, the 16 GB Serverless tier may stop being enough even with these mitigations (especially if multiple heavy assets run in parallel). Offloading the largest materializations to dedicated compute (a separate runner, a GKE cluster, or a Fargate task) is the structural answer once we exhaust per-process optimizations. Deferred: out of scope for this work, but worth tracking as the next architectural lever.
Adding a benchmark scenario
Drop a JSON file in benchmarks/configs/:
```json
{
  "name": "<scenario>",
  "oem": "<oem>",
  "description": "...",
  "env": { "OPTIONAL_ENV_VAR": "value" },
  "stages": [
    {"name": "raw_x", "asset_key": "<oem>/raw/<source>", "label": "..."},
    {"name": "transformed_x", "asset_key": "<oem>/transformed/<entity>_from_<source>", "label": "..."}
  ]
}
```
The optional env block sets variables on the dagster subprocess (used
by _fast variants to constrain data scope without changing source
code). Run via benchmarks/run.sh <scenario> <state-label>. See the Stellantis Jeep
configs for working examples.
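How the optional env block could reach the dagster subprocess can be sketched as a small loader. Note that load_scenario and subprocess_env are hypothetical helpers, since the harness's real loading code isn't shown here. Treating name, oem and stages (and, per stage, name and asset_key) as required is also an assumption based on the template above.

```python
import json
import os
from typing import Any, Mapping

REQUIRED_KEYS = ("name", "oem", "stages")
REQUIRED_STAGE_KEYS = ("name", "asset_key")


def load_scenario(text: str) -> dict[str, Any]:
    """Parse a scenario config and check the fields assumed to be required."""
    config = json.loads(text)
    missing = [k for k in REQUIRED_KEYS if k not in config]
    if missing:
        raise ValueError(f"scenario missing keys: {missing}")
    for stage in config["stages"]:
        stage_missing = [k for k in REQUIRED_STAGE_KEYS if k not in stage]
        if stage_missing:
            raise ValueError(f"stage {stage!r} missing keys: {stage_missing}")
    env = config.get("env", {})
    if not all(isinstance(v, str) for v in env.values()):
        raise ValueError("env values must be strings")
    return config


def subprocess_env(
    config: Mapping[str, Any], base: Mapping[str, str] = os.environ
) -> dict[str, str]:
    """Parent environment plus the scenario's env block (scenario values win)."""
    return {**base, **config.get("env", {})}
```

The merged dict can then be passed as env= to subprocess.run. Overlaying rather than replacing the parent environment keeps PATH and credentials intact while letting a _fast variant narrow the data scope.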