# Source code for pudl.etl.cli

"""A command line interface (CLI) to the main PUDL ETL functionality."""

import pathlib
import sys
from collections.abc import Callable

import click
import fsspec
from dagster import (
    DagsterInstance,
    JobDefinition,
    build_reconstructable_job,
    execute_job,
)

import pudl
from pudl.etl import defs
from pudl.helpers import get_dagster_execution_config
from pudl.settings import EtlSettings
from pudl.workspace.setup import PudlPaths

# Module-level logger for this CLI module, obtained from PUDL's logging helpers.
logger = pudl.logging_helpers.get_logger(__name__)
def pudl_etl_job_factory(
    logfile: str | None = None,
    loglevel: str = "INFO",
    base_job: str = "etl_full",
) -> Callable[[], JobDefinition]:
    """Factory for parameterizing a reconstructable pudl_etl job.

    Args:
        logfile: Path to a log file for the job's execution.
        loglevel: The log level for the job's execution.
        base_job: Name of the Dagster ETL job to execute.

    Returns:
        A zero-argument callable which, when invoked, configures the root
        logger with ``logfile`` and ``loglevel`` and then returns the job
        definition named by ``base_job``.
    """

    def get_pudl_etl_job():
        """Create a pudl_etl job to be wrapped by reconstructable."""
        # Logging is configured inside the closure so that it is set up at the
        # moment the job is (re)built, not when the factory itself is called.
        pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)
        return defs.get_job_def(base_job)

    return get_pudl_etl_job
# CLI entry point. The decorators declare one positional argument (the ETL
# settings YAML file, which must exist and must not be a directory) plus options
# for the Dagster worker count, the cloud cache location, and logging.
# NOTE: click uses the function docstring as the command's ``--help`` text, so
# that docstring is user-facing output and is intentionally kept to one line.
@click.command(
    context_settings={"help_option_names": ["-h", "--help"]},
)
@click.argument(
    "etl_settings_yml",
    type=click.Path(
        exists=True,
        dir_okay=False,
        resolve_path=True,
        path_type=pathlib.Path,
    ),
)
@click.option(
    "--dagster-workers",
    default=0,
    type=int,
    help="Max number of processes Dagster can launch. Defaults to the number of CPUs.",
)
@click.option(
    "--cloud-cache-path",
    type=str,
    default="s3://pudl.catalyst.coop/zenodo",
    help=(
        "Load cached inputs from cloud object storage (S3 or GCS). This is typically "
        "much faster and more reliable than downloading from Zenodo directly. By "
        "default we read from the cache in PUDL's free, public AWS Open Data Registry "
        "bucket."
    ),
)
@click.option(
    "--logfile",
    help="If specified, write logs to this file.",
    type=click.Path(
        exists=False,
        resolve_path=True,
        path_type=pathlib.Path,
    ),
)
@click.option(
    "--loglevel",
    default="INFO",
    type=click.Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
)
def pudl_etl(
    etl_settings_yml: pathlib.Path,
    dagster_workers: int,
    cloud_cache_path: str,
    logfile: pathlib.Path | None,
    loglevel: str,
):
    """Use Dagster to run the PUDL ETL, as specified by the file ETL_SETTINGS_YML."""
    # Display logged output from the PUDL package. The log file path is passed
    # around as a plain string (or None when no log file was requested).
    logfile_str = str(logfile) if logfile is not None else None
    pudl.logging_helpers.configure_root_logger(logfile=logfile_str, loglevel=loglevel)

    # Load the settings and reject configurations this command cannot run:
    # no datasets at all, or EPA CEMS missing/disabled.
    etl_settings = EtlSettings.from_yaml(str(etl_settings_yml))
    if etl_settings.datasets is None:
        raise click.BadParameter(
            "No datasets were configured in the ETL settings file.",
            param_hint="etl_settings_yml",
        )
    if etl_settings.datasets.epacems is None or etl_settings.datasets.epacems.disabled:
        raise click.BadParameter(
            "EPA CEMS is now always included in the ETL. "
            "Set datasets.epacems with disabled: false in your ETL settings file.",
            param_hint="etl_settings_yml",
        )

    dataset_settings_config = etl_settings.datasets.model_dump()

    # The job is referenced by module and function name and handed the logging
    # settings as keyword arguments for pudl_etl_job_factory.
    pudl_etl_reconstructable_job = build_reconstructable_job(
        "pudl.etl.cli",
        "pudl_etl_job_factory",
        reconstructable_kwargs={
            "loglevel": loglevel,
            "logfile": logfile_str,
        },
    )

    run_config = {
        "resources": {
            "dataset_settings": {"config": dataset_settings_config},
            "datastore": {
                "config": {
                    "cloud_cache_path": cloud_cache_path,
                },
            },
        },
    }

    # Limit the number of concurrent workers when launching assets that use a
    # lot of memory.
    tag_concurrency_limits = [
        {
            "key": "memory-use",
            "value": "high",
            "limit": 4,
        },
    ]
    run_config.update(
        get_dagster_execution_config(
            num_workers=dagster_workers,
            tag_concurrency_limits=tag_concurrency_limits,
        )
    )

    result = execute_job(
        pudl_etl_reconstructable_job,
        instance=DagsterInstance.get(),
        run_config=run_config,
    )

    # Workaround to reliably getting full stack trace: re-raise the error
    # recorded on the first STEP_FAILURE event that carries one.
    if not result.success:
        for event in result.all_events:
            if event.event_type_value == "STEP_FAILURE":
                event_error = getattr(event.event_specific_data, "error", None)
                if event_error is not None:
                    raise Exception(event_error)
        raise Exception("ETL failed but no step error details were available.")

    # Only reached on success, since the failure branch above always raises.
    logger.info("ETL job completed successfully, publishing outputs.")
    for output_path in etl_settings.publish_destinations:
        logger.info(f"Publishing outputs to {output_path}")
        # fsspec picks the filesystem implementation from the destination path.
        fs, _, _ = fsspec.get_fs_token_paths(output_path)
        fs.put(
            PudlPaths().output_dir,  # type: ignore[call-arg]
            output_path,
            recursive=True,
        )
# Script entry point: run the CLI command and exit with whatever it returns.
if __name__ == "__main__":
    exit_status = pudl_etl()
    sys.exit(exit_status)