"""Transform the RUS7 tables."""
import pandas as pd
from dagster import AssetIn, AssetOut, Output, asset, multi_asset
import pudl.transform.rus as rus
from pudl import logging_helpers
from pudl.helpers import make_changelog, multi_index_stack
from pudl.metadata.enums import (
LOAN_STATUS_TYPES_RUS7,
LOAN_UNIT_TYPES_RUS7,
SERVICE_INTERRUPTION_PERIODS_RUS7,
SERVICE_INTERRUPTION_TYPES_RUS7,
SERVICE_STATUS_RUS7,
TRANSMISSION_DISTRIBUTION_TYPES_RUS7,
UTILITY_PLANT_GROUP_RUS7,
UTILITY_PLANT_ITEM_RUS7,
)
from pudl.metadata.resources.rus12 import HARVESTED_CORE_TABLES_RUS7
from pudl.transform.eia import harvest_entity_tables
[docs]
# Module-level logger, named after this module via ``__name__``.
logger = logging_helpers.get_logger(__name__)
@asset
def _core_rus7__yearly_meeting_and_board(raw_rus7__meeting_and_board):
    """Clean the raw meeting and board (aka governance) table.

    Runs ``rus.early_transform`` with two boolean columns flagged for fixing,
    checks the primary key, and parses ``last_annual_meeting_date`` into a
    datetime column.
    """
    boolean_cols = ["does_manager_have_written_contract", "was_quorum_present"]
    governance = rus.early_transform(
        raw_df=raw_rus7__meeting_and_board,
        boolean_columns_to_fix=boolean_cols,
    )
    rus.early_check_pk(governance)
    # format="mixed" makes pandas infer the date format value by value.
    governance["last_annual_meeting_date"] = pd.to_datetime(
        governance["last_annual_meeting_date"], format="mixed"
    )
    return governance
@asset
def _core_rus7__yearly_balance_sheet_assets(raw_rus7__balance_sheet):
    """Reshape the asset columns of the balance sheet into a long table.

    Every column whose name ends in ``_assets`` is melted into an
    ``asset_type`` / ``ending_balance`` pair. Rows whose asset type starts
    with ``total_`` are flagged in the boolean ``is_total`` column.
    """
    sheet = rus.early_transform(raw_df=raw_rus7__balance_sheet)
    rus.early_check_pk(sheet)

    # Wide -> long: one row per borrower, report date and asset type.
    asset_cols = sheet.filter(regex=r"_assets$").columns.tolist()
    long_assets = sheet.melt(
        id_vars=["report_date", "borrower_id_rus", "borrower_name_rus"],
        value_vars=asset_cols,
        var_name="asset_type",
        value_name="ending_balance",
    )

    # The suffix is redundant once the values live in the asset_type column.
    long_assets["asset_type"] = long_assets["asset_type"].str.removesuffix("_assets")
    long_assets["is_total"] = long_assets["asset_type"].str.startswith("total_")
    return long_assets
@asset
def _core_rus7__yearly_balance_sheet_liabilities(raw_rus7__balance_sheet):
    """Reshape the liability columns of the balance sheet into a long table.

    Every column whose name ends in ``_liabilities`` is melted into a
    ``liability_type`` / ``ending_balance`` pair. Rows whose liability type
    starts with ``total_`` are flagged in the boolean ``is_total`` column.
    """
    sheet = rus.early_transform(raw_df=raw_rus7__balance_sheet)
    rus.early_check_pk(sheet)

    key_cols = ["report_date", "borrower_id_rus", "borrower_name_rus"]
    liability_cols = sheet.filter(regex=r"_liabilities$").columns.tolist()

    # Wide -> long: one row per borrower, report date and liability type.
    long_liabilities = sheet.melt(
        id_vars=key_cols,
        value_vars=liability_cols,
        var_name="liability_type",
        value_name="ending_balance",
    )
    stripped_types = long_liabilities["liability_type"].str.removesuffix("_liabilities")
    long_liabilities["liability_type"] = stripped_types
    long_liabilities["is_total"] = stripped_types.str.startswith("total_")
    return long_liabilities
