Source code for pudl.etl.asset_checks

"""Programmatically defined Dagster asset checks for PUDL.

We primarily use Dagster asset checks to validate the schemas of PUDL tables. We use
Pandera to programmatically define dataframe schemas based on the PUDL metadata with the
asset check factory :func:`asset_check_from_schema` defined below.

For data validation we almost entirely rely on dbt data tests.
"""

from typing import Any

import geopandas as gpd  # noqa: ICN002
import pandas as pd
import pandera.pandas as pr_pandas
import pandera.polars as pr_polars
import polars as pl
from dagster import (
    AssetCheckResult,
    AssetChecksDefinition,
    AssetKey,
    asset_check,
)
from pandera.errors import SchemaErrors

from pudl.helpers import ParquetData, get_parquet_table_polars
from pudl.metadata.classes import Package, Resource
from pudl.settings import ferceqr_year_quarters


def _collect_asset_metadata(asset_value) -> dict[str, Any]:
    """Summarize the Python type and the shape of an asset.

    Args:
        asset_value: The loaded asset. For a :class:`polars.LazyFrame` the row count
            is obtained by collecting ``pl.len()`` with the streaming engine and the
            column count is read from the collected schema. Any other object must
            expose a ``shape`` attribute.

    Returns:
        A dictionary with ``asset_type`` (the stringified type of the asset) and
        ``asset_shape`` (the shape converted to a list).
    """
    if not isinstance(asset_value, pl.LazyFrame):
        dimensions = asset_value.shape
    else:
        # A LazyFrame has no ``shape``; count rows without materializing the data.
        n_rows = asset_value.select(pl.len()).collect(engine="streaming").item()
        n_cols = asset_value.collect_schema().len()
        dimensions = (n_rows, n_cols)

    return {
        "asset_type": str(type(asset_value)),
        "asset_shape": list(dimensions),
    }
def _extract_actual_columns_and_dtypes(
    asset_value: pl.LazyFrame | pd.DataFrame,
) -> tuple[list[str], dict[str, str], bool]:
    """Read the column names and stringified dtypes of a supported dataframe.

    Args:
        asset_value: Either a :class:`polars.LazyFrame` or a
            :class:`pandas.DataFrame`.

    Returns:
        A 3-tuple of the column names, a mapping from column name to the string
        form of its dtype, and a flag that is ``True`` only when the input is a
        pandas dataframe.

    Raises:
        ValueError: If ``asset_value`` is neither of the supported types.
    """
    if isinstance(asset_value, pl.LazyFrame):
        lazy_schema = asset_value.collect_schema()
        names = lazy_schema.names()
        dtype_strings = dict(zip(names, map(str, lazy_schema.dtypes()), strict=True))
        return names, dtype_strings, False

    if isinstance(asset_value, pd.DataFrame):
        names = list(asset_value.columns)
        dtype_strings = {}
        for label, dtype in asset_value.dtypes.items():
            dtype_strings[str(label)] = str(dtype)
        return names, dtype_strings, True

    raise ValueError(
        f"Unsupported asset type for dtype collection: {type(asset_value)}"
    )
