# Source code for pudl.transform.rus7

"""Transform the RUS7 tables."""

import pandas as pd
from dagster import AssetIn, AssetOut, Output, asset, multi_asset

import pudl.transform.rus as rus
from pudl import logging_helpers
from pudl.helpers import make_changelog, multi_index_stack
from pudl.metadata.enums import (
    LOAN_STATUS_TYPES_RUS7,
    LOAN_UNIT_TYPES_RUS7,
    SERVICE_INTERRUPTION_PERIODS_RUS7,
    SERVICE_INTERRUPTION_TYPES_RUS7,
    SERVICE_STATUS_RUS7,
    TRANSMISSION_DISTRIBUTION_TYPES_RUS7,
    UTILITY_PLANT_GROUP_RUS7,
    UTILITY_PLANT_ITEM_RUS7,
)
from pudl.metadata.resources.rus12 import HARVESTED_CORE_TABLES_RUS7
from pudl.transform.eia import harvest_entity_tables

# Module-level logger, obtained through PUDL's logging helpers and named after
# this module.
logger = logging_helpers.get_logger(__name__)
@asset
def _core_rus7__yearly_meeting_and_board(raw_rus7__meeting_and_board):
    """Transform the meeting and board (aka governance) table.

    Applies the shared RUS early transform (fixing two boolean columns),
    verifies the primary key, and parses ``last_annual_meeting_date`` into
    datetimes.
    """
    governance = rus.early_transform(
        raw_df=raw_rus7__meeting_and_board,
        boolean_columns_to_fix=[
            "does_manager_have_written_contract",
            "was_quorum_present",
        ],
    )
    rus.early_check_pk(governance)
    # format="mixed" lets pandas infer the date format element by element,
    # presumably because the raw dates are not uniformly formatted.
    return governance.assign(
        last_annual_meeting_date=lambda frame: pd.to_datetime(
            frame.last_annual_meeting_date, format="mixed"
        )
    )
@asset
def _core_rus7__yearly_balance_sheet_assets(raw_rus7__balance_sheet):
    """Transform the balance sheet assets table.

    Every column whose name ends in ``_assets`` is melted into long format: one
    row per report date, borrower and ``asset_type`` (the column name minus the
    ``_assets`` suffix), with the value in ``ending_balance``. ``is_total``
    flags the asset types that start with ``total_``.
    """
    balance_sheet = rus.early_transform(raw_df=raw_rus7__balance_sheet)
    rus.early_check_pk(balance_sheet)

    asset_cols = balance_sheet.filter(regex=r"_assets$").columns.tolist()
    long_df = balance_sheet.melt(
        id_vars=["report_date", "borrower_id_rus", "borrower_name_rus"],
        value_vars=asset_cols,
        var_name="asset_type",
        value_name="ending_balance",
    )
    asset_type = long_df["asset_type"].str.removesuffix("_assets")
    return long_df.assign(
        asset_type=asset_type,
        is_total=asset_type.str.startswith("total_"),
    )
@asset
def _core_rus7__yearly_balance_sheet_liabilities(raw_rus7__balance_sheet):
    """Transform the balance sheet liabilities table.

    Every column whose name ends in ``_liabilities`` is melted into long format:
    one row per report date, borrower and ``liability_type`` (the column name
    minus the ``_liabilities`` suffix), with the value in ``ending_balance``.
    ``is_total`` flags the liability types that start with ``total_``.
    """
    balance_sheet = rus.early_transform(raw_df=raw_rus7__balance_sheet)
    rus.early_check_pk(balance_sheet)

    liability_cols = balance_sheet.filter(regex=r"_liabilities$").columns.tolist()
    long_df = balance_sheet.melt(
        id_vars=["report_date", "borrower_id_rus", "borrower_name_rus"],
        value_vars=liability_cols,
        var_name="liability_type",
        value_name="ending_balance",
    )
    liability_type = long_df["liability_type"].str.removesuffix("_liabilities")
    return long_df.assign(
        liability_type=liability_type,
        is_total=liability_type.str.startswith("total_"),
    )