@asset
def _core_rus7__scd_borrowers(raw_rus7__borrowers):
    """Clean the raw borrowers table and derive a ``state`` column.

    The ``state`` value is the two capital letters that lead a
    ``borrower_id_rus`` made of two letters followed by four digits. IDs that
    do not match that shape get a null state.
    """
    borrowers = rus.early_transform(raw_df=raw_rus7__borrowers)
    rus.early_check_pk(borrowers)
    # TODO: encode region_code?
    state_from_id = borrowers.borrower_id_rus.str.extract(r"^([A-Z]{2})\d{4}$")
    return borrowers.assign(state=state_from_id)
@asset
def _core_rus7__yearly_employee_statistics(raw_rus7__employee_statistics):
    """Clean the raw employee statistics table.

    Only ``rus.early_transform`` and the primary key check are applied; the
    table is not reshaped.
    """
    employee_stats = rus.early_transform(raw_df=raw_rus7__employee_statistics)
    rus.early_check_pk(employee_stats)
    return employee_stats
@asset
def _core_rus7__yearly_energy_efficiency(raw_rus7__energy_efficiency):
    """Clean and reshape the raw energy efficiency table.

    Column names of the form ``<measure>_<customer_class>_<period>`` are split
    by ``multi_index_stack`` so that ``customer_class`` and
    ``observation_period`` are unstacked out of the column names.
    """
    efficiency = rus.early_transform(raw_df=raw_rus7__energy_efficiency)
    rus.early_check_pk(efficiency)

    measures = ["customers_num", "savings_mmbtu", "invested"]
    measure_alternation = "|".join(measures)
    column_pattern = (
        rf"^({measure_alternation})_(.+)_(cumulative|new_in_report_year)$"
    )
    return multi_index_stack(
        efficiency,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=measures,
        pattern=column_pattern,
        match_names=["data_cols", "customer_class", "observation_period"],
        unstack_level=["customer_class", "observation_period"],
    )
@asset
def _core_rus7__power_requirements(raw_rus7__power_requirements):
    """Early transform an internal power_requirements table.

    This input is used several times by the downstream
    ``core_rus7__yearly_power_requirements*`` assets. The raw asset needs some
    cleaning and the removal of a duplicate record, so that work is done once
    here.
    """
    cleaned = rus.early_transform(
        raw_df=raw_rus7__power_requirements,
        boolean_columns_to_fix=["is_peak_coincident"],
    ).reset_index(drop=True)

    # PK duplicate management
    pk_cols = ["report_date", "borrower_id_rus"]
    dupes = cleaned[cleaned.duplicated(subset=pk_cols, keep=False)]
    if not dupes.empty:
        # Visual inspection of the two duplicated records showed that one of
        # them is mostly null and that every value populated in both records
        # is identical, so the mostly-null record can be dropped.
        # First confirm the assumption: compare the two records on the columns
        # where neither is null (records become columns 0 and 1 after .T).
        shared_values = dupes.dropna(axis=1, how="any").reset_index(drop=True).T
        assert (shared_values[0] == shared_values[1]).all()
        # Then drop whichever duplicate carries the most nulls.
        null_counts = dupes.isna().sum(axis=1)
        more_null_loc = null_counts.sort_values(ascending=False).index[0]
        cleaned = cleaned.drop(more_null_loc, axis="index")

    rus.early_check_pk(cleaned)
    return cleaned
@asset
def _core_rus7__yearly_power_requirements_electric_sales(
    _core_rus7__power_requirements: pd.DataFrame,
) -> pd.DataFrame:
    """Build the electric sales slice of the power requirements table.

    This is the portion of the power requirements data that covers sales and
    revenue of electricity, reshaped so that ``customer_class`` is unstacked
    out of the column names.
    """
    sales_measures = ["sales_kwh", "revenue"]
    stacked_sales = multi_index_stack(
        _core_rus7__power_requirements,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=sales_measures,
        pattern=rf"^(.+)_({'|'.join(sales_measures)})$",
        match_names=["customer_class", "data_cols"],
        unstack_level=["customer_class"],
        expected_dropped_cols=27,
    )
    # Convert kWh to MWh with a 0.001 conversion factor.
    return rus.convert_units(
        stacked_sales,
        old_unit="kwh",
        new_unit="mwh",
        converter=0.001,
    )
