# Source code for pudl.etl.asset_checks
"""Programmatically defined Dagster asset checks for PUDL.
We primarily use Dagster asset checks to validate the schemas of PUDL tables. We use
Pandera to programmatically define dataframe schemas based on the PUDL metadata with the
asset check factory :func:`asset_check_from_schema` defined below.
For data validation we almost entirely rely on dbt data tests.
"""
from typing import Any
import geopandas as gpd # noqa: ICN002
import pandas as pd
import pandera.pandas as pr_pandas
import pandera.polars as pr_polars
import polars as pl
from dagster import (
AssetCheckResult,
AssetChecksDefinition,
AssetKey,
asset_check,
)
from pandera.errors import SchemaErrors
from pudl.helpers import ParquetData, get_parquet_table_polars
from pudl.metadata.classes import Package, Resource
from pudl.settings import ferceqr_year_quarters
[docs]
def _collect_asset_metadata(asset_value) -> dict[str, Any]:
    """Summarize the Python type and the shape of an asset value.

    Args:
        asset_value: The loaded asset. A :class:`polars.LazyFrame` is handled
            specially; any other object is expected to expose a ``shape``
            attribute.

    Returns:
        A dictionary with ``asset_type`` (the stringified type of the asset) and
        ``asset_shape`` (the shape as a list).
    """
    if not isinstance(asset_value, pl.LazyFrame):
        dimensions = list(asset_value.shape)
    else:
        # A LazyFrame has no ``shape`` attribute to read here: count the rows
        # with a streaming query and take the column count from the schema.
        n_rows = asset_value.select(pl.len()).collect(engine="streaming").item()
        n_columns = asset_value.collect_schema().len()
        dimensions = [n_rows, n_columns]
    return {"asset_type": str(type(asset_value)), "asset_shape": dimensions}
[docs]
def _extract_actual_columns_and_dtypes(
    asset_value: pl.LazyFrame | pd.DataFrame,
) -> tuple[list[str], dict[str, str], bool]:
    """Read column names and stringified dtypes off a supported dataframe object.

    Args:
        asset_value: Either a :class:`polars.LazyFrame` or a
            :class:`pandas.DataFrame`.

    Returns:
        A three-tuple of: the column names, a mapping of column name to the string
        form of its dtype, and a flag that is ``True`` when the input was a pandas
        dataframe and ``False`` when it was a Polars LazyFrame.

    Raises:
        ValueError: If ``asset_value`` is neither of the supported types.
    """
    if isinstance(asset_value, pl.LazyFrame):
        lazy_schema = asset_value.collect_schema()
        names = lazy_schema.names()
        dtype_strings = dict(zip(names, map(str, lazy_schema.dtypes()), strict=True))
        return names, dtype_strings, False

    if isinstance(asset_value, pd.DataFrame):
        names = list(asset_value.columns)
        dtype_strings = {
            str(name): str(dtype) for name, dtype in asset_value.dtypes.items()
        }
        return names, dtype_strings, True

    raise ValueError(
        f"Unsupported asset type for dtype collection: {type(asset_value)}"
    )
