Component Reference
Advanced configuration for ai_core component classes. See Code Structure for the component overview and basic YAML format.
Raw Tier Schema
Every raw asset stores exactly five columns (produced by make_raw_row() in ai_core/schema.py):
| Column | Type | Content |
|---|---|---|
| _partition_date | date | Partition date the fetch ran for |
| _fetched_at | datetime | UTC timestamp of the fetch |
| _request_url | str | URL called |
| _request_params | str | JSON-encoded query params |
| _response_body | str | Raw JSON response string |
TransformedEntityComponent reads _response_body and passes it to the extractor in the source registry.
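The raw-tier contract can be illustrated without Polars. The sketch below builds one five-column row. It then does a miniature version of what the paragraph above describes: decode _response_body and hand it to a per-source extractor.

The names build_raw_row, extract_vehicles, EXTRACTORS and transform are hypothetical. This section does not show the real make_raw_row() signature or the structure of the source registry, so both may differ.

```python
import json
from datetime import date, datetime, timezone
from typing import Any, Callable


def build_raw_row(
    partition_date: date,
    request_url: str,
    request_params: dict[str, Any],
    response_body: str,
) -> dict[str, Any]:
    """Hypothetical stand-in for make_raw_row(): one row, exactly five columns."""
    return {
        "_partition_date": partition_date,
        "_fetched_at": datetime.now(timezone.utc),  # UTC timestamp of the fetch
        "_request_url": request_url,
        # Query params are stored as a JSON string, not a nested structure.
        "_request_params": json.dumps(request_params, sort_keys=True),
        # The body is stored verbatim; nothing is parsed at the raw tier.
        "_response_body": response_body,
    }


def extract_vehicles(payload: Any) -> list[dict[str, Any]]:
    """Hypothetical extractor: turns one decoded response into entity records."""
    return [{"vin": item["vin"]} for item in payload["results"]]


# Hypothetical registry keyed by source name.
EXTRACTORS: dict[str, Callable[[Any], list[dict[str, Any]]]] = {
    "vehicles": extract_vehicles,
}


def transform(row: dict[str, Any], source: str) -> list[dict[str, Any]]:
    # Decode the raw JSON string, then delegate to the registered extractor.
    return EXTRACTORS[source](json.loads(row["_response_body"]))


row = build_raw_row(
    date(2024, 5, 1),
    "https://api.example.com/vehicles",
    {"page": 1},
    '{"results": [{"vin": "V1"}]}',
)
print(transform(row, "vehicles"))  # [{'vin': 'V1'}]
```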
ai_core Utilities
packages/ai_core/src/ai_core/http_utils.py and packages/ai_core/src/ai_core/patterns.py provide reusable helpers and dataclasses for source modules:
- @retry_with_exponential_backoff: decorator that retries on transient HTTP errors with jitter. On retry exhaustion the underlying exception is re-raised (it does not return None), so callers can derive a final status code instead of silently swallowing the failure. Use it on any fetch_* function that calls an external API.
- call_with_exponential_backoff(fn, *, max_retries, backoff_seconds, exceptions, op_name=None): imperative form of the same decorator; use it when retry parameters come from variables. The caller passes a no-arg callable (typically a lambda or functools.partial).
- http_get_with_retry(url, ...) / http_post_with_retry(url, ...): single-call HTTP GET/POST with exponential backoff on 429 / 5xx responses and connection errors. Wraps httpx.get / httpx.post with call_with_exponential_backoff and TransientHTTPError.raise_if_transient.
- paginated_get(url, params, ...) / paginated_post(url, body, ...): fetch all pages of a paginated REST endpoint concurrently. Both return a flat list of (response_content, request_params) tuples, one per fetched page, in arbitrary order when max_workers > 1. Pass the content directly to make_raw_row to populate the _response_body Iceberg column without re-encoding.
- paginated_get_events(url, params, ...): event-native form of paginated_get from ai_core/http/paginated.py. Emits HttpRequestEvent records for both response and exception events, including retry attempts, attempt count and request duration. Callers that need request transparency can therefore inspect failures without parsing retry logs. The regular paginated_get helper remains the compatibility wrapper and returns only successful response pages.
- TransientHTTPError(message, *, status_code=None): exception raised by the HTTP helpers. It carries an optional status_code so callers can derive the final response status after retry exhaustion. Class-level helpers:
  - TransientHTTPError.is_transient(status_code): predicate for 429 / 5xx.
  - TransientHTTPError.raise_if_transient(resp): raises on a transient status, otherwise returns resp.
- extract_status_code(exc, resp=None): single-source helper that pulls a status code from a TransientHTTPError, an httpx.HTTPStatusError, or a fallback response object.
- FetchAttempt(ok, status_code, context): dataclass recording a single HTTP attempt. Source modules emit one per request to feed the fetch_attempts asset check. context is a small dict naming what the request was for (e.g. {"page": 4, "vehicle_id": "v123"}).
- RawFetchResult(df, attempts): dataclass returned by fetch() and fetch_with_upstream() on RawSourceComponent. Bundles the raw frame (a pl.DataFrame, or a pl.LazyFrame for streaming fetches) with the FetchAttempt records that produced it.
- browser_session(*, seed_url=None, impersonate="chrome", ...): context manager yielding a curl_cffi session with browser TLS impersonation. Use it for sites with TLS-fingerprinting bot protection (Akamai, Cloudflare, PerimeterX). seed_url triggers a warmup GET that sets bot-manager session cookies before the first real call.
- parallel_call(tasks, *, max_workers, ...): fans out a list of zero-arg callables across a thread pool. Failures are captured as exceptions in the per-input result list. Two abort modes are supported:
  - a Hystrix-style circuit breaker, configured via failure_ratio + min_samples;
  - abort_on_any_failure=True, for "drop on first failure" semantics.
  Aborted runs raise ParallelCallAborted carrying the partial results.
- make_raw_row_frame(partition_date, request_url, request_params_list, response_body_list, *, fetched_at): builds a pl.DataFrame matching RAW_ROW_SCHEMA from columnar input lists. Prefer it over [make_raw_row(...) for ...] + pl.from_dicts(rows, ...) when a fetcher produces many rows, for two reasons:
  - constants are broadcast into a Series of length n instead of being repeated in n separate Python dicts;
  - no transient list[dict] is held in memory alongside the resulting DataFrame.
  See make_raw_row for single-row construction.
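The retry contract above can be sketched with the standard library alone, to illustrate the semantics:

- exponential backoff with jitter;
- retrying only the listed exceptions;
- re-raising the last exception on exhaustion, so its status code survives.

This is not the ai_core implementation, and source modules should keep calling the real helpers. Some details are assumptions:

- The name backoff_with_jitter and its injectable sleep parameter are illustrative.
- The delay formula is "full jitter": a random value between 0 and backoff_seconds * 2**attempt.
- TransientHTTPError here is a local stand-in that only mirrors the documented status_code field and is_transient predicate.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class TransientHTTPError(Exception):
    """Local stand-in mirroring the documented status_code field; not the ai_core class."""

    def __init__(self, message: str, *, status_code: Optional[int] = None) -> None:
        super().__init__(message)
        self.status_code = status_code

    @staticmethod
    def is_transient(status_code: int) -> bool:
        # 429 Too Many Requests, or any 5xx server error.
        return status_code == 429 or 500 <= status_code < 600


def backoff_with_jitter(
    fn: Callable[[], T],
    *,
    max_retries: int,
    backoff_seconds: float,
    exceptions: tuple[type[BaseException], ...],
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; retry on `exceptions`; re-raise the last error once retries run out."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except exceptions:
            if attempt == max_retries:
                raise  # exhausted: propagate so the caller can read status_code
            # Full jitter: sleep a random slice of the exponential delay.
            sleep(random.uniform(0, backoff_seconds * 2**attempt))
    raise ValueError("max_retries must be >= 0")


def no_sleep(_seconds: float) -> None:
    """Skip real waiting in this demo."""


calls = {"n": 0}


def flaky() -> str:
    # Fails twice with a 503, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientHTTPError("service unavailable", status_code=503)
    return "ok"


result = backoff_with_jitter(flaky, max_retries=3, backoff_seconds=0.5,
                             exceptions=(TransientHTTPError,), sleep=no_sleep)


def always_429() -> str:
    raise TransientHTTPError("rate limited", status_code=429)


try:
    backoff_with_jitter(always_429, max_retries=2, backoff_seconds=0.5,
                        exceptions=(TransientHTTPError,), sleep=no_sleep)
except TransientHTTPError as exc:
    final_status = exc.status_code  # the status code survives exhaustion
```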
Source modules should use these rather than implementing their own retry, pagination, or instrumentation logic.
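The circuit-breaker rule behind parallel_call is easier to reason about in isolation. The stdlib sketch below is for understanding only; source modules should still call parallel_call itself. It works as follows:

- It fans zero-arg callables across a thread pool.
- It stores failures as exceptions in the per-input result list.
- It aborts once at least min_samples tasks have finished and the observed failure ratio reaches failure_ratio.

Several details are assumptions rather than documented ai_core behavior:

- the name run_with_breaker;
- the >= comparison;
- cancelling queued work on abort;
- using None for slots that never finished.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Callable


class ParallelCallAborted(Exception):
    """Local stand-in: carries the partial per-input results."""

    def __init__(self, partial: list[Any]) -> None:
        super().__init__("circuit breaker tripped")
        self.partial = partial


def run_with_breaker(
    tasks: list[Callable[[], Any]],
    *,
    max_workers: int,
    failure_ratio: float,
    min_samples: int,
) -> list[Any]:
    """Run tasks; failures become exceptions in the result list; abort on a high failure rate."""
    results: list[Any] = [None] * len(tasks)  # None marks "never finished" (assumption)
    done = failed = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending: dict[Future, int] = {pool.submit(task): i for i, task in enumerate(tasks)}
        while pending:
            finished, _ = wait(pending, return_when=FIRST_COMPLETED)
            for fut in finished:
                idx = pending.pop(fut)
                exc = fut.exception()
                results[idx] = exc if exc is not None else fut.result()
                done += 1
                if exc is not None:
                    failed += 1
            # Breaker: only judge the ratio once enough samples have completed.
            if done >= min_samples and failed / done >= failure_ratio:
                for fut in pending:
                    fut.cancel()  # queued tasks never start; running ones finish on exit
                raise ParallelCallAborted(results)
    return results


squares = run_with_breaker([lambda i=i: i * i for i in range(5)],
                           max_workers=2, failure_ratio=0.5, min_samples=2)
print(squares)  # [0, 1, 4, 9, 16]


def boom() -> None:
    raise ConnectionError("upstream down")


try:
    run_with_breaker([boom] * 10, max_workers=1, failure_ratio=0.5, min_samples=2)
except ParallelCallAborted as aborted:
    partial = aborted.partial
```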
For memory and wall-time considerations, including measured wins from the patterns we adopted and a list of deferred enhancements, see Performance Notes.
Source Overrides
PPM_SOURCE_OVERRIDES is an optional env var pointing to a YAML file that substitutes local fixture data for live API calls. This allows full pipeline materializations without network access, and is the preferred approach for integration testing. See ai_core/source_overrides.py for the override resolution logic.
RawSourceComponent
Fetch contract
fetch() and fetch_with_upstream() return a single RawFetchResult. Its df field may be a pl.DataFrame (in memory) or a pl.LazyFrame (streaming, backed by a temp IPC file).

Use drain_fetch_results from ai_core.http_utils when a fetcher produces data in natural per-resource batches. It drains a generator of RawFetchResult chunks into a single RawFetchResult whose df is a scanning LazyFrame. This keeps peak memory low while the component interface stays simple.
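The idea behind drain_fetch_results is to consume chunks one at a time and write each to disk, so only one chunk is held in memory. The written data is then exposed through a lazy reader.

According to the text above, the real helper produces a Polars LazyFrame over a temp IPC file. The stdlib sketch below substitutes JSON Lines and a Python generator so it runs without Polars. Chunk, drain_chunks, scan_rows and per_dealer are hypothetical names.

```python
import json
import os
import tempfile
from dataclasses import dataclass, field
from typing import Any, Iterable, Iterator


@dataclass
class Chunk:
    """Stand-in for one RawFetchResult chunk: rows plus the attempts that produced them."""
    rows: list[dict[str, Any]]
    attempts: list[dict[str, Any]] = field(default_factory=list)


def drain_chunks(chunks: Iterable[Chunk]) -> tuple[str, list[dict[str, Any]]]:
    """Write rows to one temp file chunk by chunk; keep only the small attempt records in memory."""
    attempts: list[dict[str, Any]] = []
    fd, path = tempfile.mkstemp(suffix=".jsonl")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        for chunk in chunks:  # the generator yields one chunk at a time
            for row in chunk.rows:
                fh.write(json.dumps(row) + "\n")
            attempts.extend(chunk.attempts)
    return path, attempts


def scan_rows(path: str) -> Iterator[dict[str, Any]]:
    """Lazy reader: each row is decoded only when the consumer asks for it."""
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            yield json.loads(line)


def per_dealer() -> Iterator[Chunk]:
    # Hypothetical fetcher producing natural per-resource batches.
    for dealer in ("d1", "d2", "d3"):
        rows = [{"dealer": dealer, "page": page} for page in range(2)]
        yield Chunk(rows=rows, attempts=[{"ok": True, "dealer": dealer}])


path, attempts = drain_chunks(per_dealer())
rows = list(scan_rows(path))
os.remove(path)
```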
fetch_attempts instrumentation
The raw-tier fetch_attempts asset check (severity WARN, non-blocking) reports per-request outcomes in its metadata. Any HTTP failures therefore show up in the Dagster UI without anyone having to parse logs. To feed the check, the source module's fetch_* function should:
- Catch TransientHTTPError and httpx.HTTPError per request.
- Build a FetchAttempt for each request:
  - on success, set ok=True;
  - on failure, set ok=False with status_code derived via extract_status_code(exc, resp).
  Populate context with whatever fields identify the request (e.g. {"target": vehicle_id, "page": page_idx}).
- Return RawFetchResult(df=df, attempts=attempts) from the fetcher, and have the component's fetch() return that value unchanged.
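Put together, an instrumented fetcher follows the three steps above. The sketch below stays runnable without ai_core or httpx by using local stand-ins:

- TransientHTTPError, FetchAttempt, RawFetchResult and extract_status_code only mirror the fields documented in this section.
- A list of row dicts stands in for the pl.DataFrame.
- fetch_vehicle_details and fake_get are hypothetical.
- Recording 200 as the success status is an assumption; the contract only requires ok=True.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


class TransientHTTPError(Exception):
    """Stand-in for ai_core's TransientHTTPError."""

    def __init__(self, message: str, *, status_code: Optional[int] = None) -> None:
        super().__init__(message)
        self.status_code = status_code


@dataclass
class FetchAttempt:
    ok: bool
    status_code: Optional[int]
    context: dict[str, Any] = field(default_factory=dict)


@dataclass
class RawFetchResult:
    df: list[dict[str, Any]]  # stand-in for the pl.DataFrame
    attempts: list[FetchAttempt]


def extract_status_code(exc: BaseException, resp: Any = None) -> Optional[int]:
    """Stand-in: prefer the exception's status_code, else fall back to the response."""
    code = getattr(exc, "status_code", None)
    if code is None and resp is not None:
        code = getattr(resp, "status_code", None)
    return code


def fetch_vehicle_details(vehicle_ids: list[str], get: Callable[[str], str]) -> RawFetchResult:
    rows: list[dict[str, Any]] = []
    attempts: list[FetchAttempt] = []
    for vehicle_id in vehicle_ids:
        context = {"target": vehicle_id}
        try:
            body = get(vehicle_id)
        except TransientHTTPError as exc:  # real code also catches httpx.HTTPError
            attempts.append(FetchAttempt(ok=False, status_code=extract_status_code(exc), context=context))
            continue  # record the failure and move on to the next request
        rows.append({"_response_body": body})
        attempts.append(FetchAttempt(ok=True, status_code=200, context=context))  # 200 assumed
    return RawFetchResult(df=rows, attempts=attempts)


def fake_get(vehicle_id: str) -> str:
    # Hypothetical HTTP call: v2 always hits a 503.
    if vehicle_id == "v2":
        raise TransientHTTPError("service unavailable", status_code=503)
    return f'{{"id": "{vehicle_id}"}}'


result = fetch_vehicle_details(["v1", "v2", "v3"], fake_get)
print([(a.ok, a.status_code) for a in result.attempts])
# [(True, 200), (False, 503), (True, 200)]
```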
OEMs that have not been instrumented continue to work — the component wraps a bare pl.DataFrame return into a RawFetchResult automatically and the asset check reports tracking=uninstrumented in its metadata so operators can tell instrumented and uninstrumented sources apart at a glance.
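The fallback for uninstrumented sources amounts to a type check on the fetcher's return value. A minimal sketch follows. It assumes, hypothetically, that attempts=None marks a wrapped bare return; the real component may record this differently.

normalize_fetch and tracking_label are illustrative names. Only the tracking=uninstrumented value is documented; "instrumented" is an assumed counterpart.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class RawFetchResult:
    df: Any  # pl.DataFrame or pl.LazyFrame in the real component
    attempts: Optional[list[Any]]  # None = source not instrumented (assumed convention)


def normalize_fetch(value: Any) -> RawFetchResult:
    """Pass a RawFetchResult through unchanged; wrap a bare frame so callers see one type."""
    if isinstance(value, RawFetchResult):
        return value
    return RawFetchResult(df=value, attempts=None)


def tracking_label(result: RawFetchResult) -> str:
    # "uninstrumented" is documented; "instrumented" is an assumed counterpart.
    return "uninstrumented" if result.attempts is None else "instrumented"


legacy = normalize_fetch([{"_response_body": "{}"}])  # bare frame from an older source
print(tracking_label(legacy))  # uninstrumented
```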
The check's metadata also includes failure_patterns — failures grouped by (status_code, set of context keys) and capped at MAX_PATTERNS_IN_METADATA = 50 so the JSON payload stays small enough for the Dagster UI. Each pattern surfaces common_fields (constant context across the cluster — the actionable signal) and variations (per-attempt values for the keys that differ).
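The grouping rule can be sketched in plain Python in two steps:

1. Bucket failed attempts by (status_code, frozenset of context keys).
2. Within each bucket, split the context into keys with a single value (common_fields) and keys whose values differ (variations).

In this sketch, attempts are plain dicts rather than FetchAttempt objects. The ordering (largest cluster first), the count field and the exact output layout are assumptions, not necessarily the metadata the check emits.

```python
from collections import defaultdict
from typing import Any, Optional

MAX_PATTERNS_IN_METADATA = 50


def failure_patterns(attempts: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Cluster failed attempts by (status_code, set of context keys)."""
    groups: dict[tuple[Optional[int], frozenset], list[dict[str, Any]]] = defaultdict(list)
    for attempt in attempts:
        if not attempt["ok"]:
            key = (attempt["status_code"], frozenset(attempt["context"]))
            groups[key].append(attempt["context"])

    patterns: list[dict[str, Any]] = []
    for (status_code, keys), contexts in groups.items():
        common_fields: dict[str, Any] = {}
        variations: dict[str, list[Any]] = {}
        for name in sorted(keys):
            values = [ctx[name] for ctx in contexts]
            if all(v == values[0] for v in values):
                common_fields[name] = values[0]  # constant across the cluster
            else:
                variations[name] = values  # per-attempt values for differing keys
        patterns.append({
            "status_code": status_code,
            "count": len(contexts),
            "common_fields": common_fields,
            "variations": variations,
        })

    # Largest clusters first (assumed ordering), then cap the payload size.
    patterns.sort(key=lambda p: p["count"], reverse=True)
    return patterns[:MAX_PATTERNS_IN_METADATA]


attempts = [
    {"ok": False, "status_code": 503, "context": {"endpoint": "detail", "vehicle_id": "v1"}},
    {"ok": False, "status_code": 503, "context": {"endpoint": "detail", "vehicle_id": "v2"}},
    {"ok": True, "status_code": 200, "context": {"endpoint": "detail", "vehicle_id": "v3"}},
    {"ok": False, "status_code": 404, "context": {"page": 7}},
]
patterns = failure_patterns(attempts)
print(patterns[0]["common_fields"], patterns[0]["variations"])
# {'endpoint': 'detail'} {'vehicle_id': ['v1', 'v2']}
```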
Concurrency
Set concurrency: N in YAML when a raw component makes many parallel requests (e.g. per-vehicle detail fetches). The base class uses this to cap in-flight requests.
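One plausible mechanism for a concurrency: N cap is a bounded semaphore around each request. The thread pool is made wider than N, so the semaphore rather than the pool does the limiting.

This is an assumption about mechanism, not the base class's code; the base class may size a thread pool directly instead. fetch_capped and slow_call are hypothetical, and the peak counter exists only to show that the cap holds.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, TypeVar

T = TypeVar("T")


def fetch_capped(calls: list[Callable[[], T]], concurrency: int) -> tuple[list[T], int]:
    """Run calls with at most `concurrency` in flight; also report the observed peak."""
    gate = threading.BoundedSemaphore(concurrency)
    lock = threading.Lock()
    in_flight = peak = 0

    def guarded(call: Callable[[], T]) -> T:
        nonlocal in_flight, peak
        with gate:  # blocks while `concurrency` calls are already running
            with lock:
                in_flight += 1
                peak = max(peak, in_flight)
            try:
                return call()
            finally:
                with lock:
                    in_flight -= 1

    # More threads than permits, so the semaphore (not the pool size) enforces the cap.
    with ThreadPoolExecutor(max_workers=concurrency * 4) as pool:
        results = list(pool.map(guarded, calls))  # map preserves input order
    return results, peak


def slow_call(i: int) -> Callable[[], int]:
    def call() -> int:
        time.sleep(0.01)  # stand-in for one per-vehicle detail request
        return i
    return call


results, peak = fetch_capped([slow_call(i) for i in range(20)], concurrency=3)
```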
Upstream Source Dependencies
When a raw fetch requires data from another raw asset (e.g. dealer IDs must be fetched first), override fetch_with_upstream() instead of fetch() in the RawSourceComponent subclass. The upstream asset key is declared via the upstream_dep attribute in YAML as a full asset key path (e.g., "audi/raw/scs_search"). The base class wires the dependency automatically.
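A fetch_with_upstream() override typically starts by decoding the upstream raw rows. The sketch below shows only that step: it reads dealer IDs out of the upstream asset's _response_body column, so the override can fan out one request per dealer.

Some parts are hypothetical or assumed:

- The exact fetch_with_upstream() signature is not documented in this section, so dealer_ids_from_upstream is a hypothetical helper.
- The response shape (a results list of objects with a dealerId field) is assumed for illustration.

```python
import json
from typing import Any, Iterable


def dealer_ids_from_upstream(upstream_rows: Iterable[dict[str, Any]]) -> list[str]:
    """Collect unique dealer IDs, in first-seen order, from upstream raw rows."""
    seen: dict[str, None] = {}  # a dict dedupes while keeping insertion order
    for row in upstream_rows:
        payload = json.loads(row["_response_body"])  # the raw tier stores JSON strings
        for item in payload.get("results", []):
            seen.setdefault(str(item["dealerId"]), None)
    return list(seen)


upstream = [
    {"_response_body": '{"results": [{"dealerId": 7}, {"dealerId": 9}]}'},
    {"_response_body": '{"results": [{"dealerId": 9}, {"dealerId": 12}]}'},
]
print(dealer_ids_from_upstream(upstream))  # ['7', '9', '12']
```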
BigQuery-backed Sources
When a raw component reads from BigQuery instead of an HTTP API, it uses a BigQueryResource (from dagster-gcp) registered in the OEM project's Definitions under the key BIG_QUERY_RESOURCE_KEY (exported from ai_core.components.raw_historical).
The resource is registered once per OEM project and supplies the GCP project and credentials:
```python
import os
import dagster as dg
from dagster_gcp import BigQueryResource
from ai_core.components.raw_historical import BIG_QUERY_RESOURCE_KEY

# One registration per OEM project; supplies the GCP project and credentials.
defs = dg.Definitions(resources={
    BIG_QUERY_RESOURCE_KEY: BigQueryResource(
        gcp_credentials=dg.EnvVar("GCP_CREDENTIALS"),
        project=os.environ.get("<OEM>_GCP_PROJECT", "<default-project>"),
    )
})
```
Authentication uses Application Default Credentials when gcp_credentials is not set — run gcloud auth application-default login for local development.
Tags
RawSourceComponent accepts a tags: dict[str, str] attribute. The component merges this with {"oem": oem} on every asset it produces. OEMs with multiple brands use this to attach a brand tag:
```yaml
tags:
  brand: jeep
```
Each brand gets its own raw/transformed components; the consolidated tier unions them by listing all brand-scoped transformed asset keys as separate source_groups entries.
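The tag merge itself is a dictionary union. In the sketch below, asset_tags and example_oem are hypothetical names. The precedence (the component's oem value overriding a user-supplied oem tag) is an assumption, so check the component if you rely on it.

```python
from typing import Optional


def asset_tags(oem: str, tags: Optional[dict[str, str]] = None) -> dict[str, str]:
    # User tags first, then the oem tag, so the component's value wins (assumed precedence).
    return {**(tags or {}), "oem": oem}


# Two brand-scoped components for the same OEM end up with distinct tag sets.
print(asset_tags("example_oem", {"brand": "jeep"}))  # {'brand': 'jeep', 'oem': 'example_oem'}
```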
TransformedEntityComponent
Tiebreak
Selects a winning row when duplicate records conflict on non-aggregated columns:
```yaml
tiebreak:
  column: date_offer  # field to compare when resolving conflicts
  strategy: max       # or min
```
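Conceptually, the tiebreak keeps one row per duplicate key: the row whose tiebreak column is largest (max) or smallest (min). The plain-Python sketch below rests on several assumptions:

- Rows are dicts.
- Duplicates share a key column; vin is a hypothetical name.
- The price values are made-up sample data.
- Exact ties on the tiebreak column keep the first row seen.

This section does not describe the component's actual key columns or tie handling.

```python
from datetime import date
from typing import Any, Literal


def tiebreak(
    rows: list[dict[str, Any]],
    *,
    key: str,
    column: str,
    strategy: Literal["max", "min"],
) -> list[dict[str, Any]]:
    """Keep one row per key: the one with the largest (max) or smallest (min) `column`."""
    winners: dict[Any, dict[str, Any]] = {}
    for row in rows:
        current = winners.get(row[key])
        if current is None:
            winners[row[key]] = row
            continue
        if strategy == "max":
            better = row[column] > current[column]
        else:
            better = row[column] < current[column]
        if better:  # strict comparison: on a tie the first row seen is kept
            winners[row[key]] = row
    return list(winners.values())


rows = [
    {"vin": "V1", "date_offer": date(2024, 5, 1), "price": 30000},
    {"vin": "V1", "date_offer": date(2024, 5, 3), "price": 29500},
    {"vin": "V2", "date_offer": date(2024, 5, 2), "price": 41000},
]
latest = tiebreak(rows, key="vin", column="date_offer", strategy="max")
print([(r["vin"], r["price"]) for r in latest])  # [('V1', 29500), ('V2', 41000)]
```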
Aggregations
Collects distinct values from duplicate rows into a list column:
```yaml
aggregations:
  - output_name: vsapi_classifications
    collect_distinct_from: vsapi_classification
```
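The aggregation gathers every distinct value of collect_distinct_from across a duplicate group into a list stored under output_name. The sketch below rests on several assumptions:

- The key column (vin) is hypothetical.
- The sample class_a / class_b values are made up.
- List order is first-seen.
- Nulls are dropped.

The component may sort the list or keep nulls.

```python
from typing import Any


def collect_distinct(
    rows: list[dict[str, Any]],
    *,
    key: str,
    output_name: str,
    collect_distinct_from: str,
) -> dict[Any, dict[str, list[Any]]]:
    """Per key, gather the distinct non-null values of one column into a list column."""
    out: dict[Any, dict[str, list[Any]]] = {}
    for row in rows:
        bucket = out.setdefault(row[key], {output_name: []})[output_name]
        value = row.get(collect_distinct_from)
        if value is not None and value not in bucket:  # first-seen order, nulls dropped
            bucket.append(value)
    return out


rows = [
    {"vin": "V1", "vsapi_classification": "class_a"},
    {"vin": "V1", "vsapi_classification": "class_b"},
    {"vin": "V1", "vsapi_classification": "class_a"},
    {"vin": "V2", "vsapi_classification": None},
]
merged = collect_distinct(rows, key="vin", output_name="vsapi_classifications",
                          collect_distinct_from="vsapi_classification")
print(merged["V1"])  # {'vsapi_classifications': ['class_a', 'class_b']}
```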
ConsolidatedComponent
See the ConsolidatedComponent reference for full documentation, including:
- Source Groups: history_days, observation_date_col, filter_window_boundary, gap fill
- Merge Spec: match_on tiers, coalesce policies
- Foreign Keys: single-value joins, left_col>ref_col syntax, list-mode dot-path joins