@asset
def _core_rus7__yearly_power_requirements_electric_customers(
    _core_rus7__power_requirements: pd.DataFrame,
) -> pd.DataFrame:
    """Build the electric customers slice of the power requirements table.

    This is the portion of the power requirements data that covers the number
    of customers in each customer class, reshaped so that ``customer_class``
    and ``observation_period`` are unstacked out of the column names.
    """
    count_col = "customers_num"
    return multi_index_stack(
        _core_rus7__power_requirements,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=[count_col],
        pattern=rf"^(.+)_({count_col})_(december|avg)$",
        match_names=["customer_class", "data_cols", "observation_period"],
        unstack_level=["customer_class", "observation_period"],
        expected_dropped_cols=29,
    )
@asset
def _core_rus7__yearly_power_requirements(
    _core_rus7__power_requirements: pd.DataFrame,
) -> pd.DataFrame:
    """Build the non-reshaped portion of the power requirements table.

    This portion covers revenue from several parts of the borrower's business
    as well as several types of electricity generated, purchased or used.
    """
    # The electric sales portion of the source table is reshaped into two
    # separate tables and ends with two totals; the rest of the table pertains
    # to other utility functions. Those totals also appear in the reshaped
    # electric sales tables, but they are renamed and kept here too so that
    # every sector of power requirements is reported in one place.
    electric_total_names = {
        "total_revenue": "electric_sales_revenue",
        "total_sales_kwh": "electric_sales_kwh",
    }
    requirements = _core_rus7__power_requirements.rename(columns=electric_total_names)

    # Convert all of the units from kW* to MW*: energy first, then power.
    requirements = rus.convert_units(
        requirements,
        old_unit="kwh",
        new_unit="mwh",
        converter=0.001,
    )
    requirements = rus.convert_units(
        requirements,
        old_unit="kw",
        new_unit="mw",
        converter=0.001,
    )
    # No reshape is needed for this portion. Applying enforce_schema will
    # effectively drop all the other columns in this table.
    return requirements
@asset
def _core_rus7__yearly_investments(
    raw_rus7__investments: pd.DataFrame,
) -> pd.DataFrame:
    """Clean the raw investments table.

    Only ``rus.early_transform`` is applied, with ``for_rural_development``
    flagged as a boolean column to fix. This table has no primary key, so no
    key check is run.
    """
    return rus.early_transform(
        raw_df=raw_rus7__investments,
        boolean_columns_to_fix=["for_rural_development"],
    )
@asset
def _core_rus7__yearly_long_term_debt(
    raw_rus7__long_term_debt: pd.DataFrame,
) -> pd.DataFrame:
    """Clean the raw long term debt table.

    Only ``rus.early_transform`` is applied. This table has no primary key, so
    no key check is run.
    """
    return rus.early_transform(raw_df=raw_rus7__long_term_debt)
@asset
def _core_rus7__yearly_patronage_capital(
    raw_rus7__patronage_capital: pd.DataFrame,
) -> pd.DataFrame:
    """Clean and reshape the raw patronage capital table.

    The ``*_cumulative`` and ``*_report_year`` columns are each melted into a
    long table keyed by ``patronage_type``. The two long tables are then
    outer-merged so each patronage type has a ``patronage_cumulative`` and a
    ``patronage_report_year`` value. Types starting with ``total_`` are
    flagged in ``is_total``.
    """
    patronage = rus.early_transform(raw_df=raw_rus7__patronage_capital)
    rus.early_check_pk(patronage)

    key_cols = ["report_date", "borrower_id_rus", "borrower_name_rus"]

    def _long_by_period(wide: pd.DataFrame, period: str) -> pd.DataFrame:
        """Melt the columns ending in ``_<period>``, indexed by key + type."""
        suffix = f"_{period}"
        period_cols = wide.filter(regex=rf"_{period}$").columns.tolist()
        melted = wide.melt(
            id_vars=key_cols,
            value_vars=period_cols,
            var_name="patronage_type",
            value_name=f"patronage_{period}",
        )
        melted["patronage_type"] = melted["patronage_type"].str.removesuffix(suffix)
        return melted.set_index([*key_cols, "patronage_type"])

    cumulative = _long_by_period(patronage, "cumulative")
    report_year = _long_by_period(patronage, "report_year")
    combined = cumulative.merge(
        report_year,
        left_index=True,
        right_index=True,
        how="outer",
    ).reset_index()
    combined["is_total"] = combined["patronage_type"].str.startswith("total_")
    return combined