[docs]
def _collect_dtype_metadata(
    asset_value: pl.LazyFrame | pd.DataFrame,
    resource: Resource,
) -> dict[str, Any]:
    """Compare the dtypes found in a dataframe with those the PUDL metadata expects.

    Args:
        asset_value: Asset output to introspect. Supported types are
            :class:`pandas.DataFrame` and :class:`polars.LazyFrame`.
        resource: PUDL metadata resource whose schema fields define the expected
            columns and dtypes.

    Returns:
        A metadata dictionary with:

        - ``field_details``: for every schema field, the PUDL field type, the
          expected Pandera dtype string and the actual dtype string.
        - ``column_comparison``: expected/actual column counts, plus
          ``missing_columns`` and ``extra_columns`` lists when non-empty.
        - ``expected_dtype_errors``: only present when computing the expected
          dtype failed for at least one field.
        - ``type_mismatches``: only present when columns found in both the schema
          and the dataframe have differing dtype strings.

    Raises:
        ValueError: If ``asset_value`` is not a supported dataframe type.

    Notes:
        Expected dtypes are the string form of ``field.to_pandera_column().dtype``.
        Any exception raised while computing one is recorded inline as an
        ``"Error: ..."`` value rather than propagated.
    """
    observed_columns, observed_dtypes, pandas_backend = (
        _extract_actual_columns_and_dtypes(asset_value)
    )
    fields = resource.schema.fields
    expected_names = [field.name for field in fields]

    # Expected dtype per field; failures are kept as text so that one bad field
    # does not prevent reporting on the others.
    expected_dtypes: dict[str, str] = {}
    failed_lookups: dict[str, str] = {}
    for field in fields:
        try:
            column = field.to_pandera_column(use_pandas_backend=pandas_backend)
            expected_dtypes[field.name] = str(column.dtype)
        except Exception as err:
            message = str(err)
            expected_dtypes[field.name] = f"Error: {message}"
            failed_lookups[field.name] = message

    field_details = {}
    for field in fields:
        field_details[field.name] = {
            "pudl_field_dtype": field.type,
            "expected_pandera_dtype": expected_dtypes.get(field.name, "Unknown"),
            "actual_dtype": observed_dtypes.get(field.name, "Column not present"),
        }

    expected_set = set(expected_names)
    observed_set = set(observed_columns)
    absent = sorted(expected_set - observed_set)
    unexpected = sorted(observed_set - expected_set)
    shared = sorted(expected_set & observed_set)

    column_comparison: dict[str, Any] = {
        "expected_count": len(expected_names),
        "actual_count": len(observed_columns),
    }
    if absent:
        column_comparison["missing_columns"] = absent
    if unexpected:
        column_comparison["extra_columns"] = unexpected

    # Only columns present on both sides can have a dtype disagreement.
    type_mismatches = {}
    for name in shared:
        wanted = expected_dtypes.get(name, "Unknown")
        found = observed_dtypes.get(name, "Unknown")
        if wanted == "Unknown" or wanted == found:
            continue
        type_mismatches[name] = {"expected": wanted, "actual": found}

    metadata = {
        "field_details": field_details,
        "column_comparison": column_comparison,
    }
    if failed_lookups:
        metadata["expected_dtype_errors"] = failed_lookups
    if type_mismatches:
        metadata["type_mismatches"] = type_mismatches
    return metadata
[docs]
def _collect_geometry_metadata(asset_value) -> dict[str, Any]:
    """Describe the geometry column of a GeoDataFrame asset.

    Args:
        asset_value: The loaded asset, of any type.

    Returns:
        An empty dictionary when ``asset_value`` is not a
        :class:`geopandas.GeoDataFrame`. Otherwise a dictionary with
        ``geometry_column`` (the geometry's name, or ``"No geometry attribute"``
        when the attribute cannot be read) and, when the geometry exposes a
        ``dtype``, ``geometry_dtype``.
    """
    if not isinstance(asset_value, gpd.GeoDataFrame):
        return {}

    # Sentinel distinguishes "attribute lookup failed" from any real value.
    unavailable = object()
    geometry = getattr(asset_value, "geometry", unavailable)
    if geometry is unavailable:
        return {"geometry_column": "No geometry attribute"}

    geometry_metadata = {"geometry_column": geometry.name}
    if hasattr(geometry, "dtype"):
        geometry_metadata["geometry_dtype"] = str(geometry.dtype)
    return geometry_metadata
[docs]
def _process_schema_errors(schema_errors: SchemaErrors) -> dict[str, Any]:
    """Flatten a Pandera ``SchemaErrors`` exception into plain metadata.

    Args:
        schema_errors: The aggregated exception; its ``schema_errors`` attribute
            is iterated to describe each individual error.

    Returns:
        A dictionary with ``detailed_errors`` (one dictionary of strings per
        individual error) and ``num_errors`` (how many errors there were).
    """

    def describe(err) -> dict[str, str]:
        """Render one individual error as a dictionary of strings."""
        if hasattr(err, "failure_cases"):
            failure_cases = str(err.failure_cases)
        else:
            failure_cases = "No failure_cases"
        if hasattr(err, "data"):
            data = str(err.data)
        else:
            data = "No data"
        summary = {
            "error_type": type(err).__name__,
            "error_message": str(err),
            "failure_cases": failure_cases,
            "data": data,
        }
        # Optional attributes are reported only when the error carries them.
        summary.update(
            {
                f"{attr}_info": str(getattr(err, attr))
                for attr in ("schema", "check", "args")
                if hasattr(err, attr)
            }
        )
        return summary

    individual_errors = schema_errors.schema_errors
    return {
        "detailed_errors": [describe(err) for err in individual_errors],
        "num_errors": len(individual_errors),
    }
