Component Reference

Advanced configuration for ai_core component classes. See Code Structure for the component overview and basic YAML format.

Raw Tier Schema

Every raw asset stores exactly five columns (produced by make_raw_row() in ai_core/schema.py):

| Column | Type | Content |
|---|---|---|
| _partition_date | date | Partition date the fetch ran for |
| _fetched_at | datetime | UTC timestamp of the fetch |
| _request_url | str | URL that was requested |
| _request_params | str | JSON-encoded query params |
| _response_body | str | Raw JSON response string |

TransformedEntityComponent reads _response_body and passes it to the extractor in the source registry.
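
The sketch below builds one raw row and reads it back the way a transformed-tier extractor would. It is not the real make_raw_row. The names make_raw_row_sketch and extract_vehicles are hypothetical, and so are the example URL and the "vehicles" JSON layout. Only the five column names come from the schema above.

```python
import json
from datetime import date, datetime, timezone
from typing import Any, Dict, List, Optional


def make_raw_row_sketch(
    partition_date: date,
    request_url: str,
    request_params: Dict[str, Any],
    response_body: str,
    fetched_at: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Hypothetical stand-in for make_raw_row; only the column names are documented."""
    return {
        "_partition_date": partition_date,
        "_fetched_at": fetched_at or datetime.now(timezone.utc),
        "_request_url": request_url,
        # Query params are stored as a JSON string; sort_keys keeps it deterministic.
        "_request_params": json.dumps(request_params, sort_keys=True),
        # The body is stored verbatim and parsed only in the transformed tier.
        "_response_body": response_body,
    }


def extract_vehicles(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical extractor of the kind a source registry entry might point to."""
    return json.loads(row["_response_body"]).get("vehicles", [])


row = make_raw_row_sketch(
    date(2024, 5, 1),
    "https://api.example.com/inventory",
    {"zip": "10001", "page": 1},
    '{"vehicles": [{"vin": "ABC123"}]}',
)
print(extract_vehicles(row))  # [{'vin': 'ABC123'}]
```

Keeping the body as an unparsed string means a new or corrected extractor can re-run over old raw partitions without refetching.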

ai_core Utilities

packages/ai_core/src/ai_core/http_utils.py and packages/ai_core/src/ai_core/patterns.py provide reusable helpers and dataclasses for source modules:

  • @retry_with_exponential_backoff — decorator that retries on transient HTTP errors with jitter. On retry exhaustion the underlying exception is re-raised (it does not return None), so callers can derive a final status code instead of silently swallowing the failure. Use on any fetch_* function that calls an external API.
  • call_with_exponential_backoff(fn, *, max_retries, backoff_seconds, exceptions, op_name=None) — imperative form of the same retry logic. Use it when retry parameters come from variables rather than being fixed at decoration time. The caller passes a no-arg callable (typically a lambda or functools.partial).
  • http_get_with_retry(url, ...) / http_post_with_retry(url, ...) — single-call HTTP GET/POST with exponential backoff on 429 / 5xx + connection errors. Wraps httpx.get/post with call_with_exponential_backoff and TransientHTTPError.raise_if_transient.
  • paginated_get(url, params, ...) / paginated_post(url, body, ...) — fetch all pages of a paginated REST endpoint concurrently. Both return a flat list of (response_content, request_params) tuples — one per fetched page, in arbitrary order when max_workers > 1. Pass the content directly to make_raw_row to populate the _response_body Iceberg column without re-encoding.
  • paginated_get_events(url, params, ...) — event-native form of paginated_get from ai_core/http/paginated.py. Emits HttpRequestEvent records for response and exception events, including retry attempts, attempt count, and request duration, so callers that need request transparency can inspect failures without parsing retry logs. The regular paginated_get helper remains the compatibility wrapper that yields only successful response pages.
  • TransientHTTPError(message, *, status_code=None) — exception raised by HTTP helpers; carries an optional status_code so callers can derive the final response status after retry exhaustion. Class-level helpers: TransientHTTPError.is_transient(status_code) (predicate for 429 / 5xx) and TransientHTTPError.raise_if_transient(resp) (raise on transient status, return resp otherwise).
  • extract_status_code(exc, resp=None) — single-source helper that pulls a status code from TransientHTTPError, httpx.HTTPStatusError, or a fallback response object.
  • FetchAttempt(ok, status_code, context) — dataclass recording a single HTTP attempt. Source modules emit one per request to feed the fetch_attempts asset check; context is a small dict naming what the request was for (e.g. {"page": 4, "vehicle_id": "v123"}).
  • RawFetchResult(df, attempts) — dataclass returned by fetch() and fetch_with_upstream() on RawSourceComponent. Bundles the raw pl.DataFrame and the FetchAttempt records that produced it.
  • browser_session(*, seed_url=None, impersonate="chrome", ...) — context manager yielding a curl_cffi session with browser TLS impersonation. Use for sites with TLS-fingerprinting bot protection (Akamai, Cloudflare, PerimeterX); seed_url triggers a warmup GET to set bot-manager session cookies before the first real call.
  • parallel_call(tasks, *, max_workers, ...) — fan out a list of zero-arg callables across a thread pool. Failures captured as exceptions in the result list (per-input). Supports Hystrix-style circuit-breaker abort via failure_ratio + min_samples, or abort_on_any_failure=True for "drop on first failure" semantics; aborted runs raise ParallelCallAborted carrying the partial results.
  • make_raw_row_frame(partition_date, request_url, request_params_list, response_body_list, *, fetched_at) — builds a pl.DataFrame matching RAW_ROW_SCHEMA from columnar input lists. Prefer this over [make_raw_row(...) for ...] + pl.from_dicts(rows, ...) when a fetcher produces many rows. Constants are broadcast into a Series of length n instead of being repeated in n separate Python dicts, and no transient list[dict] intermediate has to coexist with the resulting DataFrame. See make_raw_row for single-row construction.
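
The retry semantics described above can be illustrated with a small standalone sketch: exponential backoff with jitter, re-raise on exhaustion, and 429/5xx treated as transient. It does not reproduce ai_core's code. Several details are assumptions: the names call_with_backoff_sketch and TransientHTTPErrorSketch, the "full jitter" strategy, the injectable sleep parameter, and reading max_retries as the number of retries after the first call.

```python
import random
import time
from typing import Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


class TransientHTTPErrorSketch(Exception):
    """Hypothetical mirror of TransientHTTPError: carries an optional status code."""

    def __init__(self, message: str, *, status_code: Optional[int] = None) -> None:
        super().__init__(message)
        self.status_code = status_code

    @staticmethod
    def is_transient(status_code: int) -> bool:
        # 429 (rate limited) and any 5xx are treated as retryable.
        return status_code == 429 or 500 <= status_code <= 599


def call_with_backoff_sketch(
    fn: Callable[[], T],
    *,
    max_retries: int,
    backoff_seconds: float,
    exceptions: Tuple[Type[BaseException], ...],
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying up to max_retries times; re-raise the last error."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except exceptions:
            if attempt == max_retries:
                raise  # exhausted: propagate instead of returning None
            # Exponential ceiling with "full jitter": sleep a random fraction of it.
            sleep(random.uniform(0, backoff_seconds * 2 ** attempt))
    raise AssertionError("unreachable when max_retries >= 0")


attempts = {"n": 0}


def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientHTTPErrorSketch("upstream 503", status_code=503)
    return "ok"


print(call_with_backoff_sketch(
    flaky, max_retries=4, backoff_seconds=0.01,
    exceptions=(TransientHTTPErrorSketch,), sleep=lambda _: None,
))  # ok
```

Re-raising instead of returning None is what lets the caller read status_code off the final exception after retries run out.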

Source modules should use these rather than implementing their own retry, pagination, or instrumentation logic.
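
The following simplified model of parallel_call's circuit breaker is meant only for intuition; source modules should keep using the helper itself. It shows per-input failure capture and a failure_ratio / min_samples abort. Several details are assumptions: the names ParallelCallAbortedSketch and parallel_call_sketch, tripping when the observed failure ratio reaches failure_ratio (>=), and cancelling queued tasks on abort.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, List, Optional, Sequence


class ParallelCallAbortedSketch(Exception):
    """Hypothetical analogue of ParallelCallAborted; carries the partial results."""

    def __init__(self, partial: List[Any]) -> None:
        super().__init__("parallel call aborted")
        self.partial = partial


def parallel_call_sketch(
    tasks: Sequence[Callable[[], Any]],
    *,
    max_workers: int,
    failure_ratio: Optional[float] = None,
    min_samples: int = 1,
    abort_on_any_failure: bool = False,
) -> List[Any]:
    results: List[Any] = [None] * len(tasks)  # None = not finished (yet)
    done = failures = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(task): i for i, task in enumerate(tasks)}
        for fut in as_completed(futures):
            exc = fut.exception()
            # Failures are stored in place of the value, at the task's own index.
            results[futures[fut]] = exc if exc is not None else fut.result()
            done += 1
            if exc is not None:
                failures += 1
            tripped = abort_on_any_failure and exc is not None
            if failure_ratio is not None and done >= min_samples:
                tripped = tripped or failures / done >= failure_ratio
            if tripped:
                for other in futures:
                    other.cancel()  # drops queued tasks; running ones still finish
                raise ParallelCallAbortedSketch(results)
    return results


def boom() -> int:
    raise ValueError("bad response")


mixed = parallel_call_sketch([lambda: 1, boom, lambda: 3], max_workers=2)
print(mixed)  # [1, ValueError('bad response'), 3]
```

min_samples keeps a single early failure from tripping the breaker before enough results exist to estimate a ratio.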

For memory and wall-time considerations, including measured wins from the patterns we adopted and a list of deferred enhancements, see Performance Notes.

Source Overrides

PPM_SOURCE_OVERRIDES is an optional env var pointing to a YAML file that substitutes local fixture data for live API calls. This allows full pipeline materializations without network access, and is the preferred approach for integration testing. See ai_core/source_overrides.py for the override resolution logic.

RawSourceComponent

Fetch contract

fetch() and fetch_with_upstream() return a single RawFetchResult. Its df field may be a pl.DataFrame (in-memory) or a pl.LazyFrame (streaming, backed by a temp IPC file). When a fetcher produces data in natural per-resource batches, use drain_fetch_results from ai_core.http_utils. It drains a generator of RawFetchResult chunks into a single RawFetchResult with a scanning LazyFrame, which keeps peak memory bounded while keeping the component interface simple.
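
The drain pattern can be sketched with the standard library alone. This version swaps the temp IPC file for a temp JSON Lines file, and the scanning LazyFrame for a generator. ChunkSketch, DrainedSketch, drain_sketch, and per_dealer_batches are hypothetical names. The point is that each batch is spilled to disk before the next one is produced, while the attempts from every chunk are concatenated.

```python
import json
import os
import tempfile
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Iterator, List


@dataclass
class ChunkSketch:
    """One per-resource batch, standing in for a RawFetchResult chunk."""
    rows: List[Dict[str, Any]]
    attempts: List[Any] = field(default_factory=list)


@dataclass
class DrainedSketch:
    path: str  # temp file holding every row, one JSON object per line
    attempts: List[Any]

    def scan(self) -> Iterator[Dict[str, Any]]:
        """Lazily re-read rows from disk instead of holding them all in memory."""
        with open(self.path, encoding="utf-8") as fh:
            for line in fh:
                yield json.loads(line)


def drain_sketch(chunks: Iterable[ChunkSketch]) -> DrainedSketch:
    attempts: List[Any] = []
    # delete=False: the file must outlive this function; the caller removes it.
    with tempfile.NamedTemporaryFile(
        "w", suffix=".jsonl", delete=False, encoding="utf-8"
    ) as fh:
        # Each batch is written out before the generator produces the next one,
        # so only about one batch of rows is resident at a time.
        for chunk in chunks:
            for row in chunk.rows:
                fh.write(json.dumps(row) + "\n")
            attempts.extend(chunk.attempts)
    return DrainedSketch(path=fh.name, attempts=attempts)


def per_dealer_batches() -> Iterator[ChunkSketch]:
    for dealer in ("d1", "d2"):
        yield ChunkSketch(rows=[{"dealer": dealer, "page": p} for p in (1, 2)],
                          attempts=[f"{dealer}-ok"])


drained = drain_sketch(per_dealer_batches())
print(sum(1 for _ in drained.scan()), drained.attempts)  # 4 ['d1-ok', 'd2-ok']
os.remove(drained.path)
```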

fetch_attempts instrumentation

The raw-tier fetch_attempts asset check (severity WARN, non-blocking) reports per-request outcomes in its metadata, so HTTP failures show up in the Dagster UI without anyone having to parse logs. To feed it, the source module's fetch_* function should:

  1. Catch TransientHTTPError and httpx.HTTPError per request.
  2. Build a FetchAttempt for each request — set ok=True on success, or ok=False with status_code derived via extract_status_code(exc, resp) on failure. Populate context with whatever fields identify the request (e.g. {"target": vehicle_id, "page": page_idx}).
  3. Return RawFetchResult(df=df, attempts=attempts) from the fetcher, and have the component's fetch() return that value unchanged.
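
Here is a sketch of a fetch_* function that follows the three steps. It uses local stand-ins for TransientHTTPError, FetchAttempt, and RawFetchResult, with a list of dicts in place of the pl.DataFrame. fetch_vehicle_details and the get_detail callable returning (status, body) are hypothetical. A real module would also catch httpx.HTTPError and derive the status with extract_status_code(exc, resp).

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple


class TransientHTTPError(Exception):
    """Local stand-in for ai_core's TransientHTTPError (status_code is optional)."""

    def __init__(self, message: str, *, status_code: Optional[int] = None) -> None:
        super().__init__(message)
        self.status_code = status_code


@dataclass
class FetchAttempt:  # same three fields as the documented dataclass
    ok: bool
    status_code: Optional[int]
    context: Dict[str, Any]


@dataclass
class RawFetchResult:  # df is a list of rows here instead of a pl.DataFrame
    df: List[Dict[str, Any]]
    attempts: List[FetchAttempt]


def fetch_vehicle_details(
    vehicle_ids: Iterable[str],
    get_detail: Callable[[str], Tuple[int, str]],  # hypothetical: (status, body)
) -> RawFetchResult:
    rows: List[Dict[str, Any]] = []
    attempts: List[FetchAttempt] = []
    for vehicle_id in vehicle_ids:
        context = {"target": vehicle_id}
        try:
            status, body = get_detail(vehicle_id)
        except TransientHTTPError as exc:
            # Steps 1 and 2: catch per request and record the failure; real code
            # would also catch httpx.HTTPError and use extract_status_code(exc, resp).
            attempts.append(FetchAttempt(False, exc.status_code, context))
            continue  # one bad vehicle does not abort the whole fetch
        rows.append({"_request_url": f"/vehicles/{vehicle_id}", "_response_body": body})
        attempts.append(FetchAttempt(True, status, context))
    # Step 3: return both; the component's fetch() passes this through unchanged.
    return RawFetchResult(df=rows, attempts=attempts)


def fake_get_detail(vehicle_id: str) -> Tuple[int, str]:
    if vehicle_id == "v2":
        raise TransientHTTPError("rate limited", status_code=429)
    return 200, '{"id": "%s"}' % vehicle_id


result = fetch_vehicle_details(["v1", "v2", "v3"], fake_get_detail)
print([(a.ok, a.status_code) for a in result.attempts])
# [(True, 200), (False, 429), (True, 200)]
```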

OEMs that have not been instrumented continue to work — the component wraps a bare pl.DataFrame return into a RawFetchResult automatically and the asset check reports tracking=uninstrumented in its metadata so operators can tell instrumented and uninstrumented sources apart at a glance.

The check's metadata also includes failure_patterns — failures grouped by (status_code, set of context keys) and capped at MAX_PATTERNS_IN_METADATA = 50 so the JSON payload stays small enough for the Dagster UI. Each pattern surfaces common_fields (constant context across the cluster — the actionable signal) and variations (per-attempt values for the keys that differ).
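
The grouping can be approximated as follows. This is a guess at the shape of the logic, not the check's implementation. failure_patterns_sketch, the count field, and ordering clusters largest-first before applying the cap are assumptions. MAX_PATTERNS_IN_METADATA = 50 and the (status_code, set of context keys) grouping come from the description above.

```python
from types import SimpleNamespace
from typing import Any, Dict, FrozenSet, Iterable, List, Optional, Tuple

MAX_PATTERNS_IN_METADATA = 50


def failure_patterns_sketch(
    attempts: Iterable[Any], cap: int = MAX_PATTERNS_IN_METADATA
) -> List[Dict[str, Any]]:
    # Cluster failed attempts by (status_code, set of context keys).
    groups: Dict[Tuple[Optional[int], FrozenSet[str]], List[Dict[str, Any]]] = {}
    for attempt in attempts:
        if not attempt.ok:
            key = (attempt.status_code, frozenset(attempt.context))
            groups.setdefault(key, []).append(attempt.context)

    patterns = []
    # Assumption: biggest clusters first, so the cap drops the rarest patterns.
    for (status, keys), contexts in sorted(groups.items(), key=lambda kv: -len(kv[1])):
        first = contexts[0]
        common = {k: first[k] for k in sorted(keys) if all(c[k] == first[k] for c in contexts)}
        patterns.append({
            "status_code": status,
            "count": len(contexts),
            "common_fields": common,  # constant across the cluster
            "variations": {k: [c[k] for c in contexts] for k in sorted(keys) if k not in common},
        })
    return patterns[:cap]


attempts = [
    SimpleNamespace(ok=False, status_code=503, context={"endpoint": "detail", "vehicle_id": "v1"}),
    SimpleNamespace(ok=False, status_code=503, context={"endpoint": "detail", "vehicle_id": "v2"}),
    SimpleNamespace(ok=True, status_code=200, context={"endpoint": "detail", "vehicle_id": "v3"}),
]
print(failure_patterns_sketch(attempts))
```

In this example, common_fields reports that every 503 came from the "detail" endpoint, and variations lists the individual vehicle IDs.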

Concurrency

Set concurrency: N in YAML when a raw component makes many parallel requests (e.g. per-vehicle detail fetches). The base class uses this to cap in-flight requests.
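
How the base class enforces the cap is not described here. One common approach is a semaphore around each request, sketched below. InFlightLimiter and fake_request are hypothetical names, and the sleep stands in for network latency.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, TypeVar

T = TypeVar("T")


class InFlightLimiter:
    """Hypothetical helper: lets at most `concurrency` calls run at once."""

    def __init__(self, concurrency: int) -> None:
        self._slots = threading.BoundedSemaphore(concurrency)
        self._lock = threading.Lock()
        self.in_flight = 0
        self.peak = 0  # highest in-flight count observed, for demonstration

    def run(self, fn: Callable[[], T]) -> T:
        with self._slots:  # blocks while `concurrency` calls are already running
            with self._lock:
                self.in_flight += 1
                self.peak = max(self.peak, self.in_flight)
            try:
                return fn()
            finally:
                with self._lock:
                    self.in_flight -= 1


def fake_request(i: int) -> int:
    time.sleep(0.01)  # stands in for network latency
    return i * 2


limiter = InFlightLimiter(concurrency=4)  # e.g. `concurrency: 4` in YAML
# The pool is deliberately wider than the cap: the semaphore, not the pool size,
# bounds how many requests are in flight.
with ThreadPoolExecutor(max_workers=16) as pool:
    results = list(pool.map(lambda i: limiter.run(lambda: fake_request(i)), range(40)))
print(limiter.peak <= 4)  # True
```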

Upstream Source Dependencies

When a raw fetch requires data from another raw asset (e.g. dealer IDs must be fetched first), override fetch_with_upstream() instead of fetch() in the RawSourceComponent subclass. The upstream asset key is declared via the upstream_dep attribute in YAML as a full asset key path (e.g., "audi/raw/scs_search"). The base class wires the dependency automatically.
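
Below is a minimal sketch of the override pattern. It assumes that fetch_with_upstream receives the upstream asset's raw rows. The real method signature, the RawSourceBaseSketch base class, the "dealers" JSON layout, and the example URL are all hypothetical. upstream_key_path only shows how a full path such as "audi/raw/scs_search" splits into asset key components.

```python
import json
from typing import Any, Dict, List, Optional


class RawSourceBaseSketch:
    """Hypothetical base class; the real RawSourceComponent signatures may differ."""

    upstream_dep: Optional[str] = None

    def fetch(self) -> List[Dict[str, Any]]:
        raise NotImplementedError

    def fetch_with_upstream(self, upstream_rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        raise NotImplementedError


def upstream_key_path(upstream_dep: str) -> List[str]:
    # "audi/raw/scs_search" -> ["audi", "raw", "scs_search"]
    return upstream_dep.split("/")


class DealerInventorySketch(RawSourceBaseSketch):
    upstream_dep = "audi/raw/scs_search"

    def fetch_with_upstream(self, upstream_rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Read dealer IDs out of the upstream raw rows' response bodies
        # (the "dealers"/"id" JSON layout is made up for this example).
        dealer_ids = sorted({
            dealer["id"]
            for row in upstream_rows
            for dealer in json.loads(row["_response_body"]).get("dealers", [])
        })
        # Plan one request per dealer; a real fetcher would call the API here.
        return [{"_request_url": f"https://api.example.com/dealers/{d}/inventory"} for d in dealer_ids]


upstream = [
    {"_response_body": '{"dealers": [{"id": "d2"}, {"id": "d1"}]}'},
    {"_response_body": '{"dealers": [{"id": "d1"}]}'},
]
print([r["_request_url"] for r in DealerInventorySketch().fetch_with_upstream(upstream)])
```

Because the dealer list comes from an already-materialized raw asset, the dependent fetch never has to repeat the search call itself.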

BigQuery-backed Sources

When a raw component reads from BigQuery instead of an HTTP API, it uses a BigQueryResource (from dagster-gcp) registered in the OEM project's Definitions under the key BIG_QUERY_RESOURCE_KEY (exported from ai_core.components.raw_historical).

The resource is registered once per OEM project and supplies the GCP project and credentials:

import os

import dagster as dg
from dagster_gcp import BigQueryResource

from ai_core.components.raw_historical import BIG_QUERY_RESOURCE_KEY

defs = dg.Definitions(
    resources={
        BIG_QUERY_RESOURCE_KEY: BigQueryResource(
            gcp_credentials=dg.EnvVar("GCP_CREDENTIALS"),
            project=os.environ.get("<OEM>_GCP_PROJECT", "<default-project>"),
        ),
    },
)

Authentication uses Application Default Credentials when gcp_credentials is not set — run gcloud auth application-default login for local development.

Tags

RawSourceComponent accepts a tags: dict[str, str] attribute. The component merges this with {"oem": oem} on every asset it produces. OEMs with multiple brands use this to attach a brand tag:

tags:
  brand: jeep

Each brand gets its own raw/transformed components; the consolidated tier unions them by listing all brand-scoped transformed asset keys as separate source_groups entries.

TransformedEntityComponent

Tiebreak

Selects a winning row when duplicate records conflict on non-aggregated columns:

tiebreak:
  column: date_offer  # field to compare when resolving conflicts
  strategy: max       # or min
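
The tiebreak rule can be modelled in a few lines of plain Python. The sketch assumes that rows are dicts and that duplicates are identified by a hypothetical vin key; the real deduplication key is configured elsewhere. It also assumes that exact ties in the tiebreak column keep the first row seen.

```python
from datetime import date
from typing import Any, Dict, List


def tiebreak_sketch(
    rows: List[Dict[str, Any]], key: str, column: str, strategy: str = "max"
) -> List[Dict[str, Any]]:
    """Keep one row per `key`, choosing the max (or min) value of `column`."""
    if strategy not in ("max", "min"):
        raise ValueError(f"unknown strategy: {strategy!r}")
    pick = max if strategy == "max" else min
    groups: Dict[Any, List[Dict[str, Any]]] = {}
    for row in rows:
        groups.setdefault(row[key], []).append(row)
    # On an exact tie in `column`, max()/min() keep the first row seen.
    return [pick(group, key=lambda r: r[column]) for group in groups.values()]


offers = [
    {"vin": "A", "date_offer": date(2024, 5, 1), "price": 30000},
    {"vin": "A", "date_offer": date(2024, 5, 3), "price": 29500},
    {"vin": "B", "date_offer": date(2024, 5, 2), "price": 41000},
]
print([r["price"] for r in tiebreak_sketch(offers, key="vin", column="date_offer")])
# [29500, 41000]
```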

Aggregations

Collects distinct values from duplicate rows into a list column:

aggregations:
  - output_name: vsapi_classifications
    collect_distinct_from: vsapi_classification
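
A plain-Python sketch of collect_distinct_from follows. The vin grouping key and the classification values are made up. Keeping values in first-seen order is an assumption; the real list column may be ordered differently.

```python
from typing import Any, Dict, List


def collect_distinct_sketch(
    rows: List[Dict[str, Any]], key: str, collect_distinct_from: str, output_name: str
) -> Dict[Any, Dict[str, List[Any]]]:
    """Per `key`, gather the distinct values of one column into a list column."""
    out: Dict[Any, Dict[str, List[Any]]] = {}
    for row in rows:
        values = out.setdefault(row[key], {output_name: []})[output_name]
        value = row[collect_distinct_from]
        if value not in values:  # distinct; first-seen order is kept (an assumption)
            values.append(value)
    return out


rows = [
    {"vin": "A", "vsapi_classification": "suv"},
    {"vin": "A", "vsapi_classification": "4x4"},
    {"vin": "A", "vsapi_classification": "suv"},
    {"vin": "B", "vsapi_classification": "pickup"},
]
print(collect_distinct_sketch(rows, "vin", "vsapi_classification", "vsapi_classifications"))
# {'A': {'vsapi_classifications': ['suv', '4x4']}, 'B': {'vsapi_classifications': ['pickup']}}
```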

ConsolidatedComponent

See the ConsolidatedComponent reference for full documentation, including:

  • Source Groups: history_days, observation_date_col, filter_window_boundary, gap fill
  • Merge Spec: match_on tiers, coalesce policies
  • Foreign Keys: single-value joins, left_col>ref_col syntax, list-mode dot-path joins