@asset
def _core_rus7__yearly_statement_of_operations(
    raw_rus7__statement_of_operations: pd.DataFrame,
) -> pd.DataFrame:
    """Clean and reshape the raw statement of operations table.

    Column names of the form ``<group>_<type>_<period>`` are split by
    ``multi_index_stack`` so that ``opex_group`` and ``opex_type`` are
    unstacked out of the column names. Types starting with ``total`` are
    flagged in ``is_total``.
    """
    operations = rus.early_transform(raw_df=raw_rus7__statement_of_operations)
    rus.early_check_pk(operations)

    statement_groups = [
        "cost_of_electric_service",
        "opex",
        "patronage_and_operating_margins",
        "patronage_capital_or_margins",
    ]
    periods = ["opex_ytd", "opex_ytd_budget", "opex_report_month"]
    group_alternation = "|".join(statement_groups)
    period_alternation = "|".join(periods)

    stacked = multi_index_stack(
        operations,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=periods,
        pattern=rf"^({group_alternation})_(.+)_({period_alternation})$",
        match_names=["opex_group", "opex_type", "data_cols"],
        unstack_level=["opex_group", "opex_type"],
    )
    stacked["is_total"] = stacked["opex_type"].str.startswith("total")
    return stacked
@multi_asset(
    outs={
        "_core_rus7__yearly_owed_by_customers": AssetOut(),
        "_core_rus7__yearly_customer_energy_efficiency_and_conservation_loans": AssetOut(),
    }
)
def _core_rus7__consumer_debt(raw_rus7__owed_by_customers: pd.DataFrame):
    """Split the raw owed-by-customers table into two outputs.

    The first output holds the general consumer debt columns. The second holds
    every remaining column, describing the status of energy efficiency and
    conservation loan program debts, reshaped so that ``loan_status`` is
    unstacked out of the column names.

    Returns:
        A tuple of two dagster ``Output`` objects: general debts first, loan
        program debts second.
    """
    debts = rus.early_transform(raw_df=raw_rus7__owed_by_customers)
    rus.early_check_pk(debts)

    # Split tables: two named amount columns go to the general debt table and
    # everything else (besides the ID columns) goes to the loan program table.
    id_cols = ["report_date", "borrower_id_rus", "borrower_name_rus"]
    general_debt_cols = [
        *id_cols,
        "amount_due_over_60_days",
        "amount_written_off_ytd",
    ]
    loan_program_cols = id_cols + [
        col for col in debts.columns if col not in general_debt_cols
    ]
    owed_by_customers = debts[general_debt_cols]

    status_alternation = "|".join(LOAN_STATUS_TYPES_RUS7)
    unit_alternation = "|".join(LOAN_UNIT_TYPES_RUS7)
    loan_program_debt = multi_index_stack(
        debts[loan_program_cols],
        idx_ish=id_cols,
        data_cols=LOAN_UNIT_TYPES_RUS7,
        pattern=rf"^({status_alternation})_({unit_alternation})$",
        match_names=["loan_status", "data_cols"],
        unstack_level=["loan_status"],
    )

    general_output = Output(
        output_name="_core_rus7__yearly_owed_by_customers",
        value=owed_by_customers,
    )
    loan_program_output = Output(
        output_name="_core_rus7__yearly_customer_energy_efficiency_and_conservation_loans",
        value=loan_program_debt,
    )
    return general_output, loan_program_output