def _collect_dtype_metadata(
    asset_value: pl.LazyFrame | pd.DataFrame,
    resource: Resource,
) -> dict[str, Any]:
    """Build metadata comparing actual dataframe dtypes to metadata-driven expectations.

    Args:
        asset_value: Asset output to introspect. Supported types are
            :class:`pandas.DataFrame` and :class:`polars.LazyFrame`.
        resource: PUDL metadata resource whose schema fields define expected columns
            and dtypes.

    Returns:
        A metadata dictionary with:

        - ``field_details``: per-column expected and actual dtype details.
        - ``column_comparison``: expected/actual column counts and optional missing
          or extra column lists.
        - ``expected_dtype_errors``: only present when computing the expected dtype
          failed for at least one field; maps field name to the error text.
        - ``type_mismatches``: only present when common columns have differing dtype
          strings. Columns whose expected dtype could not be computed are reported
          under ``expected_dtype_errors`` only, not here.

    Raises:
        ValueError: If ``asset_value`` is not a supported dataframe type.

    Notes:
        Expected dtypes are captured as strings from ``field.to_pandera_column()``.
        Any errors while computing expected dtypes are recorded inline as
        ``"Error: ..."`` values rather than raised.
    """
    actual_columns, actual_dtypes, use_pandas_backend = (
        _extract_actual_columns_and_dtypes(asset_value)
    )
    fields = resource.schema.fields
    expected_columns = [field.name for field in fields]

    pandera_dtypes: dict[str, str] = {}
    dtype_errors: dict[str, str] = {}
    for field in fields:
        try:
            pandera_dtypes[field.name] = str(
                field.to_pandera_column(use_pandas_backend=use_pandas_backend).dtype
            )
        except Exception as e:
            # Deliberately best-effort: this function only gathers diagnostics, so
            # a failure for one field is recorded instead of aborting the check.
            error_text = str(e)
            pandera_dtypes[field.name] = f"Error: {error_text}"
            dtype_errors[field.name] = error_text

    field_details = {
        field.name: {
            "pudl_field_dtype": field.type,
            "expected_pandera_dtype": pandera_dtypes.get(field.name, "Unknown"),
            "actual_dtype": actual_dtypes.get(field.name, "Column not present"),
        }
        for field in fields
    }

    expected_set = set(expected_columns)
    actual_set = set(actual_columns)
    missing_columns = sorted(expected_set - actual_set)
    extra_columns = sorted(actual_set - expected_set)

    column_comparison: dict[str, Any] = {
        "expected_count": len(expected_columns),
        "actual_count": len(actual_columns),
    }
    if missing_columns:
        column_comparison["missing_columns"] = missing_columns
    if extra_columns:
        column_comparison["extra_columns"] = extra_columns

    type_mismatches = {}
    for col in sorted(expected_set & actual_set):
        if col in dtype_errors:
            # The expected dtype is an "Error: ..." placeholder here; comparing it
            # to the actual dtype would always report a spurious mismatch.
            continue
        expected_type = pandera_dtypes.get(col, "Unknown")
        actual_type = actual_dtypes.get(col, "Unknown")
        if expected_type != actual_type and expected_type != "Unknown":
            type_mismatches[col] = {"expected": expected_type, "actual": actual_type}

    metadata = {
        "field_details": field_details,
        "column_comparison": column_comparison,
    }
    if dtype_errors:
        metadata["expected_dtype_errors"] = dtype_errors
    if type_mismatches:
        metadata["type_mismatches"] = type_mismatches
    return metadata
def _collect_geometry_metadata(asset_value) -> dict[str, Any]:
    """Describe the geometry column of a GeoDataFrame asset.

    Args:
        asset_value: The loaded asset of any type.

    Returns:
        An empty dictionary when ``asset_value`` is not a
        :class:`geopandas.GeoDataFrame`. Otherwise a dictionary with
        ``geometry_column`` (the name of the geometry, or the placeholder
        ``"No geometry attribute"`` when the attribute cannot be accessed) and,
        when the geometry exposes a ``dtype``, ``geometry_dtype`` as a string.
    """
    if not isinstance(asset_value, gpd.GeoDataFrame):
        return {}

    if not hasattr(asset_value, "geometry"):
        return {"geometry_column": "No geometry attribute"}

    geometry = asset_value.geometry
    geo_metadata = {"geometry_column": geometry.name}
    if hasattr(geometry, "dtype"):
        geo_metadata["geometry_dtype"] = str(geometry.dtype)
    return geo_metadata
