
Performance Notes

Memory and wall-time considerations for the ingestion pipeline. Documents the patterns we adopted to keep peak memory bounded on wide-and-tall entities (Stellantis Jeep US inventory, ~250k rows × ~70 columns at national scope), the measurements that proved them out, and a list of deferred enhancements for future runs.

Benchmarking

Local benchmarks live in benchmarks/ at the repo root. See benchmarks/README.md for the full harness, but at a glance:

# Fast iteration scenario (NYC + 50 mi radius, ~90 s)
benchmarks/run.sh stellantis_jeep_inventory_fast <state-label>

# Canonical national scenario (~17 min)
benchmarks/run.sh stellantis_jeep_inventory <state-label>

# Compare two summary.json files
python benchmarks/compare.py BASELINE.json CANDIDATE.json

Each stage is graded against a yellow and a red threshold on two independent metrics: peak memory (default 6 / 8 GB) and wall-clock duration (default 10 / 20 min). The container cap is informational; grades drive pass/fail. A STAGE_TIMEOUT_S watchdog SIGKILLs runaway stages.
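The grading rule above can be sketched as follows. This is a minimal illustration, not the harness code: the names `Thresholds`, `grade`, and `grade_stage`, the "green" label, and the inclusive comparisons are assumptions; only the default threshold values come from the text.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Thresholds:
    yellow: float
    red: float


# Defaults quoted in the text above.
MEMORY_GB = Thresholds(yellow=6.0, red=8.0)
DURATION_MIN = Thresholds(yellow=10.0, red=20.0)


def grade(value: float, thresholds: Thresholds) -> str:
    """Grade one metric. Treating 'at the threshold' as tripping it is an assumption."""
    if value >= thresholds.red:
        return "red"
    if value >= thresholds.yellow:
        return "yellow"
    return "green"  # hypothetical label for "under both thresholds"


def grade_stage(peak_memory_gb: float, duration_min: float) -> dict[str, str]:
    """Grade memory and duration independently; neither metric masks the other."""
    return {
        "memory": grade(peak_memory_gb, MEMORY_GB),
        "duration": grade(duration_min, DURATION_MIN),
    }


print(grade_stage(peak_memory_gb=5.2, duration_min=17.0))
```

Keeping the two metrics separate matters for the throughput issue described below: a slow upstream API can push duration to yellow or red while the memory grade stays meaningful.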

Numbers stamped from a benchmark run live in the PR description that shipped the change, not in committed JSON files — they're tightly coupled to the host machine and day-to-day API volume, so a checked-in "baseline" decays quickly. The git SHA + branch are embedded in every saved summary.json for forensic lookup.

Watch out for variable upstream API throughput

The Stellantis public inventory endpoint shows uneven response time during evening US peak hours. Symptom: per-page fetch time climbs from ~0.3 s/page early in a run to ~0.6 s/page mid-run, and the largest models (1000+ pages) can take 9-10 minutes each instead of the ~3-4 minutes seen on faster days. No 4xx/5xx responses — the requests just slow down. This is wall-clock noise, not a code regression, but it can trip the duration watchdog (default 30 min/stage) on national runs during peak load.

Initially we suspected per-IP rate limiting, but a VPN comparison ruled that out: a fresh VPN egress IP saw the same slow rates immediately, with no warm-up period. The slowness is server-side, not client-targeted — most likely API saturation from consumer dealer-locator traffic during evening shopping hours. Time of day correlates more strongly than any source-IP behavior we observed.

Mitigations:

  • Run during off-peak hours (US morning / midday) for the cleanest duration measurements. Memory measurements are unaffected by API throughput so they're durable regardless of run time.
  • Bump STAGE_TIMEOUT_S to 2700-3600s on national runs during peak hours so the watchdog doesn't kill an otherwise-healthy run.
  • The inventory client rotates its User-Agent per fetch as general hygiene — matches the pattern in ai_audi.sources.scs. See _random_headers in ai_stellantis.sources.inventory. Doesn't fix the throughput issue but doesn't hurt.
  • Production runs from Dagster Cloud Serverless run during a configurable cron window — schedule them outside US evening hours.

Memory profile per tier

The dominant pile-ups in the pre-fix state, ranked by peak contribution:

  1. Transformed-tier snapshot hashing. _snapshot_series previously stringified every column and MD5-ed the concatenation. On a 70-column inventory entity that materialized a per-row string roughly the size of the whole row, twice (once for the snapshot column, once during concat_str). Replaced with pl.struct(*cols).hash(): a single pass in Polars' native (Rust) engine over Arrow buffers, with native handling of List/Struct columns and no per-row string materialization.

  2. Per-row Polars frame construction in transformed tier. Each raw inventory page (50 vehicles) produced its own pl.from_dicts call; for Jeep at national scope ~5000 raw pages produced ~5000 tiny DataFrames in batch_dfs. Coalesced to one frame per Arrow batch (collect_batches default ~1000 rows) — single-digit batch_dfs length, larger amortization on each pl.from_dicts.

  3. Raw-tier list[dict] intermediate. The [make_raw_row(...) for ...] + pl.from_dicts(rows, ...) pattern held the row-dict list AND the resulting Polars frame simultaneously during construction. make_raw_row_frame (in ai_core.schema) builds the frame from columnar inputs — constants like partition_date, request_url, fetched_at are broadcast into a Series of length n instead of repeated in n separate dicts.

  4. serialize_json_fields returning new dicts. Doubled transient memory by allocating dict(row) per row before mutating. Now mutates in place; caller is the only owner.

  5. Unbounded _extract_field_diffs aggregation. The check ran a .unique() aggregation over every diff column on every conflicting eid. On wide entities with many conflicts that allocation could explode. Capped at _MAX_DISC = 50 eids — diffs beyond that are not reported in metadata anyway.

  6. Per-row Polars filter in consolidated field policies. _apply_cross_source_policies ran src_df.filter(...) once per row to look up the today-source's matching record. Replaced with a pre-built dict index keyed by (key_field, ...) tuple — O(1) lookup per row, single index per source.

Streaming patterns

Raw tier — per-sub-resource yield

StellantisInventoryRawComponent.fetch_with_upstream yields one RawFetchResult per model year code instead of accumulating all pages across all models into one frame. The IO manager (_AiDbIOManager.handle_output) writes each yielded chunk via table_writer separately — overwrite on the first chunk, append on subsequent. Single Iceberg snapshot per partition; bounded peak memory per chunk regardless of total fan-out.

This pattern is not promoted to ai_core; it is specific to the Stellantis public inventory source. If another OEM hits the same scale, factor it out into a StreamingRawSourceComponent base class in ai_core then.
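A minimal, dependency-free sketch of the per-sub-resource yield described in this section. `RawChunk`, `fetch_per_model`, and `write_chunks` are hypothetical stand-ins for RawFetchResult, fetch_with_upstream, and the IO manager; a plain list plays the role of the table, and the model year codes are made-up sample values.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class RawChunk:
    """Hypothetical stand-in for RawFetchResult: one model year code's rows."""
    model_year_code: str
    rows: list[dict]


def fetch_per_model(pages_by_model: dict[str, list[list[dict]]]) -> Iterator[RawChunk]:
    """Yield one chunk per model year code instead of accumulating all pages."""
    for code, pages in pages_by_model.items():
        rows = [row for page in pages for row in page]
        yield RawChunk(code, rows)  # only this model's rows are alive here


def write_chunks(chunks: Iterable[RawChunk], table: list[dict]) -> list[str]:
    """Overwrite on the first chunk, append on the rest. Returns the modes used."""
    modes = []
    for index, chunk in enumerate(chunks):
        mode = "overwrite" if index == 0 else "append"
        if mode == "overwrite":
            table.clear()  # drop whatever the partition held before this run
        table.extend(chunk.rows)
        modes.append(mode)
    return modes


table = [{"vin": "stale-row"}]
pages = {
    "CODE_A": [[{"vin": "1"}, {"vin": "2"}]],
    "CODE_B": [[{"vin": "3"}]],
}
modes = write_chunks(fetch_per_model(pages), table)
print(modes, len(table))
```

Because the producer is a generator, peak memory tracks the largest single model rather than the total fan-out.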

Transformed tier — chunked write

TransformedEntityComponent yields a generator of pl.DataFrame slices via _iter_chunks(df, _WRITE_CHUNK_SIZE) (default 50k rows, override with TRANSFORMED_WRITE_CHUNK_SIZE env var). The IO manager writes each chunk separately. The full transformed frame stays alive while chunks iterate, but pyiceberg's encoder state is bounded per-chunk.

Universal vs OEM-specific changes

Universal (auto-applied to every OEM via ai_core)

  • _snapshot_series uses pl.struct(*cols).hash() (transformed tier).
  • _extract_field_diffs capped at _MAX_DISC eids (transformed tier).
  • serialize_json_fields mutates in place (transformed tier).
  • Per-Arrow-batch coalescing in transformed_asset (transformed tier).
  • _apply_cross_source_policies pre-indexes source rows for O(1) lookup (consolidated tier).
  • make_raw_row_frame helper available in ai_core.schema.

OEM-specific (must be explicitly migrated)

  • make_raw_row_frame adoption: only Stellantis inventory + dealers are migrated. Other Stellantis raw components and all of ai_audi, ai_mercedes, ai_mb, ai_nissan still use the [make_raw_row(...) for ...] + pl.from_dicts(...) pattern. Migration is mechanical for sequential producers (catalog-style: build a list of records, convert to columnar lists, call make_raw_row_frame). For threaded producers (e.g. ai_audi scs.py), per-row fetched_at semantics need to be either flattened or make_raw_row_frame extended to accept a per-row list.

  • Stellantis dc_transformed.py shares the per-row Polars frame construction anti-pattern with ai_core's old transformed_asset. It iterates partition_raw.iter_rows(named=True) directly with no Arrow batch boundary, so the same coalescing fix needs manual chunking (e.g. flush to a frame every N rows). Has not been migrated.

Deferred enhancements

Changes evaluated but not landed, with the rationale for deferring each.

Streaming raw-tier read in transformed_asset

partition_lazy.collect_batches() in TransformedEntityComponent iterates Arrow batches — but the underlying Polars LazyFrame.collect_batches() materializes the full result into a list of batches before yielding, so the entire raw partition (~5 GB at national Jeep scope) is held during the loop. Switching to Polars streaming engine (scan_iceberg(...).filter(...).collect(streaming=True)) could process the filter without materializing — projected savings: ~3-5 GB on transformed at national. Deferred because Polars streaming has rough edges around Iceberg scans and the iteration cycle (17 min per national run) makes debugging slow. Reconsider if memory headroom becomes critical.

Polars-expression rewrite of _apply_cross_source_policies

The function still does df.to_dicts() → Python loop → pl.from_dicts(...). Pre-indexing source rows (Phase 5) eliminates the worst hotspot but the to_dicts/from_dicts round-trip remains, allocating a full Python dict list and a fresh DataFrame on output. A full rewrite as Polars expressions over the merged frame would eliminate this. Deferred because the consolidated tier isn't in the current benchmark scenario, no production hotspot is documented, and the rewrite is non-trivial (handles multi-source ranking, today/historic distinction, per-row policy decisions).

Vectorize Pydantic extraction as Polars expressions

This is the highest-leverage remaining optimization and the one most likely to draw skepticism from anyone with production Dagster experience. Today every OEM source module's extractor looks like this:

def extract_vehicles(response_body, request_params):
    # response_body is assumed to be the already-parsed JSON dict.
    raw = response_body.get("vehicles") or []
    return [PublicInventoryEntity.model_validate(v).model_dump() for v in raw]

For 250k vehicles per Jeep partition this is 250k model_validate calls + 250k model_dump calls in a Python hot loop. Pydantic v2 on a 50-field model with multiple validation_alias / AliasPath mappings and field_validator / model_validator hooks allocates substantial transient state per record — FieldInfo lookups, ValidatorContext objects, deeply-nested coercion calls. Hundreds of millions of object allocations to do work that's structurally vectorizable.

Conservative estimate of a vectorized rewrite: 5-10× faster wall time on the transformed tier, 40-60% lower transient memory.

What the per-record work is actually doing (all vectorizable in Polars):

  • Field renaming (camelCase → snake_case)
  • Type coercion (string → float, string → int)
  • Defaults / null substitution
  • Computed columns (make_cosy_image_urls, _strip_app_char, etc.)
  • Nested-object flattening (price.msrp, specs.attributes → flat columns)

The naive replacement is ugly: dozens of pl.col(...).alias(...), pl.col(...).cast(...), pl.when(...).then(...).otherwise(...) expressions per entity, repeated across every OEM source module. We lose the readability of a single Pydantic class as the entity schema.

Proposed approach — keep the OO declaration, compile it to a Polars pipeline. The Pydantic class stays as the canonical schema source of truth (we already extract pydantic_to_polars_schema(...) from it), but its per-record validation is replaced with a vectorized compiler that walks the class's field metadata once at registration time and produces a reusable Polars expression chain. Sketch:

class PublicInventoryEntity(InventoryPublicFields):
    # Field types — same as today; consumed by pydantic_to_polars_schema.

    # Per-field renames declared as class metadata (today's
    # validation_alias) but compiled to .rename() at extraction time.
    _aliases: ClassVar[dict[str, str]] = {
        "lower_level_package": "llp",
        "model_year_code": "assumedModelYearCode",
        # ...
    }

    # Nested-path extractions (today's AliasPath) compiled to
    # struct.field(...) chains.
    _paths: ClassVar[dict[str, list[str]]] = {
        "upper_level_package": ["mackevisionData", "ulp"],
        # ...
    }

    # Pre-flatten transforms (today's @model_validator(mode="before"))
    # declared as named transform classes so they can be replayed in
    # a Polars context.
    _transforms: ClassVar[list[Transform]] = [
        FlattenStruct("price", into=["msrp", "destination", "net_price"]),
        SpecsToColumns("specs.attributes", mapping=_SPEC_NAME_MAP),
        # ...
    ]

    # Per-field coercers — same set we already have, but consumed by
    # the compiler instead of running per-instance.
    _coerce: ClassVar[dict[str, Coercer]] = {
        "msrp": Float,
        "horsepower": Float,
        "vin": StripStr,
        # ...
    }

Then a compile_extractor(EntityClass) -> Callable[[dict], pl.DataFrame] walks the metadata once and produces a function that does the entire extraction over a batch of dicts via Polars expressions — the same declarative readability as Pydantic, vectorized execution underneath.

The migration is non-trivial because:

  • ~10 OEM source modules need their extractors updated.
  • Some model_validator(mode="after") hooks do non-trivial work (cosy image URL generation, ccode-to-model-code derivation) that needs careful translation.
  • Tests for entity validation will need to validate against the vectorized output.

Deferred until either (a) memory headroom on Serverless becomes a sustained problem despite the changes already landed, or (b) wall-time on transformed tier becomes a bottleneck. We have not seen transformed duration go red in measurements so far. The architectural pattern (declaration on the entity class, vectorized execution at extract time) preserves the OO readability that makes the codebase approachable — it's not "rewrite as raw Polars" but "compile the Pydantic-style declaration to Polars."

dc_transformed coalescing migration

Apply the same per-Arrow-batch coalescing as ai_core/transformed.py to Stellantis dc_transformed.py. Needs manual chunking since the file iterates rows directly without Arrow batch boundaries. Deferred because DC inventory isn't exercised in the current benchmark scenario.

Streaming RecordBatchReader IO manager path

For raw-tier components fanning out across many sub-resources, delivering a pa.RecordBatchReader to the IO manager would let pyiceberg tx.append() per batch within one transaction (single Iceberg snapshot, bounded encoder state). Deferred — current per-chunk streaming via _AiDbIOManager.handle_output already keeps memory bounded; the additional gain is marginal for the schemas we have today.

Move intensive operations off Dagster Cloud Serverless

For sustained growth of the data, the 16 GB Serverless tier may stop being enough even with these mitigations (especially if multiple heavy assets run in parallel). Offloading the largest materializations to dedicated compute (a separate runner, a GKE cluster, or a Fargate task) is the structural answer once we exhaust per-process optimizations. Deferred: out of scope for this work, but worth tracking as the next architectural lever.

Adding a benchmark scenario

Drop a JSON file in benchmarks/configs/:

{
  "name": "<scenario>",
  "oem": "<oem>",
  "description": "...",
  "env": { "OPTIONAL_ENV_VAR": "value" },
  "stages": [
    {"name": "raw_x", "asset_key": "<oem>/raw/<source>", "label": "..."},
    {"name": "transformed_x", "asset_key": "<oem>/transformed/<entity>_from_<source>", "label": "..."}
  ]
}

The optional env block sets variables on the dagster subprocess (used by _fast variants to constrain data scope without changing source code). Run via benchmarks/run.sh <scenario> <state-label>, as in the Benchmarking commands above. See the Stellantis Jeep configs for working examples.