@asset
def _core_rus7__yearly_service_interruptions(
    raw_rus7__service_interruptions: pd.DataFrame,
) -> pd.DataFrame:
    """Clean and reshape the raw service interruptions table.

    Column names of the form ``<cause>_<period>_saidi_minutes`` are split by
    ``multi_index_stack`` so that ``service_interruption_cause`` and
    ``observation_period`` are unstacked out of the column names. Rows whose
    cause is exactly ``total`` are flagged in ``is_total``.
    """
    interruptions = rus.early_transform(raw_df=raw_rus7__service_interruptions)
    rus.early_check_pk(interruptions)

    cause_alternation = "|".join(SERVICE_INTERRUPTION_TYPES_RUS7)
    period_alternation = "|".join(SERVICE_INTERRUPTION_PERIODS_RUS7)
    column_pattern = rf"^({cause_alternation})_({period_alternation})_(saidi_minutes)$"

    stacked = multi_index_stack(
        interruptions,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols="saidi_minutes",
        pattern=column_pattern,
        match_names=["service_interruption_cause", "observation_period", "data_cols"],
        unstack_level=["service_interruption_cause", "observation_period"],
    )
    stacked["is_total"] = stacked["service_interruption_cause"] == "total"
    return stacked
@multi_asset(
    outs={
        "_core_rus7__yearly_distribution_services": AssetOut(),
        "_core_rus7__yearly_transmission_and_distribution_mileage": AssetOut(),
    }
)
def _core_rus7__transmission_and_distribution(
    raw_rus7__transmission_and_distribution: pd.DataFrame,
):
    """Split the raw transmission and distribution table into two outputs.

    Columns whose name contains ``services`` form the services table; every
    other non-ID column forms the mileage table. Each table is reshaped with
    ``multi_index_stack`` and given an ``is_total`` flag.

    Returns:
        A tuple of two dagster ``Output`` objects: services first, mileage
        second.
    """
    t_and_d = rus.early_transform(raw_df=raw_rus7__transmission_and_distribution)
    rus.early_check_pk(t_and_d)

    # Split into services and miles tables
    id_cols = ["borrower_id_rus", "borrower_name_rus", "report_date"]
    service_cols = id_cols + [col for col in t_and_d.columns if "services" in col]
    services = t_and_d[service_cols]
    # The ID columns are in the services table, so the filter skips them and
    # they are added back explicitly at the front.
    mileage_cols = id_cols + [
        col for col in t_and_d.columns if col not in services.columns
    ]
    mileage = t_and_d[mileage_cols]

    # Stack the services table
    status_alternation = "|".join(SERVICE_STATUS_RUS7)
    services = multi_index_stack(
        services,
        idx_ish=id_cols,
        data_cols="services",
        pattern=rf"^(services)_({status_alternation})$",
        match_names=["data_cols", "service_status"],
        unstack_level=["service_status"],
    )
    services["is_total"] = services["service_status"] == "total"

    # Stack the mileage dataframe
    line_type_alternation = "|".join(TRANSMISSION_DISTRIBUTION_TYPES_RUS7)
    mileage = multi_index_stack(
        mileage,
        idx_ish=id_cols,
        data_cols="miles",
        pattern=rf"^({line_type_alternation})_(?:length|energized)_(miles)$",
        match_names=["line_type", "data_cols"],
        unstack_level=["line_type"],
    )
    mileage["is_total"] = mileage["line_type"] == "total"

    services_output = Output(
        output_name="_core_rus7__yearly_distribution_services",
        value=services,
    )
    mileage_output = Output(
        output_name="_core_rus7__yearly_transmission_and_distribution_mileage",
        value=mileage,
    )
    return services_output, mileage_output
