# Source code for pudl.output.eia930

"""Functions for compiling derived aspects of the EIA 930 data.

For a narrative overview of the timeseries imputation process, see the documentation
at :doc:`/methodology/timeseries_imputation`.
"""

from datetime import date

import pandas as pd
from dagster import AssetOut, Output, asset, multi_asset

from pudl.analysis.timeseries_cleaning import (
    ImputeTimeseriesSettings,
    impute_timeseries_asset_factory,
)


[docs] def _add_timezone( df: pd.DataFrame, core_eia__codes_balancing_authorities: pd.DataFrame ) -> pd.DataFrame: other = core_eia__codes_balancing_authorities[["code", "report_timezone"]].rename( columns={"code": "balancing_authority_code_eia"} ) return df.merge(other, on=["balancing_authority_code_eia"]).rename( columns={"report_timezone": "timezone"} )
@asset(
    compute_kind="Python",
    required_resource_keys={"global_data_config"},
)
def _out_eia930__hourly_operations(
    core_eia930__hourly_operations: pd.DataFrame,
    core_eia__codes_balancing_authorities: pd.DataFrame,
) -> pd.DataFrame:
    """Add a timezone column to BA-level hourly operations used for imputation.

    The timezone is joined from ``core_eia__codes_balancing_authorities`` via
    ``_add_timezone``. Records for BA ``WAUE``, which has no listed timezone,
    are dropped.

    Args:
        core_eia930__hourly_operations: BA-level hourly operations records.
        core_eia__codes_balancing_authorities: BA codes table providing
            ``report_timezone``.

    Returns:
        The operations records with a ``timezone`` column and WAUE removed.

    Raises:
        AssertionError: If WAUE unexpectedly has a timezone, or if any record
            still lacks a timezone after WAUE is dropped.
    """
    ops = _add_timezone(
        core_eia930__hourly_operations,
        core_eia__codes_balancing_authorities,
    )
    # TODO: BA code WAUE does not have listed timezone, so dropping these records for now
    waue_mask = ops["balancing_authority_code_eia"] == "WAUE"
    # Explicit raises rather than ``assert`` so the checks still run under ``python -O``.
    if not ops.loc[waue_mask, "timezone"].isnull().all():
        raise AssertionError("WAUE not expected to have a timezone")
    ops = ops.loc[~waue_mask]
    if not ops.timezone.notnull().all():
        raise AssertionError("All records should have a timezone after dropping WAUE")
    return ops
@asset(
    compute_kind="Python",
    required_resource_keys={"global_data_config"},
)
def _out_eia930__hourly_subregion_demand(
    core_eia930__hourly_subregion_demand: pd.DataFrame,
    core_eia__codes_balancing_authorities: pd.DataFrame,
) -> pd.DataFrame:
    """Add a timezone column and a combined BA/subregion ID used for imputation.

    The timezone is joined from the BA codes table via ``_add_timezone``. The
    combined ID ``combined_subregion_ba_code_eia`` is the BA code concatenated
    with the subregion code, both cast to the pandas ``"string"`` dtype.

    Args:
        core_eia930__hourly_subregion_demand: Subregion-level hourly demand.
        core_eia__codes_balancing_authorities: BA codes table providing
            ``report_timezone``.

    Returns:
        The subregion demand with ``timezone`` and
        ``combined_subregion_ba_code_eia`` columns added.
    """
    with_tz = _add_timezone(
        core_eia930__hourly_subregion_demand,
        core_eia__codes_balancing_authorities,
    )
    ba_part = with_tz["balancing_authority_code_eia"].astype("string")
    subregion_part = with_tz["balancing_authority_subregion_code_eia"].astype("string")
    with_tz["combined_subregion_ba_code_eia"] = ba_part + subregion_part
    return with_tz
[docs] def _years_from_context(context) -> list[int]: return [ int(half_year[:4]) for half_year in context.resources.global_data_config.pudl.eia.eia930.half_years ]
@asset
def _out_eia930__combined_demand(
    _out_eia930__hourly_operations: pd.DataFrame,
    _out_eia930__hourly_subregion_demand: pd.DataFrame,
) -> pd.DataFrame:
    """Combine subregion and BA demand into a single DataFrame to perform imputation.

    BA-level rows are tagged ``granularity="ba"``. Their
    ``combined_subregion_ba_code_eia`` is set to the BA code, and their
    ``balancing_authority_subregion_code_eia`` is set to an empty string.
    Subregion rows are tagged ``granularity="subregion"``, and their combined ID
    is taken from the input as-is. Only the shared columns are kept. Subregion
    rows come first, and the original row indexes are preserved (not reset).

    The input DataFrames are not modified; tagged copies are built instead.

    Args:
        _out_eia930__hourly_operations: BA-level demand with a ``timezone`` column.
        _out_eia930__hourly_subregion_demand: Subregion-level demand with
            ``timezone`` and ``combined_subregion_ba_code_eia`` columns.

    Returns:
        The concatenated subregion and BA demand restricted to the shared columns.
    """
    common_cols = [
        "datetime_utc",
        "demand_reported_mwh",
        "timezone",
        "combined_subregion_ba_code_eia",
        "granularity",
        "balancing_authority_subregion_code_eia",
        "balancing_authority_code_eia",
    ]
    # ``assign`` returns new frames, so upstream asset values are left untouched.
    ba_demand = _out_eia930__hourly_operations.assign(
        granularity="ba",
        # Set combined subregion/ba ID to just BA for BA specific data
        combined_subregion_ba_code_eia=_out_eia930__hourly_operations[
            "balancing_authority_code_eia"
        ],
        balancing_authority_subregion_code_eia="",
    )
    subregion_demand = _out_eia930__hourly_subregion_demand.assign(
        granularity="subregion"
    )
    return pd.concat(
        [
            subregion_demand[common_cols],
            ba_demand[common_cols],
        ]
    )
