
Code Structure

Package Layout

The repo is a uv workspace monorepo. Ingestion code is split across two layers:

packages/ai_core owns the generic component base classes. It has no knowledge of any specific OEM or API.

projects/ai_<oem> (e.g. projects/ai_audi) owns everything OEM-specific: the source modules that call the APIs, the component subclasses that wire them into assets, and the YAML config that instantiates everything.

Directory layout inside an OEM project

projects/ai_audi/
└── src/ai_audi/
    ├── components/
    │   └── raw.py          ← RawSourceComponent subclasses (override fetch() and base_url)
    ├── sources/
    │   ├── scs.py          ← API client + SCSModelEntity / SCSDealerEntity subclasses
    │   ├── onegraph.py     ← API client + OneGraphModelEntity subclass
    │   ├── pss.py          ← API client + PSSDealerEntity subclass
    │   ├── catalog.py
    │   └── registry.py     ← extractor registry: maps (source, resource, entity) → function
    ├── entities.py         ← canonical entity base classes (Python-named fields only)
    └── defs/
        ├── raw/            ← verbatim fetch components (one per source endpoint)
        ├── transformed/    ← entity extraction + deduplication
        └── consolidated/   ← cross-source merge + entity refs

Component Classes

Components are importable from ai_core.components. Each class maps to one stage of the pipeline:

| Class | Assets emitted | Purpose |
| --- | --- | --- |
| RawSourceComponent | <oem>/sources/<source>_<resource> + <oem>/raw/<source>_<resource> | Emits the source observability node and the verbatim fetch asset; subclass to supply fetch() and base_url |
| TransformedEntityComponent | <oem>/transformed/<name>_<entity> | Extracts a specific entity type from the verbatim raw asset via the OEM's extractor registry, then deduplicates rows by key group hashes and aggregates list fields |
| ConsolidatedComponent | <oem>/consolidated/<entity> | Merges transformed records across all sources for an entity type; tracks per-source provenance via {name}_exists columns; resolves cross-entity references via foreign_keys |

RawSourceComponent is fully implemented — subclasses only override fetch() and base_url. TransformedEntityComponent is config-driven: the extractor function (looked up via the OEM registry) defines the output schema, key_fields control deduplication, and aggregations specify list-collection columns. ConsolidatedComponent is a single component per OEM that merges transformed records across all sources for all four entity types and resolves cross-entity foreign-key references.
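The deduplication step described above can be sketched in plain Python. This is only an illustration of the idea, not the ai_core implementation: the SHA-256 hash, the helper names `entity_id` and `dedupe`, the single key group, and the `color` / `image_url` fields are all assumptions made for the example.

```python
import hashlib
import json
from typing import Any


def entity_id(row: dict[str, Any], key_fields: list[str]) -> str:
    """Hash the key field values into a stable ID (SHA-256 is an assumption)."""
    payload = json.dumps([row.get(f) for f in key_fields], default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def dedupe(
    rows: list[dict[str, Any]],
    key_fields: list[str],
    id_column: str,
    list_fields: list[str],
) -> list[dict[str, Any]]:
    """Keep one row per key hash and collect `list_fields` values into lists."""
    merged: dict[str, dict[str, Any]] = {}
    for row in rows:
        key = entity_id(row, key_fields)
        if key not in merged:
            # First occurrence supplies the scalar fields.
            merged[key] = {**row, id_column: key}
            for name in list_fields:
                merged[key][name] = []
        for name in list_fields:
            value = row.get(name)
            if value is not None and value not in merged[key][name]:
                merged[key][name].append(value)
    return list(merged.values())


# Hypothetical rows: two observations of the same car, one of another.
rows = [
    {"car_id": "A1", "color": "red", "image_url": "a.jpg"},
    {"car_id": "A1", "color": "red", "image_url": "b.jpg"},
    {"car_id": "B2", "color": "blue", "image_url": "c.jpg"},
]
result = dedupe(
    rows, key_fields=["car_id"], id_column="inventory_id", list_fields=["image_url"]
)
```

Here the two `A1` rows collapse into one record whose `image_url` column holds both values, which is the role the key_fields and aggregations settings play in the component config.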

Source Modules

Source modules in ai_<oem>/sources/ are pure Python with no Dagster imports. They contain HTTP logic, data extraction, and the Pydantic entity subclasses for that source's API shape. This keeps them testable in isolation without a Dagster context.

Each source module defines a subclass of the relevant base entity (from entities.py) that declares validation_alias / AliasPath mappings for that API's naming conventions. Parse functions call SourceSubclass.model_validate(raw_dict) and return the base entity type:

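# ai_audi/sources/pss.py: entity subclass + parse function
from typing import Any

from pydantic import AliasPath, Field

from ai_audi.entities import AudiDealerEntity


class PSSDealerEntity(AudiDealerEntity):
    # Map the PSS API's camelCase / nested keys onto the canonical field names.
    dealer_id: str = Field("", validation_alias="dealerId")
    display_name: str | None = Field(
        None, validation_alias=AliasPath("additionalData", "displayName")
    )
    ...


def parse_dealer_entities(dealers: list[dict[str, Any]]) -> list[AudiDealerEntity]:
    # Validate with the source subclass; callers only depend on the base type.
    return [PSSDealerEntity.model_validate(d) for d in dealers]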

The base entity classes in entities.py carry only canonical Python-named fields. Source subclasses own their API's field mapping — no aliases or source-specific logic belongs in the base.

Component subclasses implement two hooks that the base build_defs calls:

class ScsInventoryRawComponent(RawSourceComponent):
    @property
    def base_url(self) -> str:
        return scs.SEARCH_URL  # single source of truth, defined in the source module

    def fetch(self) -> RawFetchResult:
        return scs.fetch_vehicles()

fetch() returns either a single RawFetchResult (the common case) or an Iterator[RawFetchResult] for streaming fetchers; each RawFetchResult bundles the raw pl.DataFrame and the FetchAttempt records that produced it. See Component Reference → ai_core Utilities for the dataclass definitions and instrumentation pattern.

The base class wires these into the asset graph automatically — no build_defs override needed.
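One way a caller could treat both return shapes of fetch() uniformly is sketched below. The dataclasses are simplified, hypothetical stand-ins (the real RawFetchResult bundles a pl.DataFrame, and the real FetchAttempt fields are documented in the Component Reference), and `iter_results` is an illustrative helper rather than an ai_core function.

```python
from collections.abc import Iterator
from dataclasses import dataclass, field
from typing import Any, Union


@dataclass
class FetchAttempt:
    """Hypothetical stand-in: one HTTP attempt made during a fetch."""

    url: str
    status_code: int


@dataclass
class RawFetchResult:
    """Hypothetical stand-in: rows replace the real pl.DataFrame payload."""

    rows: list[dict[str, Any]]
    attempts: list[FetchAttempt] = field(default_factory=list)


def iter_results(
    fetched: Union[RawFetchResult, Iterator[RawFetchResult]],
) -> Iterator[RawFetchResult]:
    """Yield results whether fetch() returned one result or a stream of them."""
    if isinstance(fetched, RawFetchResult):
        yield fetched
    else:
        yield from fetched


def single_fetch() -> RawFetchResult:
    attempt = FetchAttempt("https://example.invalid/search", 200)
    return RawFetchResult(rows=[{"id": 1}], attempts=[attempt])


def streaming_fetch() -> Iterator[RawFetchResult]:
    # A streaming fetcher yields one result per page.
    for page in range(3):
        attempt = FetchAttempt(f"https://example.invalid/search?page={page}", 200)
        yield RawFetchResult(rows=[{"id": page}], attempts=[attempt])


single_count = sum(len(r.rows) for r in iter_results(single_fetch()))
stream_count = sum(len(r.rows) for r in iter_results(streaming_fetch()))
```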

Source Registry

Each OEM project exposes an extractor registry in ai_<oem>/sources/registry.py. It maps (source, resource, entity) tuples to extraction functions, and is the single lookup point TransformedEntityComponent uses at materialization time:

# ai_audi/sources/registry.py
from collections.abc import Callable

import polars as pl

from ai_audi.sources import catalog, onegraph, pss, scs

# (source, resource, entity) -> extractor(body, params)
REGISTRY: dict[tuple[str, str, str], Callable[[str, str], pl.DataFrame]] = {
    ("pss", "dealers", "dealers"): pss.extract_dealers,
    ("scs", "search", "inventory"): scs.extract_inventory,
    ("onegraph", "carlines", "models"): onegraph.extract_models,
    ("catalog", "for_model", "features"): catalog.extract_features,
}

Each extractor function lives in the corresponding source module (sources/scs.py, sources/onegraph.py, etc.) and has the signature (body: str, params: str) -> pl.DataFrame. Both arguments are raw JSON strings — the _response_body and _request_params columns from the raw Iceberg table, passed without pre-parsing. The function is responsible for calling orjson.loads internally, building entity records via Pydantic, serializing any dict/nested-model fields, and returning a typed pl.DataFrame. Module-level schema constants (pydantic_to_polars_schema and json_serialized_fields of the base entity class) are computed at import time and reused across calls.

If a (source, resource, entity) tuple is not found in the registry, the asset raises NotImplementedError with instructions on where to add it.
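The lookup-and-fail behaviour can be illustrated with the standard library alone. In this sketch a list of dicts stands in for pl.DataFrame, `json.loads` stands in for `orjson.loads`, and `get_extractor`, the toy `extract_dealers`, and the payload shape are hypothetical; the actual error message and lookup code in ai_core may differ.

```python
import json
from collections.abc import Callable
from typing import Any

Rows = list[dict[str, Any]]  # stand-in for pl.DataFrame
Extractor = Callable[[str, str], Rows]


def extract_dealers(body: str, params: str) -> Rows:
    """Toy extractor: both arguments arrive as raw JSON strings."""
    payload = json.loads(body)
    request = json.loads(params)
    return [
        {"dealer_id": d["dealerId"], "market": request.get("market")}
        for d in payload.get("dealers", [])
    ]


REGISTRY: dict[tuple[str, str, str], Extractor] = {
    ("pss", "dealers", "dealers"): extract_dealers,
}


def get_extractor(source: str, resource: str, entity: str) -> Extractor:
    """Return the registered extractor or fail with a pointer to the fix."""
    key = (source, resource, entity)
    try:
        return REGISTRY[key]
    except KeyError:
        raise NotImplementedError(
            f"No extractor registered for {key}; add one to sources/registry.py"
        ) from None


extractor = get_extractor("pss", "dealers", "dealers")
rows = extractor('{"dealers": [{"dealerId": "D-001"}]}', '{"market": "us"}')

try:
    get_extractor("pss", "dealers", "inventory")
    missing_error = None
except NotImplementedError as exc:
    missing_error = str(exc)
```

Keeping the parsing inside the extractor means the registry only ever deals in strings in and a table out, so each entry can be unit-tested with a saved response body.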

YAML Configuration

Components are instantiated from YAML files in defs/. Dagster's load_from_defs_folder discovers them automatically — no changes to definitions.py are needed when adding a new component.

Each YAML document in these files declares one component:

# defs/raw/defs.yaml  (one multi-document file per tier; components separated by ---)

type: ai_audi.components.ScsInventoryRawComponent
attributes:
  oem: audi
  source: scs          # API name
  resource: search     # endpoint/query within that API
  market: us
  auth: public
  protocol: json_rest
  description: "Audi US vehicle inventory search (SCS)"
  start_date: "2024-01-01"

# defs/transformed/defs.yaml
# The raw/<source>_<resource> asset is auto-wired as a data input; no deps entry needed.

type: ai_core.components.TransformedEntityComponent
attributes:
  oem: audi
  name: scs                 # forms transformed/scs_inventory
  entity: inventory
  start_date: "2024-01-01"
  key_fields: [car_id]      # fields hashed to produce the entity ID; must uniquely identify a row
  entity_id: inventory_id   # column name for the entity ID hash

# defs/consolidated/defs.yaml

type: ai_core.components.ConsolidatedComponent
attributes:
  oem: audi
  start_date: "2024-01-01"
  dealers:
    source_groups:
      - name: pss
        source: pss_dealers
        priority: 1
    merge:
      match_on: [[mat_primary_code]]
      key_fields: [mat_primary_code]
  models:
    source_groups:
      - name: onegraph
        source: onegraph_models
        priority: 1
    merge:
      match_on: [[model_catalog_id]]
      key_fields: [model_catalog_id]
  features:
    source_groups:
      - name: catalog
        source: catalog_features
        priority: 1
    merge:
      match_on: [[model_catalog_id, pr3_id, parent_package_code]]
      key_fields: [model_catalog_id, pr3_id, parent_package_code]
    foreign_keys:
      - entity_type: models
        target: model_id
        "on": [model_catalog_id]
  inventory:
    source_groups:
      - name: scs
        source: scs_inventory
        priority: 1
    merge:
      match_on: [[vin]]
      key_fields: [vin]
    foreign_keys:
      - entity_type: dealers
        target: dealer_id
        "on": [dealer_mat_primary_code>mat_primary_code]
      - entity_type: models
        target: model_id
        "on": [model_catalog_id]

The type field is a fully-qualified Python class path. Switching from a base class to an OEM subclass is a one-line change in the YAML.
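For readers unfamiliar with fully-qualified class paths, the following sketch shows how such a string can be turned into a class with the standard library. `resolve_class` is a hypothetical helper written for this example; it is not a claim about how Dagster resolves the type field internally.

```python
import importlib


def resolve_class(dotted_path: str) -> type:
    """Split 'package.module.ClassName' and import the named class."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected 'package.module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# A standard-library class is used so the example runs anywhere.
resolved = resolve_class("collections.OrderedDict")
```

Because the class is named by a string, pointing a component at a different implementation only requires changing that string.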

See the ConsolidatedComponent reference for full configuration options including source groups, gap fill, merge spec, and foreign keys.
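As a rough illustration of cross-source merging with {name}_exists provenance columns, consider the sketch below. It rests on several assumptions that the reference may contradict: a lower priority number wins when sources disagree, lower-priority sources only fill missing values, and a single flat match_on group is used. The `consolidate` function, the second source group `scs_dealers`, and the sample records are hypothetical.

```python
from typing import Any

Record = dict[str, Any]


def consolidate(
    source_groups: list[dict[str, Any]],
    records_by_source: dict[str, list[Record]],
    match_on: list[str],
) -> list[Record]:
    """Merge per-source records that share the same `match_on` values."""
    names = [g["name"] for g in source_groups]
    merged: dict[tuple, Record] = {}
    # Assumption: a lower `priority` number takes precedence.
    for group in sorted(source_groups, key=lambda g: g["priority"]):
        for record in records_by_source.get(group["source"], []):
            key = tuple(record[f] for f in match_on)
            row = merged.setdefault(key, {f"{n}_exists": False for n in names})
            row[f"{group['name']}_exists"] = True
            for name, value in record.items():
                # Higher-priority values are kept; later sources fill gaps only.
                if row.get(name) is None:
                    row[name] = value
    return list(merged.values())


groups = [
    {"name": "pss", "source": "pss_dealers", "priority": 1},
    {"name": "scs", "source": "scs_dealers", "priority": 2},  # hypothetical
]
records = {
    "pss_dealers": [
        {"mat_primary_code": "M1", "name": "Example Motors", "phone": None},
    ],
    "scs_dealers": [
        {"mat_primary_code": "M1", "name": "EXAMPLE MOTORS INC", "phone": "555-0100"},
        {"mat_primary_code": "M2", "name": "Other Dealer", "phone": None},
    ],
}
dealers = consolidate(groups, records, match_on=["mat_primary_code"])
```

In this toy run the `M1` dealer keeps the name from the priority-1 source, gains a phone number from the second source, and carries both `pss_exists` and `scs_exists` set to true, while `M2` is marked as seen only by the second source.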

Attribute validation

The ai_core component classes enforce required attributes via Pydantic field_validator definitions. If a required field is missing or empty, load_from_defs_folder raises a ValidationError. See the validators on the component classes in packages/ai_core/src/ai_core/components/ for the current set. Every OEM project's test suite validates its YAML configs by calling load_from_defs_folder — see the testing conventions in .claude/rules/testing.md.
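The kind of check involved can be sketched with Pydantic v2's field_validator. `ExampleAttributes`, its two fields, and `validation_errors` are hypothetical and chosen for illustration; the actual validators and required attributes are the ones defined on the ai_core component classes.

```python
from pydantic import BaseModel, ValidationError, field_validator


class ExampleAttributes(BaseModel):
    """Hypothetical attribute model; not the real ai_core component schema."""

    oem: str
    source: str

    @field_validator("oem", "source")
    @classmethod
    def must_not_be_empty(cls, value: str) -> str:
        # Reject empty or whitespace-only strings.
        if not value.strip():
            raise ValueError("must be a non-empty string")
        return value


def validation_errors(attributes: dict[str, str]) -> list[str]:
    """Return the names of fields that failed validation (empty if valid)."""
    try:
        ExampleAttributes(**attributes)
    except ValidationError as exc:
        return [str(err["loc"][0]) for err in exc.errors()]
    return []


ok = validation_errors({"oem": "audi", "source": "scs"})
empty = validation_errors({"oem": "audi", "source": ""})
missing = validation_errors({"oem": "audi"})
```

Both the empty and the missing `source` cases surface as a ValidationError naming the offending field, which is what a YAML-loading test would report.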

Adding a New OEM

The platform uses a Dagster hybrid ECS deployment — code locations run as Fargate tasks in our AWS VPC, not on Dagster's managed serverless infrastructure. Adding a new OEM requires both Python scaffolding and AWS infrastructure changes in the same PR.

Run /scaffold-oem and Claude will handle the full process. For the complete checklist of every required change — Python project, Dockerfile, Terraform (S3, ECR), CI workflow, dagster_cloud.yaml — see Scaffolding a New OEM.

The Python implementation steps are:

  1. Scaffold via /scaffold-oem (generates project skeleton, build.yaml, Dockerfile)
  2. Create source modules in src/ai_<oem>/sources/ — pure Python, no Dagster imports
  3. Create src/ai_<oem>/sources/registry.py mapping (source, resource, entity) tuples to extractor functions
  4. Create component subclasses in src/ai_<oem>/components/raw.py
  5. Add YAML in src/ai_<oem>/defs/raw/defs.yaml, defs/transformed/defs.yaml, and defs/consolidated/defs.yaml
  6. Run uv sync --directory projects/ai_<oem>