@asset
def _core_rus7__yearly_long_term_leases(
    raw_rus7__long_term_leases: pd.DataFrame,
) -> pd.DataFrame:
    """Clean the raw long term leases table and spot fix one rental cost.

    Raises:
        AssertionError: If 2013 data is present and the spot fix does not
            match exactly one record.
    """
    leases = rus.early_transform(raw_df=raw_rus7__long_term_leases)

    # Spot fix a negative rental value that should be positive, judging by the
    # same property_type reported in other years by the same borrower.
    # The fix is conditional on 2013 being present because integration tests
    # run the fast ETL with only the most recent year of data.
    reported_years = leases.report_date.dt.year.to_numpy()
    if 2013 in reported_years:
        is_bad_lease = (
            (leases.borrower_id_rus == "LA0015")
            & (leases.report_date == "2013-12-01")
            & (leases.property_type == "Tower Right-Of-Way")
        )
        assert len(leases[is_bad_lease]) == 1, (
            "Expected exactly one record to be affected by this spot fix."
        )
        leases.loc[is_bad_lease, "rental_cost_ytd"] = leases.loc[
            is_bad_lease, "rental_cost_ytd"
        ].abs()
    # TO-DO: there are some sus rows where rental cost is 0 or all categories are NA.
    # We could remove these?
    return leases
@asset
def _core_rus7__yearly_loans(
    raw_rus7__loan_guarantees: pd.DataFrame,
    raw_rus7__loans: pd.DataFrame,
) -> pd.DataFrame:
    """Combine the raw loans and loan guarantees tables into one table.

    Both inputs get the same early transform and are told apart afterwards by
    the boolean ``is_loan_guarantee`` column. Two known-bad
    ``loan_maturity_date`` values are spot fixed before the column is
    converted to datetime.

    Raises:
        AssertionError: If the relevant report year is present and a spot fix
            does not match exactly one record.
    """
    loans = rus.early_transform(
        raw_df=raw_rus7__loans,
        boolean_columns_to_fix=["is_for_rural_development"],
        string_cols_to_simplify=["loan_recipient"],
    )
    loans = loans.assign(is_loan_guarantee=False)
    guarantees = rus.early_transform(
        raw_df=raw_rus7__loan_guarantees,
        boolean_columns_to_fix=["is_for_rural_development"],
        string_cols_to_simplify=["loan_recipient"],
    )
    guarantees = guarantees.assign(is_loan_guarantee=True)

    # Combine raw tables
    combined = pd.concat([loans, guarantees])

    # Both spot fixes are conditional on their report year being present
    # because integration tests run the fast ETL with only the most recent
    # year of data. Neither fix touches report_date, so the years are read once.
    report_years = combined.report_date.dt.year.to_numpy()

    # Spot fix a bad year in the loan guarantees table for the NC0050 loan
    # from kenansville fire dept. It was reported as 6202 and should be 2028
    # based on the same loan from prior years.
    if 2020 in report_years:
        is_kenansville_loan = (
            (combined.borrower_id_rus == "NC0050")
            & (combined.loan_recipient.str.contains("kenansville"))
            & (combined.report_date.dt.year == 2020)
        )
        assert len(combined[is_kenansville_loan]) == 1, (
            "Expected exactly one record to be affected by this spot fix."
        )
        combined.loc[is_kenansville_loan, "loan_maturity_date"] = (
            "1/19/2028 12:00:00 AM"
        )

    # Spot fix a bad year in the loan table for the ND0051 loan from
    # erc - paulson, david. It was reported as 2/8/2820 12:00:00 AM, but since
    # it was reported in 2006 there is no prior year to compare against, and
    # no matching loans exist in later years because it was paid. NA for now.
    if 2006 in report_years:
        is_paulson_loan = (
            (combined.borrower_id_rus == "ND0051")
            & (combined.loan_recipient.str.contains("erc - paulson, david"))
            & (combined.report_date.dt.year == 2006)
            & (combined.loan_original_amount == 5000)
        )
        assert len(combined[is_paulson_loan]) == 1, (
            "Expected exactly one record to be affected by this spot fix."
        )
        combined.loc[is_paulson_loan, "loan_maturity_date"] = pd.NaT

    # Convert all loan_maturity_dates to datetime
    combined["loan_maturity_date"] = pd.to_datetime(combined["loan_maturity_date"])
    # TO-DO: could standardize loan_recipient names
    # TO-DO: there are some validation cases where loan balance exceeds the original amount.
    return combined