[docs]
def asset_check_from_schema(  # noqa: C901
    asset_key: AssetKey,
    package: Package,
    duckdb_asset: bool,
    high_memory_asset: bool,
) -> AssetChecksDefinition | None:
    """Create a dagster asset check based on the resource schema, if defined.

    The vast majority of assets will be loaded as Polars LazyFrames directly using
    the ``PudlParquetIOManager`` and validated with Pandera's Polars backend, but
    there are two exceptions to this. The first exception are assets which contain
    a geometry data type. These assets will all be loaded as geopandas GeoDataFrames
    and use Pandera's Pandas backend as Polars does not support geometry data types.
    The second exception are assets produced entirely using DuckDB. These assets
    return ``ParquetData`` objects, which are handled by the default io-manager. In
    this case, the resulting parquet file(s) will be scanned with Polars to produce
    a LazyFrame, then handled exactly the same as a typical asset.

    Args:
        asset_key: Key of the asset to check. Its user string is used as the PUDL
            resource ID.
        package: PUDL metadata package in which the resource is looked up.
        duckdb_asset: If True, the check input is typed as ``ParquetData`` and is
            converted to a LazyFrame with ``get_parquet_table_polars`` before
            validation.
        high_memory_asset: If True, a LazyFrame asset is passed through the schema
            validation but the validated frame is not collected.

    Returns:
        The asset check definition, or ``None`` when ``package.get_resource``
        raises ``ValueError`` for the resource ID.

    Raises:
        ValueError: If ``duckdb_asset`` is False and ``resource.schema.to_pandera()``
            returns neither a Pandera Polars nor a Pandera Pandas
            ``DataFrameSchema``.
    """
    resource_id = asset_key.to_user_string()
    try:
        resource = package.get_resource(resource_id)
    except ValueError:
        # No resource metadata for this asset, so there is no schema to check.
        return None
    pandera_schema = resource.schema.to_pandera()
    partitions = ferceqr_year_quarters if "ferceqr" in resource_id else None

    # The annotation on the check's input is chosen from how the asset is stored.
    if duckdb_asset:
        asset_type = ParquetData
    elif isinstance(pandera_schema, pr_polars.DataFrameSchema):
        asset_type = pl.LazyFrame
    elif isinstance(pandera_schema, pr_pandas.DataFrameSchema):
        asset_type = gpd.GeoDataFrame
    else:
        # The trailing space keeps the two sentences from running together once
        # the adjacent string literals are concatenated.
        raise ValueError(
            "Unexpected return type from `Resource.schema.to_pandera()`. "
            f"Expected a pandera `DataFrameSchema`, but got: `{type(pandera_schema)}`"
        )

    @asset_check(asset=asset_key, blocking=True, partitions_def=partitions)
    def pandera_schema_check(asset_value: asset_type) -> AssetCheckResult:
        if isinstance(asset_value, ParquetData):
            asset_value = get_parquet_table_polars(
                table_name=resource_id,
                partitions=asset_value.partitions,
            )
        # Collect all metadata
        metadata = (
            _collect_asset_metadata(asset_value)
            | _collect_dtype_metadata(asset_value, resource)
            | _collect_geometry_metadata(asset_value)
        )
        try:
            if isinstance(asset_value, pl.LazyFrame):
                validated_schema = asset_value.pipe(pandera_schema.validate, lazy=True)
                # Only validate data contents if asset is not marked as high memory
                if not high_memory_asset:
                    validated_schema.collect(engine="streaming")
            else:
                pandera_schema.validate(asset_value, lazy=True)
            return AssetCheckResult(passed=True, metadata=metadata)
        except SchemaErrors as schema_errors:
            metadata.update(_process_schema_errors(schema_errors))
            return AssetCheckResult(passed=False, metadata=metadata)
        except Exception as e:
            # Any other failure is reported as a failed check with the error
            # details attached, rather than being raised out of the check.
            metadata["unexpected_error"] = {
                "error_type": type(e).__name__,
                "error_message": str(e),
                "error_args": str(e.args) if hasattr(e, "args") else "No args",
            }
            return AssetCheckResult(passed=False, metadata=metadata)

    return pandera_schema_check