@asset
def _core_rus7__scd_borrowers(raw_rus7__borrowers):
    """Transform the borrowers table.

    Adds a ``state`` column holding the two leading uppercase letters of
    ``borrower_id_rus`` for IDs shaped like ``XX1234`` (two uppercase letters
    followed by exactly four digits). IDs that do not match that shape get a
    null ``state``.
    """
    df = rus.early_transform(raw_df=raw_rus7__borrowers)
    rus.early_check_pk(df)
    # TODO: encode region_code?
    # expand=False makes str.extract return a Series for the single capture
    # group, so a column is assigned directly instead of relying on pandas
    # coercing a one-column DataFrame into a column.
    return df.assign(
        state=lambda x: x.borrower_id_rus.str.extract(
            r"^([A-Z]{2})\d{4}$", expand=False
        )
    )
@asset
def _core_rus7__yearly_employee_statistics(raw_rus7__employee_statistics):
    """Transform the employee statistics table.

    Only the shared RUS early transform and the primary-key check are needed;
    the table keeps its original (wide) shape.
    """
    employee_stats = rus.early_transform(raw_df=raw_rus7__employee_statistics)
    rus.early_check_pk(employee_stats)
    return employee_stats
@asset
def _core_rus7__yearly_energy_efficiency(raw_rus7__energy_efficiency):
    """Transform the energy efficiency table.

    Wide columns named ``<measure>_<customer_class>_<observation_period>`` are
    stacked so each row is one borrower / report date / customer class /
    observation period. The measures are ``customers_num``, ``savings_mmbtu``
    and ``invested``. The observation period is either ``cumulative`` or
    ``new_in_report_year``.
    """
    efficiency = rus.early_transform(raw_df=raw_rus7__energy_efficiency)
    rus.early_check_pk(efficiency)

    measures = ["customers_num", "savings_mmbtu", "invested"]
    measure_alternation = "|".join(measures)
    column_pattern = (
        rf"^({measure_alternation})_(.+)_(cumulative|new_in_report_year)$"
    )
    return multi_index_stack(
        efficiency,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=measures,
        pattern=column_pattern,
        match_names=["data_cols", "customer_class", "observation_period"],
        unstack_level=["customer_class", "observation_period"],
    )
@asset
def _core_rus7__power_requirements(raw_rus7__power_requirements):
    """Early transform an internal power_requirements table.

    This main input gets used several times in several downstream
    ``core_rus7__yearly_power_requirements*`` assets. The raw asset needs some
    cleaning and dropping of duplicate records, so we do it once here.

    Duplicate handling: the code expects records sharing a
    (``report_date``, ``borrower_id_rus``) key to come as a single pair.
    Only columns that are non-null in every duplicate row are compared, and
    only between the first two duplicate rows. An ``AssertionError`` is
    raised if those values differ. Otherwise the duplicate row with the most
    nulls is dropped.
    """
    df = rus.early_transform(
        raw_df=raw_rus7__power_requirements,
        boolean_columns_to_fix=["is_peak_coincident"],
    )
    # PK duplicate management. Resetting to a unique RangeIndex guarantees the
    # index label chosen below identifies exactly one row to drop.
    df = df.reset_index(drop=True)
    dupe_mask = df.duplicated(subset=["report_date", "borrower_id_rus"], keep=False)
    if dupe_mask.any():
        dupes = df[dupe_mask]
        # Visually inspecting these two dupes, I learned that most of the values
        # in one record are null, and all of the non-null values seem to be
        # exactly the same. That led me to want to drop the mostly-null record.
        # First check this assumption: are all of the non-null values the same?
        # (Columns with a null in any duplicate row are discarded; the rest are
        # compared between the first two duplicate rows.)
        assert (
            dupes.dropna(axis=1, how="any")
            .reset_index(drop=True)
            .T.assign(is_same=lambda x: x[0] == x[1])
            .is_same.all()
        )
        # Find the mostly-null record of these two dupes and drop it.
        more_null_loc = (
            dupes.isna().sum(axis=1).sort_values(ascending=False).index[0]
        )
        df = df.drop(more_null_loc, axis="index")
    rus.early_check_pk(df)
    return df
