dbt_helper

A basic CLI to autogenerate dbt data test configurations.

Attributes

Classes

DbtColumn

Define yaml structure of a dbt column.

DbtTable

Define yaml structure of a dbt table.

DbtSource

Define basic dbt yml structure to add a pudl table as a dbt source.

DbtSchema

Define basic structure of a dbt models yaml file.

UpdateResult

TableUpdateArgs

Define a single class to collect the args for all table update commands.

Functions

schema_has_removals_or_modifications(→ bool)

Check if the DeepDiff includes any removals or modifications.

_log_schema_diff(diff, old_schema, new_schema)

Print old and new YAML, and summary of schema changes.

_schema_diff_summary(→ str)

Return all changes in a DeepDiff between two schemas as a string.

get_data_source(→ str)

Return data source for a table or 'output' if there's more than one source.

_get_local_table_path(table_name)

_get_model_path(→ pathlib.Path)

_get_row_count_csv_path(→ pathlib.Path)

_get_existing_row_counts(→ pandas.DataFrame)

_calculate_row_counts(→ pandas.DataFrame)

_combine_row_counts(→ pandas.DataFrame)

_write_row_counts(row_counts[, target])

update_row_counts(→ UpdateResult)

Generate updated row counts per partition and write to csv file within dbt project.

update_table_schema(→ UpdateResult)

Generate and write out a schema.yaml file defining a new or updated table.

_log_update_result(result)

_infer_partition_column(→ str)

update_tables(tables, target, clobber, update, schema, ...)

Add or update dbt schema configs and row count expectations for PUDL tables.

validate(→ None)

Validate a selection of DBT nodes.

dbt_helper()

Script for auto-generating dbt configuration and migrating existing tests.

Module Contents

dbt_helper.logger[source]
dbt_helper.ALL_TABLES[source]
class dbt_helper.DbtColumn(/, **data: Any)[source]

Bases: pydantic.BaseModel

Define yaml structure of a dbt column.

name: str[source]
description: str | None = None[source]
data_tests: list | None = None[source]
meta: dict | None = None[source]
tags: list[str] | None = None[source]
add_column_tests(column_tests: list) DbtColumn[source]

Add data tests to columns in dbt config.

class dbt_helper.DbtTable(/, **data: Any)[source]

Bases: pydantic.BaseModel

Define yaml structure of a dbt table.

name: str[source]
description: str | None = None[source]
data_tests: list | None = None[source]
columns: list[DbtColumn][source]
meta: dict | None = None[source]
tags: list[str] | None = None[source]
config: dict | None = None[source]
add_source_tests(source_tests: list) DbtSource[source]

Add data tests to source in dbt config.

add_column_tests(column_tests: dict[str, list]) DbtSource[source]

Add data tests to columns in dbt config.

static get_row_count_test_dict(table_name: str, partition_column: str)[source]

Return a dictionary with a dbt row count data test encoded in a dict.

classmethod from_table_name(table_name: str, partition_column: str) DbtSchema[source]

Construct configuration defining table from PUDL metadata.

class dbt_helper.DbtSource(/, **data: Any)[source]

Bases: pydantic.BaseModel

Define basic dbt yml structure to add a pudl table as a dbt source.

name: str = 'pudl'[source]
tables: list[DbtTable][source]
data_tests: list | None = None[source]
description: str | None = None[source]
meta: dict | None = None[source]
add_source_tests(source_tests: list) DbtSource[source]

Add data tests to source in dbt config.

add_column_tests(column_tests: dict[list]) DbtSource[source]

Add data tests to columns in dbt config.

class dbt_helper.DbtSchema(/, **data: Any)[source]

Bases: pydantic.BaseModel

Define basic structure of a dbt models yaml file.

version: int = 2[source]
sources: list[DbtSource][source]
models: list[DbtTable] | None = None[source]
add_source_tests(source_tests: list, model_name: str | None = None) DbtSchema[source]

Add data tests to source in dbt config.

add_column_tests(column_tests: dict[list], model_name: str | None = None) DbtSchema[source]