@asset
def _core_rus7__yearly_external_financial_risk_ratio(
    raw_rus7__external_financial_risk_ratio: pd.DataFrame,
) -> pd.DataFrame:
    """Clean the raw external financial risk ratio table.

    Only ``rus.early_transform`` is applied; no primary key check is run.
    """
    risk_ratios = rus.early_transform(raw_df=raw_rus7__external_financial_risk_ratio)
    # NOTE(review): this assigns the column to itself, so it does not change
    # any values. A conversion may have been intended here -- confirm, then
    # either implement it or remove the line.
    ratio_col = "external_financial_risk_ratio"
    risk_ratios[ratio_col] = risk_ratios[ratio_col]
    return risk_ratios
@asset
def _core_rus7__yearly_energy_purchased(
    raw_rus7__energy_purchased: pd.DataFrame,
) -> pd.DataFrame:
    """Clean the raw energy purchased table.

    After the early transform this converts units, spot fixes a handful of
    mis-entered ``fuel_type`` and ``fuel_type_code_rus`` values, and casts the
    fuel type code to a nullable integer.
    """
    purchases = rus.early_transform(
        raw_df=raw_rus7__energy_purchased,
        boolean_columns_to_fix=["is_supplier_eia_respondent"],
    )

    # Convert units: kwh -> mwh first, then cents_per_mwh -> dollars_per_mwh.
    purchases = rus.convert_units(
        purchases,
        old_unit="kwh",
        new_unit="mwh",
        converter=0.001,
    )
    purchases = rus.convert_units(
        purchases,
        old_unit="cents_per_mwh",
        new_unit="dollars_per_mwh",
        converter=0.01,
    )

    # Spot fix fuel_types: one misspelling and two numeric strings that are
    # mapped to fuel type names.
    fuel_type_fixes = {
        "Solar - photvoltaic": "Solar - photovoltaic",
        "12": "Solar - photovoltaic",
        "14": "Wind",
    }
    purchases["fuel_type"] = purchases["fuel_type"].replace(fuel_type_fixes)

    # Spot fix fuel_type_codes: two text values that are mapped to numeric
    # codes.
    fuel_code_fixes = {
        " LLC - Commercial Solar": 12,
        " Iowa Windfarm": 14,
    }
    fixed_codes = purchases["fuel_type_code_rus"].replace(fuel_code_fixes)
    purchases["fuel_type_code_rus"] = fixed_codes
    purchases["fuel_type_code_rus"] = pd.to_numeric(
        purchases["fuel_type_code_rus"]
    ).astype("Int64")
    # TO-DO: it looks like the supplier_code_rus, is_supplier_eia_respondent, and
    # utility_name_eia fields could be turned into their own table. I investigated
    # a bit and it's not a perfect 1:1 mapping, but it's close. Could turn this
    # function into a multi-asset with an scd table...
    return purchases
@asset
def _core_rus7__yearly_materials_and_supplies(
    raw_rus7__materials_and_supplies: pd.DataFrame,
) -> pd.DataFrame:
    """Clean and reshape the raw materials and supplies table.

    Column names of the form ``<electric|other>_materials_<measure>`` are
    split by ``multi_index_stack`` so that ``electric_or_other_materials`` is
    unstacked out of the column names. The measure columns are then renamed
    with a ``materials_`` prefix.
    """
    materials = rus.early_transform(raw_df=raw_rus7__materials_and_supplies)

    measures = [
        "adjustment",
        "ending_balance",
        "purchased",
        "salvaged",
        "sold",
        "used",
    ]
    material_kinds = ["electric_materials", "other_materials"]
    kind_alternation = "|".join(material_kinds)
    measure_alternation = "|".join(measures)

    stacked = multi_index_stack(
        materials,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=measures,
        pattern=rf"^({kind_alternation})_({measure_alternation})$",
        match_names=["electric_or_other_materials", "data_cols"],
        unstack_level=["electric_or_other_materials"],
    )
    prefixed_names = {measure: f"materials_{measure}" for measure in measures}
    return stacked.rename(columns=prefixed_names)