# Imputation assets for the combined BA + subregion demand table. Note that
# ``date.today()`` is evaluated once, when this module is imported.
imputed_combined_demand_assets = impute_timeseries_asset_factory(
    # Asset wiring.
    input_asset_name="_out_eia930__combined_demand",
    output_asset_name="_out_eia930__combined_imputed_demand",
    output_io_manager_key="io_manager",
    years_from_context=_years_from_context,
    # Column roles passed to the factory.
    id_col="combined_subregion_ba_code_eia",
    simulation_group_col="granularity",
    value_col="demand_reported_mwh",
    imputed_value_col="demand_imputed_pudl_mwh",
    settings=ImputeTimeseriesSettings(
        # The tnn method tends to work better when a year is incomplete, so it
        # is used for the current calendar year.
        method_overrides={date.today().year: "tnn"},
    ),
)
@multi_asset(
    outs={
        "out_eia930__hourly_subregion_demand": AssetOut(
            io_manager_key="parquet_io_manager"
        ),
        "out_eia930__hourly_operations": AssetOut(io_manager_key="parquet_io_manager"),
    }
)
def split_ba_subregion_demand(
    _out_eia930__combined_imputed_demand: pd.DataFrame,
    core_eia930__hourly_subregion_demand: pd.DataFrame,
    core_eia930__hourly_operations: pd.DataFrame,
):
    """Split combined imputed demand into separate BA/subregion tables.

    Rows are separated by their ``granularity`` tag. Each subset is then
    left-merged one-to-one onto its core table, which restores columns that are
    not present in the imputation output. Before the merge,
    ``demand_reported_mwh`` is dropped from the core table because the imputed
    rows already carry it.

    Returns:
        A tuple of Dagster ``Output`` objects, in this order: subregion demand,
        then BA-level operations.
    """
    imputed = _out_eia930__combined_imputed_demand
    is_ba = imputed["granularity"] == "ba"
    is_subregion = imputed["granularity"] == "subregion"

    def _reattach_core_columns(imputed_rows, core_df, keys):
        # Drop reported demand from the core table so the merge has no duplicate columns.
        return imputed_rows.merge(
            core_df.drop(columns=["demand_reported_mwh"]),
            on=keys,
            validate="one_to_one",
            how="left",
        )

    # Only BA rows are merged onto the BA table, which keeps the merge one-to-one.
    ba_demand = _reattach_core_columns(
        imputed[is_ba],
        core_eia930__hourly_operations,
        ["datetime_utc", "balancing_authority_code_eia"],
    )
    subregion_demand = _reattach_core_columns(
        imputed[is_subregion],
        core_eia930__hourly_subregion_demand,
        [
            "datetime_utc",
            "balancing_authority_code_eia",
            "balancing_authority_subregion_code_eia",
        ],
    )

    subregion_output = Output(
        output_name="out_eia930__hourly_subregion_demand",
        value=subregion_demand,
    )
    ba_output = Output(
        output_name="out_eia930__hourly_operations",
        value=ba_demand,
    )
    return subregion_output, ba_output
@asset(io_manager_key="parquet_io_manager")
def out_eia930__hourly_aggregated_demand(
    out_eia930__hourly_operations: pd.DataFrame,
    core_eia__codes_balancing_authorities: pd.DataFrame,
) -> pd.DataFrame:
    """Aggregate imputed demand from the BA level to region, interconnect, and contiguous US.

    BA records are joined to their region and interconnect codes from the BA
    codes table. ``demand_imputed_pudl_mwh`` is then summed per hour at three
    levels:
      - by region, with the group code lower-cased;
      - by interconnect, with the group code lower-cased;
      - across all joined BAs, labelled ``"conus"``.

    Returns:
        A long-format frame with ``aggregation_group``, ``datetime_utc``,
        ``demand_imputed_pudl_mwh`` and ``aggregation_level`` columns. The
        frames are concatenated without resetting the index.
    """
    # Mapping between balancing authorities and their regions/interconnects.
    ba_mapping = core_eia__codes_balancing_authorities[
        ["code", "balancing_authority_region_code_eia", "interconnect_code_eia"]
    ].rename(columns={"code": "balancing_authority_code_eia"})
    ops = out_eia930__hourly_operations.merge(
        ba_mapping, on=["balancing_authority_code_eia"]
    )

    def _sum_by_group(group_col: str, level: str) -> pd.DataFrame:
        summed = (
            ops.groupby([group_col, "datetime_utc"], as_index=False, observed=True)[
                "demand_imputed_pudl_mwh"
            ]
            .sum()
            .rename(columns={group_col: "aggregation_group"})
        )
        summed["aggregation_level"] = level
        summed["aggregation_group"] = summed["aggregation_group"].str.lower()
        return summed

    frames = [
        _sum_by_group(group_col, level)
        for level, group_col in (
            ("region", "balancing_authority_region_code_eia"),
            ("interconnect", "interconnect_code_eia"),
        )
    ]

    conus = ops.groupby(["datetime_utc"], as_index=False)[
        "demand_imputed_pudl_mwh"
    ].sum()
    conus["aggregation_group"] = "conus"
    conus["aggregation_level"] = "conus"
    frames.append(conus)

    return pd.concat(frames)