Add data tests to columns in dbt config.

classmethod from_table_name(table_name: str, partition_column: str) DbtSchema[source]

Construct configuration defining table from PUDL metadata.

classmethod from_yaml(schema_path: pathlib.Path) DbtSchema[source]

Load a DbtSchema object from a YAML file.

classmethod to_yaml(schema_path: pathlib.Path)[source]

Write DbtSchema object to YAML file.

dbt_helper.schema_has_removals_or_modifications(diff: deepdiff.DeepDiff) bool[source]

Check if the DeepDiff includes any removals or modifications.

dbt_helper._log_schema_diff(diff: deepdiff.DeepDiff, old_schema: DbtSchema, new_schema: DbtSchema)[source]

Print old and new YAML, and summary of schema changes.

dbt_helper._schema_diff_summary(diff: deepdiff.DeepDiff) str[source]

Return all changes in a DeepDiff between two schemas as a string.

dbt_helper.get_data_source(table_name: str) str[source]

Return data source for a table or ‘output’ if there’s more than one source.

class dbt_helper.UpdateResult[source]

Bases: tuple

success[source]
message[source]
dbt_helper._get_local_table_path(table_name)[source]
dbt_helper._get_model_path(table_name: str, data_source: str) pathlib.Path[source]
dbt_helper._get_row_count_csv_path(target: str = 'etl-full') pathlib.Path[source]
dbt_helper._get_existing_row_counts(target: str = 'etl-full') pandas.DataFrame[source]
dbt_helper._calculate_row_counts(table_name: str, partition_column: str = 'report_year') pandas.DataFrame[source]
dbt_helper._combine_row_counts(existing: pandas.DataFrame, new: pandas.DataFrame) pandas.DataFrame[source]
dbt_helper._write_row_counts(row_counts: pandas.DataFrame, target: str = 'etl-full')[source]
dbt_helper.update_row_counts(table_name: str, partition_column: str = 'report_year', target: str = 'etl-full', clobber: bool = False, update: bool = False) UpdateResult[source]

Generate updated row counts per partition and write to csv file within dbt project.

dbt_helper.update_table_schema(table_name: str, data_source: str, partition_column: str = 'report_year', clobber: bool = False, update: bool = False) UpdateResult[source]

Generate and write out a schema.yaml file defining a new or updated table.

dbt_helper._log_update_result(result: UpdateResult)[source]
dbt_helper._infer_partition_column(table_name: str) str[source]
class dbt_helper.TableUpdateArgs[source]

Define a single class to collect the args for all table update commands.

tables: list[str][source]
target: Literal['etl-full', 'etl-fast'] = 'etl-full'[source]
schema: bool = False[source]
row_counts: bool = False[source]
clobber: bool = False[source]
update: bool = False[source]
dbt_helper.update_tables(tables: list[str], target: str, clobber: bool, update: bool, schema: bool, row_counts: bool)[source]

Add or update dbt schema configs and row count expectations for PUDL tables.

The tables argument can be a single table name, a list of table names, or ‘all’. If ‘all’ the script will update configurations for for all PUDL tables.

If --clobber is set, existing configurations for tables will be overwritten. If --update is set, existing configurations for tables will be updated only if this does not result in deletions.

dbt_helper.validate(select: str = '*', exclude: str | None = None, target: str = 'etl-full') None[source]

Validate a selection of DBT nodes.

Wraps the dbt build command line so we can annotate the result with the actual data that was returned from the test query.

dbt_helper.dbt_helper()[source]

Script for auto-generating dbt configuration and migrating existing tests.

This CLI currently provides the following sub-commands:

  • update-tables which can update or create a dbt table (model) schema.yml file under the dbt/models repo. These configuration files tell dbt about the structure of the table and what data tests are specified for it. It also adds a (required) row count test by default. The script can also generate or update the expected row counts for existing tables, assuming they have been materialized to parquet files and are sitting in your $PUDL_OUT directory.

  • validate: run validation tests for a selection of DBT nodes.

Run dbt_helper {command} --help for detailed usage on each command.