@asset
def _core_rus7__yearly_utility_plant_changes(
    raw_rus7__utility_plant_changes: pd.DataFrame,
):
    """Clean and reshape the raw utility plant changes table.

    Column names of the form ``<group>_<item>_<measure>`` are split by
    ``multi_index_stack`` so that ``utility_plant_group`` and
    ``utility_plant_item`` are unstacked out of the column names. Rows whose
    item is exactly ``total`` are flagged in ``is_total``.
    """
    plant_changes = rus.early_transform(raw_df=raw_rus7__utility_plant_changes)

    measures = [
        "retirements",
        "additions",
        "adjustments_and_transfers",
        "ending_balance",
    ]
    group_alternation = "|".join(UTILITY_PLANT_GROUP_RUS7)
    item_alternation = "|".join(UTILITY_PLANT_ITEM_RUS7)
    measure_alternation = "|".join(measures)

    stacked = multi_index_stack(
        plant_changes,
        idx_ish=["report_date", "borrower_id_rus", "borrower_name_rus"],
        data_cols=measures,
        pattern=rf"^({group_alternation})_({item_alternation})_({measure_alternation})$",
        match_names=["utility_plant_group", "utility_plant_item", "data_cols"],
        unstack_level=["utility_plant_group", "utility_plant_item"],
    )
    stacked["is_total"] = stacked["utility_plant_item"] == "total"
    return stacked
######################################
# HARVESTING aka NORMALIZATION
######################################
# The USDA would be proud of this name
[docs]
# Names of the underscore-prefixed ``_core_rus7__*`` assets: every entry of
# HARVESTED_CORE_TABLES_RUS7 with a leading "_" added.
_CORE_RUS7_TABLES = [f"_{t}" for t in HARVESTED_CORE_TABLES_RUS7]
@multi_asset(
    ins={
        table_name: AssetIn()
        for table_name in ["_core_rus7__scd_borrowers", *_CORE_RUS7_TABLES]
    },
    outs={
        "core_rus7__entity_borrowers": AssetOut(io_manager_key="pudl_io_manager"),
        "_core_rus7__forensics_entity_resolution_borrowers": AssetOut(
            io_manager_key="pudl_io_manager"
        ),
    },
)
def core_rus7__entity_borrowers(context, **clean_dfs):
    """Harvest IDs & consistent static attributes for the RUS7 borrower entity.

    Args:
        context: Dagster execution context (not used in the body).
        clean_dfs: The cleaned ``_core_rus7__*`` dataframes, keyed by asset
            name, that are passed on to ``harvest_entity_tables``.

    Returns:
        A tuple of the borrower entity dataframe and a forensics changelog
        built from the per-column harvesting dataframes.
    """
    logger.info("Harvesting IDs & consistent static attributes for RUS Borrowers")
    # We want **all** borrowers to have non-null names in this entity table.
    # Names aren't always super consistent over time, but we have vetted them
    # (see https://github.com/catalyst-cooperative/pudl/pull/5056#issuecomment-4008247047)
    # and thus decided to set the threshold for consistency strictness at 0%
    # (instead of the default 70%) so we get the most consistent value no
    # matter what.
    name_strictness = {"borrower_name_rus": 0}

    # We only need the entity table, but the harvesting process always
    # produces entity (aka static) as well as annual (aka scd) tables, plus a
    # helpful-for-debugging dictionary of dfs for all value columns we are
    # harvesting.
    entity_df, _annual_df, harvested_col_dfs = harvest_entity_tables(
        rus.RusEntity.BORROWERS,
        clean_dfs,
        special_case_strictness=name_strictness,
        debug=True,
    )

    # Stack the per-column debugging frames and summarize them as a changelog
    # keyed on the borrower ID.
    all_harvested = pd.concat(
        list(harvested_col_dfs.values()), axis="index"
    ).reset_index(drop=True)
    forensics = make_changelog(all_harvested, ["borrower_id_rus"])
    return entity_df, forensics
[docs]
# One asset per entry in _CORE_RUS7_TABLES, built by
# rus.finished_rus_asset_factory. The output table name is the input name with
# its leading "_" removed, and each asset uses the "pudl_io_manager" IO manager.
finished_rus_assets = [
rus.finished_rus_asset_factory(
table_name=_core_table_name.removeprefix("_"),
_core_table_name=_core_table_name,
io_manager_key="pudl_io_manager",
)
for _core_table_name in _CORE_RUS7_TABLES
]