[docs] def _process_schema_errors(schema_errors: SchemaErrors) -> dict[str, Any]: """Process Pandera schema errors into structured metadata.""" detailed_errors = [] for err in schema_errors.schema_errors: error_info = { "error_type": type(err).__name__, "error_message": str(err), "failure_cases": str(err.failure_cases) if hasattr(err, "failure_cases") else "No failure_cases", "data": str(err.data) if hasattr(err, "data") else "No data", } # Add optional error attributes for attr in ["schema", "check", "args"]: if hasattr(err, attr): error_info[f"{attr}_info"] = str(getattr(err, attr)) detailed_errors.append(error_info) return { "detailed_errors": detailed_errors, "num_errors": len(schema_errors.schema_errors), }
def asset_check_from_schema(  # noqa: C901
    asset_key: AssetKey,
    package: Package,
    duckdb_asset: bool,
    high_memory_asset: bool,
) -> AssetChecksDefinition | None:
    """Create a dagster asset check based on the resource schema, if defined.

    The vast majority of assets will be loaded as Polars LazyFrames directly using the
    ``PudlParquetIOManager`` and validated with Pandera's Polars backend, but there are
    two exceptions to this.

    The first exception are assets which contain a geometry data type. These assets
    will all be loaded as geopandas GeoDataFrames and use Pandera's Pandas backend as
    Polars does not support geometry data types.

    The second exception are assets produced entirely using DuckDB. These assets return
    ``ParquetData`` objects, which are handled by the default io-manager. In this case,
    the resulting parquet file(s) will be scanned with Polars to produce a LazyFrame,
    then handled exactly the same as a typical asset.

    Args:
        asset_key: Key of the asset to check. Its user string is used as the
            resource ID looked up in ``package``.
        package: PUDL metadata package providing the resource definitions.
        duckdb_asset: Whether the asset is produced with DuckDB and therefore
            arrives as a ``ParquetData`` object.
        high_memory_asset: If true, a LazyFrame asset is passed through the lazy
            schema validation but the validated frame is never collected.

    Returns:
        A blocking asset check for the asset, or ``None`` if ``package`` has no
        resource with the asset's ID (``get_resource`` raised ``ValueError``).

    Raises:
        ValueError: If ``resource.schema.to_pandera()`` returns something other than
            a Pandera Polars or Pandas ``DataFrameSchema``.
    """
    resource_id = asset_key.to_user_string()
    try:
        resource = package.get_resource(resource_id)
    except ValueError:
        return None

    pandera_schema = resource.schema.to_pandera()
    partitions = ferceqr_year_quarters if "ferceqr" in resource_id else None

    # Pick the type annotation for the check's input; see the docstring for how
    # each kind of asset is loaded.
    if duckdb_asset:
        asset_type = ParquetData
    elif isinstance(pandera_schema, pr_polars.DataFrameSchema):
        asset_type = pl.LazyFrame
    elif isinstance(pandera_schema, pr_pandas.DataFrameSchema):
        asset_type = gpd.GeoDataFrame
    else:
        raise ValueError(
            "Unexpected return type from `Resource.schema.to_pandera()`. "
            f"Expected a pandera `DataFrameSchema`, but got: `{type(pandera_schema)}`"
        )

    @asset_check(asset=asset_key, blocking=True, partitions_def=partitions)
    def pandera_schema_check(asset_value: asset_type) -> AssetCheckResult:
        if isinstance(asset_value, ParquetData):
            # DuckDB assets only carry a reference to their parquet output; scan it
            # so the rest of the check can treat it like any other LazyFrame.
            asset_value = get_parquet_table_polars(
                table_name=resource_id,
                partitions=asset_value.partitions,
            )

        # Collect all metadata
        metadata = (
            _collect_asset_metadata(asset_value)
            | _collect_dtype_metadata(asset_value, resource)
            | _collect_geometry_metadata(asset_value)
        )

        try:
            if isinstance(asset_value, pl.LazyFrame):
                validated_schema = asset_value.pipe(pandera_schema.validate, lazy=True)
                # Only validate data contents if asset is not marked as high memory
                if not high_memory_asset:
                    validated_schema.collect(engine="streaming")
            else:
                pandera_schema.validate(asset_value, lazy=True)
            return AssetCheckResult(passed=True, metadata=metadata)
        except SchemaErrors as schema_errors:
            metadata.update(_process_schema_errors(schema_errors))
            return AssetCheckResult(passed=False, metadata=metadata)
        except Exception as e:
            # Any other failure is reported as a failed check with the error
            # details attached, rather than propagated out of the check.
            metadata["unexpected_error"] = {
                "error_type": type(e).__name__,
                "error_message": str(e),
                "error_args": str(e.args) if hasattr(e, "args") else "No args",
            }
            return AssetCheckResult(passed=False, metadata=metadata)

    return pandera_schema_check