@asset
def _core_rus7__yearly_power_requirements_electric_sales(
    _core_rus7__power_requirements: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the power requirements of electric sales table.

    Takes the electricity sales and revenue portion of the cleaned power
    requirements data. Columns named ``<customer_class>_sales_kwh`` and
    ``<customer_class>_revenue`` are stacked into one row per customer class.
    The ``expected_dropped_cols=27`` argument presumably tells
    ``multi_index_stack`` how many unrelated columns the pattern should leave
    out. Finally, kWh values are converted to MWh.
    """
    sales_measures = ["sales_kwh", "revenue"]
    measure_alternation = "|".join(sales_measures)
    stacked = multi_index_stack(
        _core_rus7__power_requirements,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=sales_measures,
        pattern=rf"^(.+)_({measure_alternation})$",
        match_names=["customer_class", "data_cols"],
        unstack_level=["customer_class"],
        expected_dropped_cols=27,
    )
    # kWh -> MWh (multiply by 0.001).
    return rus.convert_units(
        stacked,
        old_unit="kwh",
        new_unit="mwh",
        converter=0.001,
    )
@asset
def _core_rus7__yearly_power_requirements_electric_customers(
    _core_rus7__power_requirements: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the power requirements of electric customers table.

    Takes the customer-count portion of the cleaned power requirements data.
    Columns named ``<customer_class>_customers_num_<december|avg>`` are stacked
    so each row is one customer class and observation period.
    ``expected_dropped_cols=29`` is passed to ``multi_index_stack`` for the
    columns that do not match the pattern.
    """
    measure = "customers_num"
    return multi_index_stack(
        _core_rus7__power_requirements,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=[measure],
        pattern=rf"^(.+)_({measure})_(december|avg)$",
        match_names=["customer_class", "data_cols", "observation_period"],
        unstack_level=["customer_class", "observation_period"],
        expected_dropped_cols=29,
    )
@asset
def _core_rus7__yearly_power_requirements(
    _core_rus7__power_requirements: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the power requirements table.

    Covers the portion of the power requirements data that describes revenue
    from the borrower's other business lines and the electricity generated,
    purchased or used. No reshape is needed here; applying the schema later
    effectively drops the columns that belong to the other power requirements
    tables.
    """
    # The electric sales columns end with two totals. Those sales are reshaped
    # into separate tables, but the totals are also kept here under clearer
    # names. That way this table has every power requirements sector in one
    # place.
    renamed = _core_rus7__power_requirements.rename(
        columns={
            "total_revenue": "electric_sales_revenue",
            "total_sales_kwh": "electric_sales_kwh",
        }
    )
    # kW* -> MW*. The kWh pass runs before the kW pass, as in the original
    # ordering.
    in_mwh = rus.convert_units(
        renamed, old_unit="kwh", new_unit="mwh", converter=0.001
    )
    return rus.convert_units(
        in_mwh, old_unit="kw", new_unit="mw", converter=0.001
    )
@asset
def _core_rus7__yearly_investments(
    raw_rus7__investments: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the investments table.

    Applies the shared RUS early transform, fixing the
    ``for_rural_development`` boolean column.
    """
    investments = rus.early_transform(
        raw_df=raw_rus7__investments,
        boolean_columns_to_fix=["for_rural_development"],
    )
    # This table has no primary key, so no PK check is run.
    return investments
@asset
def _core_rus7__yearly_long_term_debt(
    raw_rus7__long_term_debt: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the long term debt table.

    Only the shared RUS early transform is applied. This table has no primary
    key, so no primary-key check is run.
    """
    df = rus.early_transform(raw_df=raw_rus7__long_term_debt)
    # No PK in this table
    return df
@asset
def _core_rus7__yearly_patronage_capital(
    raw_rus7__patronage_capital: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the patronage capital table.

    Columns ending in ``_cumulative`` and ``_report_year`` are each melted to
    long format. Both are keyed on report date, borrower and
    ``patronage_type`` (the column name minus its period suffix). This yields
    ``patronage_cumulative`` and ``patronage_report_year`` value columns. The
    two long tables are outer-merged on that key, and ``is_total`` flags
    patronage types that start with ``total_``.
    """
    capital = rus.early_transform(raw_df=raw_rus7__patronage_capital)
    rus.early_check_pk(capital)
    key_cols = ["report_date", "borrower_id_rus", "borrower_name_rus"]

    def _long_for_period(wide: pd.DataFrame, period: str) -> pd.DataFrame:
        """Melt the ``_<period>`` columns of ``wide`` and index by the key."""
        suffix = f"_{period}"
        period_cols = wide.filter(regex=rf"{suffix}$").columns.tolist()
        melted = wide.melt(
            id_vars=key_cols,
            value_vars=period_cols,
            var_name="patronage_type",
            value_name=f"patronage_{period}",
        )
        melted["patronage_type"] = melted["patronage_type"].str.removesuffix(suffix)
        return melted.set_index([*key_cols, "patronage_type"])

    merged = pd.merge(
        _long_for_period(capital, "cumulative"),
        _long_for_period(capital, "report_year"),
        left_index=True,
        right_index=True,
        how="outer",
    ).reset_index()
    merged["is_total"] = merged["patronage_type"].str.startswith("total_")
    return merged
@asset
def _core_rus7__yearly_statement_of_operations(
    raw_rus7__statement_of_operations: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the statement of operations table.

    Wide columns named ``<opex_group>_<opex_type>_<period>`` are stacked.
    Each row is one opex group and type, and the periods
    (``opex_ytd``, ``opex_ytd_budget``, ``opex_report_month``) become value
    columns. ``is_total`` flags opex types that start with ``total``.
    """
    operations = rus.early_transform(raw_df=raw_rus7__statement_of_operations)
    rus.early_check_pk(operations)

    group_alternation = "|".join(
        [
            "cost_of_electric_service",
            "opex",
            "patronage_and_operating_margins",
            "patronage_capital_or_margins",
        ]
    )
    periods = ["opex_ytd", "opex_ytd_budget", "opex_report_month"]
    period_alternation = "|".join(periods)
    stacked = multi_index_stack(
        operations,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=periods,
        pattern=rf"^({group_alternation})_(.+)_({period_alternation})$",
        match_names=["opex_group", "opex_type", "data_cols"],
        unstack_level=["opex_group", "opex_type"],
    )
    return stacked.assign(is_total=stacked.opex_type.str.startswith("total"))
@multi_asset(
    outs={
        "_core_rus7__yearly_owed_by_customers": AssetOut(),
        "_core_rus7__yearly_customer_energy_efficiency_and_conservation_loans": AssetOut(),
    }
)
def _core_rus7__consumer_debt(raw_rus7__owed_by_customers: pd.DataFrame):
    """Transform the owed by consumer table.

    Splits the owed-by-consumers data into two outputs:

    * general consumer debt (amount due over 60 days and amount written off
      year-to-date), left in wide form; and
    * the energy efficiency and conservation loan program debt (every other
      column). It is stacked on ``<loan_status>_<loan_unit>`` column names so
      each row is one loan status.
    """
    owed = rus.early_transform(raw_df=raw_rus7__owed_by_customers)
    rus.early_check_pk(owed)

    id_cols = ["report_date", "borrower_id_rus", "borrower_name_rus"]
    consumer_cols = id_cols + ["amount_due_over_60_days", "amount_written_off_ytd"]
    consumer_debt = owed[consumer_cols]

    loan_cols = id_cols + [col for col in owed.columns if col not in consumer_cols]
    status_alternation = "|".join(LOAN_STATUS_TYPES_RUS7)
    unit_alternation = "|".join(LOAN_UNIT_TYPES_RUS7)
    loan_program_debt = multi_index_stack(
        owed[loan_cols],
        idx_ish=id_cols,
        data_cols=LOAN_UNIT_TYPES_RUS7,
        pattern=rf"^({status_alternation})_({unit_alternation})$",
        match_names=["loan_status", "data_cols"],
        unstack_level=["loan_status"],
    )

    owed_output = Output(
        output_name="_core_rus7__yearly_owed_by_customers",
        value=consumer_debt,
    )
    loans_output = Output(
        output_name="_core_rus7__yearly_customer_energy_efficiency_and_conservation_loans",
        value=loan_program_debt,
    )
    return owed_output, loans_output
@asset
def _core_rus7__yearly_service_interruptions(
    raw_rus7__service_interruptions: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the service_interruptions table.

    Columns named ``<cause>_<observation_period>_saidi_minutes`` are stacked.
    Each row is one interruption cause and observation period, and the value
    lives in ``saidi_minutes``. ``is_total`` is true where the cause is
    ``total``.
    """
    interruptions = rus.early_transform(raw_df=raw_rus7__service_interruptions)
    rus.early_check_pk(interruptions)

    cause_alternation = "|".join(SERVICE_INTERRUPTION_TYPES_RUS7)
    period_alternation = "|".join(SERVICE_INTERRUPTION_PERIODS_RUS7)
    stacked = multi_index_stack(
        interruptions,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols="saidi_minutes",
        pattern=rf"^({cause_alternation})_({period_alternation})_(saidi_minutes)$",
        match_names=["service_interruption_cause", "observation_period", "data_cols"],
        unstack_level=["service_interruption_cause", "observation_period"],
    )
    return stacked.assign(is_total=stacked.service_interruption_cause == "total")
@multi_asset(
    outs={
        "_core_rus7__yearly_distribution_services": AssetOut(),
        "_core_rus7__yearly_transmission_and_distribution_mileage": AssetOut(),
    }
)
def _core_rus7__transmission_and_distribution(
    raw_rus7__transmission_and_distribution: pd.DataFrame,
):
    """Transform the transmission_and_distribution table.

    Splits the table into two outputs:

    * services: every column whose name contains ``services``, stacked on
      ``services_<service_status>``; and
    * mileage: all remaining columns, stacked on
      ``<line_type>_(length|energized)_miles``.

    Each output gets an ``is_total`` flag for its ``total`` category.
    """
    td = rus.early_transform(raw_df=raw_rus7__transmission_and_distribution)
    rus.early_check_pk(td)

    id_cols = ["borrower_id_rus", "borrower_name_rus", "report_date"]
    service_cols = [col for col in td.columns if "services" in col]
    services_wide = td[id_cols + service_cols]
    mileage_cols = [col for col in td.columns if col not in services_wide.columns]
    miles_wide = td[id_cols + mileage_cols]

    status_alternation = "|".join(SERVICE_STATUS_RUS7)
    services_long = multi_index_stack(
        services_wide,
        idx_ish=id_cols,
        data_cols="services",
        pattern=rf"^(services)_({status_alternation})$",
        match_names=["data_cols", "service_status"],
        unstack_level=["service_status"],
    )
    services_long = services_long.assign(
        is_total=services_long["service_status"] == "total"
    )

    line_alternation = "|".join(TRANSMISSION_DISTRIBUTION_TYPES_RUS7)
    miles_long = multi_index_stack(
        miles_wide,
        idx_ish=id_cols,
        data_cols="miles",
        pattern=rf"^({line_alternation})_(?:length|energized)_(miles)$",
        match_names=["line_type", "data_cols"],
        unstack_level=["line_type"],
    )
    miles_long = miles_long.assign(is_total=miles_long["line_type"] == "total")

    return (
        Output(
            output_name="_core_rus7__yearly_distribution_services",
            value=services_long,
        ),
        Output(
            output_name="_core_rus7__yearly_transmission_and_distribution_mileage",
            value=miles_long,
        ),
    )
@asset
def _core_rus7__yearly_long_term_leases(
    raw_rus7__long_term_leases: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the long term leases table.

    After the shared early transform, one known negative ``rental_cost_ytd``
    value is flipped to positive. That value is LA0015's 2013 "Tower
    Right-Of-Way" lease.
    """
    leases = rus.early_transform(raw_df=raw_rus7__long_term_leases)

    # Spot fix a negative rental value that should be positive, based on the
    # same property_type reported by the same borrower in other years. The
    # 2013 guard keeps integration tests that run the fast ETL (only the most
    # recent year of data) working.
    if 2013 in leases.report_date.dt.year.to_numpy():
        target = (
            (leases.borrower_id_rus == "LA0015")
            & (leases.report_date == "2013-12-01")
            & (leases.property_type == "Tower Right-Of-Way")
        )
        assert target.sum() == 1, (
            "Expected exactly one record to be affected by this spot fix."
        )
        leases.loc[target, "rental_cost_ytd"] = leases.loc[
            target, "rental_cost_ytd"
        ].abs()
    # TO-DO: there are some sus rows where rental cost is 0 or all categories
    # are NA. We could remove these?
    return leases
@asset
def _core_rus7__yearly_loans(
    raw_rus7__loan_guarantees: pd.DataFrame,
    raw_rus7__loans: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the raw_rus7__loans and raw_rus7__loan_guarantees tables.

    Both raw tables receive the same early transform. That transform fixes
    the ``is_for_rural_development`` boolean and simplifies
    ``loan_recipient``. Each table is then tagged with ``is_loan_guarantee``
    and the two are stacked into a single table. Two known-bad
    ``loan_maturity_date`` values are spot fixed before the column is parsed
    into datetimes.
    """
    df_loans = rus.early_transform(
        raw_df=raw_rus7__loans,
        boolean_columns_to_fix=["is_for_rural_development"],
        string_cols_to_simplify=["loan_recipient"],
    ).assign(is_loan_guarantee=False)
    df_loan_guarantees = rus.early_transform(
        raw_df=raw_rus7__loan_guarantees,
        boolean_columns_to_fix=["is_for_rural_development"],
        string_cols_to_simplify=["loan_recipient"],
    ).assign(is_loan_guarantee=True)
    # Combine raw tables. ignore_index=True gives the result a unique index.
    # Otherwise the two inputs may carry overlapping labels (e.g. both
    # 0..n-1), which makes label-based selection below fragile.
    df = pd.concat([df_loans, df_loan_guarantees], ignore_index=True)

    # Spot fix bad year in loan guarantees table for NC0050 loan from
    # kenansville fire dept. Was reported as 6202 and should be 2028 based on
    # the same loan from prior years.
    # Need 2020 conditional for integration tests that run the fast ETL with
    # only the most recent year of data.
    if 2020 in df.report_date.dt.year.to_numpy():
        mask1 = (
            (df.borrower_id_rus == "NC0050")
            # Literal substring match; missing recipients never match.
            & (df.loan_recipient.str.contains("kenansville", regex=False, na=False))
            & (df.report_date.dt.year == 2020)
        )
        assert len(df[mask1]) == 1, (
            "Expected exactly one record to be affected by this spot fix."
        )
        df.loc[mask1, "loan_maturity_date"] = "1/19/2028 12:00:00 AM"

    # Spot fix bad year in loan table for ND0051 loan from erc - paulson, david.
    # Was reported as 2/8/2820 12:00:00 AM but because it was reported in 2006
    # there is no prior year to compare it to and there are no other matching
    # loans in future years because it was paid. Just NA for now.
    # Need 2006 conditional for integration tests that run the fast ETL with
    # only the most recent year of data.
    if 2006 in df.report_date.dt.year.to_numpy():
        mask2 = (
            (df.borrower_id_rus == "ND0051")
            # Literal substring match (the name contains "-" and ","); missing
            # recipients never match.
            & (
                df.loan_recipient.str.contains(
                    "erc - paulson, david", regex=False, na=False
                )
            )
            & (df.report_date.dt.year == 2006)
            & (df.loan_original_amount == 5000)
        )
        assert len(df[mask2]) == 1, (
            "Expected exactly one record to be affected by this spot fix."
        )
        df.loc[mask2, "loan_maturity_date"] = pd.NaT

    # Convert all loan_maturity_dates to datetime
    df.loan_maturity_date = pd.to_datetime(df.loan_maturity_date)
    # TO-DO: could standardize loan_recipient names
    # TO-DO: there are some validation cases where loan balance exceeds the
    # original amount.
    return df
@asset
def _core_rus7__yearly_external_financial_risk_ratio(
    raw_rus7__external_financial_risk_ratio: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the raw_rus7__external_financial_risk_ratio table.

    Only the shared RUS early transform is applied; no primary-key check is
    run on this table.
    """
    df = rus.early_transform(raw_df=raw_rus7__external_financial_risk_ratio)
    # NOTE(review): the column was previously assigned to itself, which had no
    # effect and has been dropped. If a conversion of
    # ``external_financial_risk_ratio`` was intended, it still needs writing.
    return df
@asset
def _core_rus7__yearly_energy_purchased(
    raw_rus7__energy_purchased: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the raw_rus7__energy_purchased table.

    The steps are:

    * apply the shared RUS early transform (fixing the
      ``is_supplier_eia_respondent`` boolean);
    * convert kWh columns to MWh and ``cents_per_mwh`` columns to
      ``dollars_per_mwh``;
    * spot fix known-bad ``fuel_type`` and ``fuel_type_code_rus`` values;
    * cast ``fuel_type_code_rus`` to a nullable integer.
    """
    purchased = rus.early_transform(
        raw_df=raw_rus7__energy_purchased,
        boolean_columns_to_fix=["is_supplier_eia_respondent"],
    )

    # Convert units: kWh -> MWh first, then cents/MWh -> dollars/MWh.
    # NOTE(review): if a raw price column is named ``*_cents_per_kwh``, the
    # first call may scale it by 0.001 and rename it to ``*_cents_per_mwh``
    # before the second call. Confirm against rus.convert_units that price
    # columns end up with the intended scaling.
    purchased = rus.convert_units(
        purchased, old_unit="kwh", new_unit="mwh", converter=0.001
    )
    purchased = rus.convert_units(
        purchased,
        old_unit="cents_per_mwh",
        new_unit="dollars_per_mwh",
        converter=0.01,
    )

    # Spot fixes: a misspelled fuel type, fuel codes reported in the fuel_type
    # column, and supplier-name fragments reported in the fuel code column.
    fuel_type_fixes = {
        "Solar - photvoltaic": "Solar - photovoltaic",
        "12": "Solar - photovoltaic",
        "14": "Wind",
    }
    fuel_code_fixes = {
        " LLC - Commercial Solar": 12,
        " Iowa Windfarm": 14,
    }
    purchased["fuel_type"] = purchased["fuel_type"].replace(fuel_type_fixes)
    purchased["fuel_type_code_rus"] = pd.to_numeric(
        purchased["fuel_type_code_rus"].replace(fuel_code_fixes)
    ).astype("Int64")

    # TO-DO: it looks like the supplier_code_rus, is_supplier_eia_respondent,
    # and utility_name_eia fields could be turned into their own table. I
    # investigated a bit and it's not a perfect 1:1 mapping, but it's close.
    # Could turn this function into a multi-asset with an scd table...
    return purchased
@asset
def _core_rus7__yearly_materials_and_supplies(
    raw_rus7__materials_and_supplies: pd.DataFrame,
) -> pd.DataFrame:
    """Transform the materials and supplies table.

    Columns named ``<electric_materials|other_materials>_<measure>`` are
    stacked so each row is one materials category. Each measure column is
    then prefixed with ``materials_``.
    """
    supplies = rus.early_transform(raw_df=raw_rus7__materials_and_supplies)

    measures = [
        "adjustment",
        "ending_balance",
        "purchased",
        "salvaged",
        "sold",
        "used",
    ]
    category_alternation = "|".join(["electric_materials", "other_materials"])
    measure_alternation = "|".join(measures)
    stacked = multi_index_stack(
        supplies,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=measures,
        pattern=rf"^({category_alternation})_({measure_alternation})$",
        match_names=["electric_or_other_materials", "data_cols"],
        unstack_level=["electric_or_other_materials"],
    )
    return stacked.rename(columns={m: f"materials_{m}" for m in measures})
@asset
def _core_rus7__yearly_utility_plant_changes(
    raw_rus7__utility_plant_changes: pd.DataFrame,
):
    """Transform the utility plant changes table.

    Columns named ``<utility_plant_group>_<utility_plant_item>_<measure>`` are
    stacked so each row is one plant group and item. The measures are
    retirements, additions, adjustments and transfers, and ending balance.
    ``is_total`` is true where the item is ``total``.
    """
    plant_changes = rus.early_transform(raw_df=raw_rus7__utility_plant_changes)

    measures = [
        "retirements",
        "additions",
        "adjustments_and_transfers",
        "ending_balance",
    ]
    group_alternation = "|".join(UTILITY_PLANT_GROUP_RUS7)
    item_alternation = "|".join(UTILITY_PLANT_ITEM_RUS7)
    measure_alternation = "|".join(measures)
    stacked = multi_index_stack(
        plant_changes,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=measures,
        pattern=rf"^({group_alternation})_({item_alternation})_({measure_alternation})$",
        match_names=["utility_plant_group", "utility_plant_item", "data_cols"],
        unstack_level=["utility_plant_group", "utility_plant_item"],
    )
    return stacked.assign(is_total=stacked.utility_plant_item == "total")
######################################
# HARVESTING aka NORMALIZATION
######################################
# The USDA would be proud of this name

# Private ``_core_`` asset names: each entry of HARVESTED_CORE_TABLES_RUS7
# with a leading underscore added.
_CORE_RUS7_TABLES = ["_" + table for table in HARVESTED_CORE_TABLES_RUS7]
@multi_asset(
    ins={
        table_name: AssetIn()
        for table_name in ["_core_rus7__scd_borrowers"] + _CORE_RUS7_TABLES
    },
    outs={
        "core_rus7__entity_borrowers": AssetOut(io_manager_key="pudl_io_manager"),
        "_core_rus7__forensics_entity_resolution_borrowers": AssetOut(
            io_manager_key="pudl_io_manager"
        ),
    },
)
def core_rus7__entity_borrowers(context, **clean_dfs):
    """Harvest IDs & consistent static attributes for the RUS7 borrower entity.

    Args:
        context: Dagster execution context (not used directly).
        **clean_dfs: The ``_core_rus7__scd_borrowers`` table and every table
            in ``_CORE_RUS7_TABLES``, keyed by asset name.

    Returns:
        A tuple of two tables. The first is the harvested borrower entity
        table. The second is a forensics changelog, keyed on
        ``borrower_id_rus``. It is built from the per-column debug tables
        that harvesting produces.
    """
    entity = rus.RusEntity.BORROWERS
    logger.info("Harvesting IDs & consistent static attributes for RUS Borrowers")
    # We want **all** borrowers to have non-null names in this entity table.
    # The names aren't always consistent over time, but we have vetted them
    # (see https://github.com/catalyst-cooperative/pudl/pull/5056#issuecomment-4008247047).
    # We therefore set the consistency strictness threshold to 0% (instead of
    # the default 70%), so that we keep the most consistent value no matter
    # what.
    special_case_strictness = {"borrower_name_rus": 0}
    # Harvesting always produces both an entity (aka static) table and an
    # annual (aka scd) table; we only need the entity table here. With
    # debug=True it also returns a dictionary of dataframes, one per harvested
    # value column, which is handy for debugging.
    entity_df, _annual_df, col_dfs = harvest_entity_tables(
        entity,
        clean_dfs,
        special_case_strictness=special_case_strictness,
        debug=True,
    )
    harvested_cols = pd.concat(list(col_dfs.values()), axis="index").reset_index(
        drop=True
    )
    forensics = make_changelog(harvested_cols, ["borrower_id_rus"])
    return entity_df, forensics
# One finished asset per private ``_core_`` RUS7 table. Each public table name
# is the private name with its leading underscore stripped, and every asset
# uses the ``pudl_io_manager`` IO manager.
finished_rus_assets = [
    rus.finished_rus_asset_factory(
        table_name=core_name.removeprefix("_"),
        _core_table_name=core_name,
        io_manager_key="pudl_io_manager",
    )
    for core_name in _CORE_RUS7_TABLES
]