pudl.analysis.timeseries_cleaning#

Screen timeseries for anomalies and impute missing and anomalous values.

For a narrative discussion of these methods aimed at data users, see Timeseries Imputation.

The screening methods were originally designed to identify unrealistic data in the electricity demand timeseries reported in EIA Form 930 – Hourly and Daily Balancing Authority Operations Report, and we have also applied them to demand data from FERC Form 714 – Annual Electric Balancing Authority Area and Planning Area Report.

Screening methods are adapted from code written and maintained by:

And described at:

The imputation methods were designed for multivariate time series forecasting. They are adapted from code published by Xinyu Chen and described at:

Attributes#

logger

STANDARD_UTC_OFFSETS

Hour offset from Coordinated Universal Time (UTC) by time zone.

Classes#

UTCTimeseriesDataFrame

Define schema of input tables for timeseries cleaning.

AlignedTimeseriesDataFrame

Define schema of input tables for timeseries cleaning.

TimeseriesMatrix

Define schema for timeseries matrix used during imputation.

FlaggedTimeseries

Container class used to flag values in a timeseries matrix for imputation.

SimulateFlagsSettings

Define settings used to simulate flagged values for scoring imputation.

SimulationDataFrame

Collection of months of data which will be used to simulate flagged values.

ImputeTimeseriesSettings

Define settings used for timeseries imputation.

Functions#

_shift_utc(→ pandas.Series)

Shift utc by UTC offset.

utc_dataframe_to_aligned(...)

Return DataFrame with datetime_utc shifted by offset to align timeseries.

pivot_aligned_timeseries_dataframe(...)

Pivot aligned timeseries dataframe into timeseries matrix and pad if needed.

melt_imputed_timeseries_matrix(...)

Melt imputed timeseries matrix and flag matrix to time-aligned dataframe.

slice_axis(→ tuple[slice, Ellipsis])

Return an index that slices an array along an axis.

array_diff(→ numpy.ndarray)

First discrete difference of array elements.

encode_run_length(→ tuple[numpy.ndarray, numpy.ndarray])

Encode vector with run-length encoding.

insert_run_length(→ numpy.ndarray)

Insert run-length encoded values into a vector.

_mat2ten(→ numpy.ndarray)

Fold matrix into a tensor.

_ten2mat(→ numpy.ndarray)

Unfold tensor into a matrix.

_svt_tnn(→ numpy.ndarray)

Singular value thresholding (SVT) truncated nuclear norm (TNN) minimization.

impute_latc_tnn(→ numpy.ndarray)

Impute tensor values with LATC-TNN method by Chen and Sun (2020).

_tsvt(→ numpy.ndarray)

Tensor singular value thresholding (TSVT).

impute_latc_tubal(→ numpy.ndarray)

Impute tensor values with LATC-Tubal method by Chen, Chen and Sun (2020).

flag_null(→ FlaggedTimeseries)

Flag null values (MISSING_VALUE).

flag_negative_or_zero(→ FlaggedTimeseries)

Flag negative or zero values (NEGATIVE_OR_ZERO).

flag_identical_run(→ FlaggedTimeseries)

Flag the last values in identical runs (IDENTICAL_RUN).

flag_global_outlier(→ FlaggedTimeseries)

Flag values greater or less than n times the global median (GLOBAL_OUTLIER).

flag_global_outlier_neighbor(→ FlaggedTimeseries)

Flag values neighboring global outliers (GLOBAL_OUTLIER_NEIGHBOR).

rolling_median(→ numpy.ndarray)

Rolling median of values.

rolling_median_offset(→ numpy.ndarray)

Values minus the rolling median.

median_of_rolling_median_offset(→ numpy.ndarray)

Median of the offset from the rolling median.

rolling_iqr_of_rolling_median_offset(→ numpy.ndarray)

Rolling interquartile range (IQR) of rolling median offset.

median_prediction(→ numpy.ndarray)

Values predicted from local and regional rolling medians.

flag_local_outlier(→ FlaggedTimeseries)

Flag local outliers (LOCAL_OUTLIER_HIGH, LOCAL_OUTLIER_LOW).

diff(→ numpy.ndarray)

Values minus the value of their neighbor.

rolling_iqr_of_diff(→ numpy.ndarray)

Rolling interquartile range (IQR) of difference between neighboring values.

flag_double_delta(→ FlaggedTimeseries)

Flag values very different from neighbors on either side (DOUBLE_DELTA).

relative_median_prediction(→ numpy.ndarray)

Values divided by their value predicted from medians.

iqr_of_diff_of_relative_median_prediction(→ numpy.ndarray)

Interquartile range of running difference of relative median prediction.

_find_single_delta(→ numpy.ndarray)

flag_single_delta(→ FlaggedTimeseries)

Flag values very different from the nearest unflagged value (SINGLE_DELTA).

flag_anomalous_region(→ FlaggedTimeseries)

Flag values surrounded by flagged values (ANOMALOUS_REGION).

flag_bad_years(→ FlaggedTimeseries)

Flag entire years that are missing a large portion of values (BAD_YEAR).

flag_ruggles(...)

Flag values following the method of Ruggles and others (2020).

summarize_flags(→ pandas.DataFrame)

Summarize flagged values by flag, count and median.

simulate_nulls(→ numpy.ndarray)

Find non-null values to null to match a run-length distribution.

fold_tensor(→ numpy.ndarray)

Fold into a 3-dimensional tensor representation.

unfold_tensor(→ numpy.ndarray)

Unfold a 3-dimensional tensor representation.

impute(→ pandera.typing.DataFrame[TimeseriesMatrix])

Impute null values.

summarize_imputed(→ pandas.DataFrame)

Summarize the fit of imputed values to actual values.

impute_flagged_values(...)

Impute null values in input timeseries matrix.

_merge_imputed(→ pandas.DataFrame)

Helper function to melt imputed timeseries matrix and merge back on input asset.

_add_simulated_flag_col(...)

Return a modified imputed_df with a column indicating which rows should be flagged for simulation.

get_simulated_flag_mask(...)

Return a flag mask to flag values for simulated imputation.

impute_timeseries_asset_factory(→ pandas.DataFrame)

Produces assets to impute values for a given timeseries table/column.

Module Contents#

pudl.analysis.timeseries_cleaning.logger[source]#

pudl.analysis.timeseries_cleaning.STANDARD_UTC_OFFSETS: dict[str, str][source]#

Hour offset from Coordinated Universal Time (UTC) by time zone.

Time zones are canonical names (e.g. ‘America/Denver’) from tzdata (https://www.iana.org/time-zones), mapped to their standard-time UTC offset.

class pudl.analysis.timeseries_cleaning.UTCTimeseriesDataFrame[source]#

Bases: pandera.pandas.DataFrameModel

Define schema of input tables for timeseries cleaning.

This model defines the expected structure of an input dataframe to the timeseries imputation process. It will be immediately converted to an AlignedTimeseriesDataFrame, then pivoted to a TimeseriesMatrix.

id_col: pandera.typing.Series[Any][source]#

Entity ID column(s). Used to group timeseries by entity.

datetime_utc: pandera.typing.Series[pandera.pandas.dtypes.DateTime][source]#

Datetimes in UTC timezone.

timezone: pandera.typing.Series[str] | None[source]#

Local timezone of entity.

value_col: pandera.typing.Series[pandas.Float64Dtype][source]#

Column containing actual values to impute.

class pudl.analysis.timeseries_cleaning.AlignedTimeseriesDataFrame[source]#

Bases: pandera.pandas.DataFrameModel

Define schema of input tables for timeseries cleaning.

This model is nearly identical to a UTCTimeseriesDataFrame, but the datetime_utc values are aligned to “local” datetimes using a fixed UTC offset.

id_col: pandera.typing.Series[Any][source]#

Entity ID column(s). Used to group timeseries by entity.

datetime: pandera.typing.Series[pandera.pandas.dtypes.DateTime][source]#

Datetimes shifted by UTC offset to align all timeseries.

value_col: pandera.typing.Series[pandas.Float64Dtype][source]#

Column containing actual values to impute.

flags: pandera.typing.Series[str] | None[source]#

Column indicating why value was flagged for imputation.

class pudl.analysis.timeseries_cleaning.TimeseriesMatrix[source]#

Bases: pandera.pandas.DataFrameModel

Define schema for timeseries matrix used during imputation.

TimeseriesMatrix is the main type used during imputation. It is a dataframe with a datetime row index (e.g. ‘2006-01-01 00:00:00’, …, ‘2019-12-31 23:00:00’) in local time, ignoring daylight saving time, and an id_col column index (e.g. 101, …, 329). Since the columns are dynamically generated by pivoting an AlignedTimeseriesDataFrame, this model only explicitly defines the datetime index. The primary purpose of this type is to annotate methods in this module, so the expected inputs and outputs are immediately clear.

datetime: pandera.typing.Index[pandera.pandas.dtypes.DateTime][source]#

Index timeseries matrix by datetime.

pudl.analysis.timeseries_cleaning._shift_utc(utc: pandas.Series, utc_offset: pandas.Series) pandas.Series[source]#

Shift utc by UTC offset.

Parameters:
  • utc – UTC times (tz-naive datetime64[ns] or datetime64[ns, UTC]).

  • utc_offset – For each datetime in utc a corresponding offset in hours.

Returns:

Shifted datetimes (tz-naive datetime64[ns]).

Examples

>>> s = pd.Series([pd.Timestamp(2020, 1, 1), pd.Timestamp(2020, 1, 1)])
>>> _shift_utc(s, [-7, -6])
0   2019-12-31 17:00:00
1   2019-12-31 18:00:00
dtype: datetime64[ns]

pudl.analysis.timeseries_cleaning.utc_dataframe_to_aligned(input_df: pandera.typing.DataFrame[UTCTimeseriesDataFrame]) pandera.typing.DataFrame[AlignedTimeseriesDataFrame][source]#

Return DataFrame with datetime_utc shifted by offset to align timeseries.

pudl.analysis.timeseries_cleaning.pivot_aligned_timeseries_dataframe(aligned_df: pandera.typing.DataFrame[AlignedTimeseriesDataFrame], value_col: str = 'value_col') pandera.typing.DataFrame[TimeseriesMatrix][source]#

Pivot aligned timeseries dataframe into timeseries matrix and pad if needed.

Padding finds the complete list of hours from the start of the first day present in the timeseries to the end of the last, and then fills any missing hours with NULLs.
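
As a rough illustration of the padding step, the sketch below builds the complete hourly range for a single series with NumPy and fills unobserved hours with NaN. It does not call pivot_aligned_timeseries_dataframe(); the variable names and sample timestamps are hypothetical.

```python
import numpy as np

# Hypothetical observations: two hourly readings on consecutive days.
observed_hours = np.array(["2020-01-01T05", "2020-01-02T20"], dtype="datetime64[h]")
observed_values = np.array([410.0, 455.0])

# Complete hourly range from the start of the first day to the end of the last.
first_day = observed_hours.min().astype("datetime64[D]")
last_day = observed_hours.max().astype("datetime64[D]")
all_hours = np.arange(
    first_day.astype("datetime64[h]"),
    (last_day + np.timedelta64(1, "D")).astype("datetime64[h]"),
)

# Start with every hour missing, then fill in the hours that were observed.
padded = np.full(all_hours.shape, np.nan)
positions = (observed_hours - all_hours[0]).astype(int)
padded[positions] = observed_values
```

Here the two observations expand to 48 hourly slots, 46 of which are null.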

pudl.analysis.timeseries_cleaning.melt_imputed_timeseries_matrix(imputed_matrix: pandera.typing.DataFrame[TimeseriesMatrix], flag_matrix: pandera.typing.DataFrame[TimeseriesMatrix]) pandera.typing.DataFrame[AlignedTimeseriesDataFrame][source]#

Melt imputed timeseries matrix and flag matrix to time-aligned dataframe.

class pudl.analysis.timeseries_cleaning.FlaggedTimeseries[source]#

Container class used to flag values in a timeseries matrix for imputation.

x: numpy.ndarray[source]#
columns: pandas.Index[source]#
index: pandas.Index[source]#
flags: numpy.ndarray[source]#
uuid: str[source]#
__hash__()[source]#

Implement hash for lru_cache.

classmethod from_timeseries_matrix(matrix: pandas.DataFrame, flags: pandas.DataFrame | None = None) FlaggedTimeseries[source]#

Create a timeseries object from a dataframe.

to_dataframes() tuple[pandas.DataFrame, pandas.DataFrame][source]#

Convert back to a dataframe.

flag(mask: numpy.ndarray, flag: pudl.metadata.dfs.ImputationReasonCodes) FlaggedTimeseries[source]#

Flag values.

Flags values (if not already flagged) and nulls flagged values.

Parameters:
  • mask – Boolean mask of the values to flag.

  • flag – Flag name.
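
The behavior described above (a value keeps its first flag, and flagged values are nulled) can be sketched for a one-dimensional series as follows. This is a simplified stand-in, not the FlaggedTimeseries implementation; apply_flag and the sample data are hypothetical.

```python
import numpy as np

def apply_flag(x, flags, mask, name):
    """Flag unflagged values selected by mask and set them to NaN (1-D sketch)."""
    x = np.array(x, dtype=float)
    flags = list(flags)
    for i, selected in enumerate(mask):
        # Only values without an existing flag receive the new flag.
        if selected and flags[i] is None:
            flags[i] = name
            x[i] = np.nan
    return x, flags

values = [5.0, -1.0, 7.0]
flags = [None, None, None]
values, flags = apply_flag(values, flags, [False, True, False], "NEGATIVE_OR_ZERO")
# The second value is already flagged, so only the third receives the new flag.
values, flags = apply_flag(values, flags, [False, True, True], "GLOBAL_OUTLIER")
```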

pudl.analysis.timeseries_cleaning.slice_axis(x: numpy.ndarray, start: int = None, end: int = None, step: int = None, axis: int = 0) tuple[slice, Ellipsis][source]#

Return an index that slices an array along an axis.

Parameters:
  • x – Array to slice.

  • start – Start index of slice.

  • end – End index of slice.

  • step – Step size of slice.

  • axis – Axis along which to slice.

Returns:

Tuple of slice that slices array x along axis axis (x[…, start:end:step]).

Examples

>>> x = np.random.random((3, 4, 5))
>>> np.all(x[1:] == x[slice_axis(x, start=1, axis=0)])
np.True_
>>> np.all(x[:, 1:] == x[slice_axis(x, start=1, axis=1)])
np.True_
>>> np.all(x[:, :, 1:] == x[slice_axis(x, start=1, axis=2)])
np.True_

pudl.analysis.timeseries_cleaning.array_diff(x: numpy.ndarray, periods: int = 1, axis: int = 0, fill: Any = np.nan) numpy.ndarray[source]#

First discrete difference of array elements.

This is a fast numpy implementation of pd.DataFrame.diff().

Parameters:
  • periods – Periods to shift for calculating difference, accepts negative values.

  • axis – Array axis along which to calculate the difference.

  • fill – Value to use at the margins where a difference cannot be calculated.

Returns:

Array of same shape and type as x with discrete element differences.

Examples

>>> x = np.random.random((4, 2))
>>> np.all(array_diff(x, 1)[1:] == pd.DataFrame(x).diff(1).to_numpy()[1:])
np.True_
>>> np.all(array_diff(x, 2)[2:] == pd.DataFrame(x).diff(2).to_numpy()[2:])
np.True_
>>> np.all(array_diff(x, -1)[:-1] == pd.DataFrame(x).diff(-1).to_numpy()[:-1])
np.True_

pudl.analysis.timeseries_cleaning.encode_run_length(x: collections.abc.Sequence | numpy.ndarray) tuple[numpy.ndarray, numpy.ndarray][source]#

Encode vector with run-length encoding.

Parameters:

x – Vector to encode.

Returns:

Values and their run lengths.

Examples

>>> x = np.array([0, 1, 1, 0, 1])
>>> encode_run_length(x)
(array([0, 1, 0, 1]), array([1, 2, 1, 1]))
>>> encode_run_length(x.astype('bool'))
(array([False,  True, False,  True]), array([1, 2, 1, 1]))
>>> encode_run_length(x.astype('<U1'))
(array(['0', '1', '0', '1'], dtype='<U1'), array([1, 2, 1, 1]))
>>> encode_run_length(np.where(x == 0, np.nan, x))
(array([nan,  1., nan,  1.]), array([1, 2, 1, 1]))

pudl.analysis.timeseries_cleaning.insert_run_length(x: collections.abc.Sequence | numpy.ndarray, values: collections.abc.Sequence | numpy.ndarray, lengths: collections.abc.Sequence[int], mask: collections.abc.Sequence[bool] = None, padding: int = 0, intersect: bool = False) numpy.ndarray[source]#

Insert run-length encoded values into a vector.

Parameters:
  • x – Vector to insert values into.

  • values – Values to insert.

  • lengths – Length of run to insert for each value in values.

  • mask – Boolean mask, of the same length as x, where values can be inserted. By default, values can be inserted anywhere in x.

  • padding – Minimum space between inserted runs and, if mask is provided, the edges of masked-out areas.

  • intersect – Whether to allow inserted runs to intersect each other.

Raises:
  • ValueError – Padding must be zero or greater.

  • ValueError – Run length must be greater than zero.

  • ValueError – Could not find space for run of length {length}.

Returns:

Copy of array x with values inserted.

Example

>>> x = [0, 0, 0, 0]
>>> mask = [True, False, True, True]
>>> insert_run_length(x, values=[1, 2], lengths=[1, 2], mask=mask)
array([1, 0, 2, 2])

If we use unique values for the background and each inserted run, the run-length encoding of the result (ignoring the background) is the same as that of the inserted runs, albeit in a different order.

>>> x = np.zeros(10, dtype=int)
>>> values = [1, 2, 3]
>>> lengths = [1, 2, 3]
>>> x = insert_run_length(x, values=values, lengths=lengths)
>>> rvalues, rlengths = encode_run_length(x[x != 0])
>>> order = np.argsort(rvalues)
>>> all(rvalues[order] == values) and all(rlengths[order] == lengths)
True

Null values can be inserted into a vector such that the new null runs match the run length encoding of the existing null runs.

>>> x = [1, 2, np.nan, np.nan, 5, 6, 7, 8, np.nan]
>>> is_nan = np.isnan(x)
>>> rvalues, rlengths = encode_run_length(is_nan)
>>> xi = insert_run_length(
...     x,
...     values=[np.nan] * rvalues.sum(),
...     lengths=rlengths[rvalues],
...     mask=~is_nan
... )
>>> np.isnan(xi).sum() == 2 * is_nan.sum()
np.True_

The same as above, with non-zero padding, yields a unique solution:

>>> insert_run_length(
...     x,
...     values=[np.nan] * rvalues.sum(),
...     lengths=rlengths[rvalues],
...     mask=~is_nan,
...     padding=1
... )
array([nan,  2., nan, nan,  5., nan, nan,  8., nan])

pudl.analysis.timeseries_cleaning._mat2ten(matrix: numpy.ndarray, shape: numpy.ndarray, mode: int) numpy.ndarray[source]#

Fold matrix into a tensor.

pudl.analysis.timeseries_cleaning._ten2mat(tensor: numpy.ndarray, mode: int) numpy.ndarray[source]#

Unfold tensor into a matrix.

pudl.analysis.timeseries_cleaning._svt_tnn(matrix: numpy.ndarray, tau: float, theta: int) numpy.ndarray[source]#

Singular value thresholding (SVT) truncated nuclear norm (TNN) minimization.

pudl.analysis.timeseries_cleaning.impute_latc_tnn(tensor: numpy.ndarray, lags: collections.abc.Sequence[int] = [1], alpha: collections.abc.Sequence[float] = [1 / 3, 1 / 3, 1 / 3], rho0: float = 1e-07, lambda0: float = 2e-07, theta: int = 20, epsilon: float = 1e-07, maxiter: int = 300) numpy.ndarray[source]#

Impute tensor values with LATC-TNN method by Chen and Sun (2020).

Uses low-rank autoregressive tensor completion (LATC) with truncated nuclear norm (TNN) minimization.

Parameters:
  • tensor – Observational series in the form (series, groups, periods). Null values are replaced with zeros, so any zeros will be treated as null.

  • lags

  • alpha

  • rho0

  • lambda0

  • theta

  • epsilon – Convergence criterion. A smaller number will result in more iterations.

  • maxiter – Maximum number of iterations.

Returns:

Tensor with missing values in tensor replaced by imputed values.

pudl.analysis.timeseries_cleaning._tsvt(tensor: numpy.ndarray, phi: numpy.ndarray, tau: float) numpy.ndarray[source]#

Tensor singular value thresholding (TSVT).

pudl.analysis.timeseries_cleaning.impute_latc_tubal(tensor: numpy.ndarray, lags: collections.abc.Sequence[int] = [1], rho0: float = 1e-07, lambda0: float = 2e-07, epsilon: float = 1e-07, maxiter: int = 300) numpy.ndarray[source]#

Impute tensor values with LATC-Tubal method by Chen, Chen and Sun (2020).

Uses low-tubal-rank autoregressive tensor completion (LATC-Tubal). It is much faster than impute_latc_tnn() for very large datasets, with comparable accuracy.

Parameters:
  • tensor – Observational series in the form (series, groups, periods). Null values are replaced with zeros, so any zeros will be treated as null.

  • lags

  • rho0

  • lambda0

  • epsilon – Convergence criterion. A smaller number will result in more iterations.

  • maxiter – Maximum number of iterations.

Returns:

Tensor with missing values in tensor replaced by imputed values.

pudl.analysis.timeseries_cleaning.flag_null(ts: FlaggedTimeseries) FlaggedTimeseries[source]#

Flag null values (MISSING_VALUE).

pudl.analysis.timeseries_cleaning.flag_negative_or_zero(ts: FlaggedTimeseries) FlaggedTimeseries[source]#

Flag negative or zero values (NEGATIVE_OR_ZERO).

pudl.analysis.timeseries_cleaning.flag_identical_run(ts: FlaggedTimeseries, length: int = 3) FlaggedTimeseries[source]#

Flag the last values in identical runs (IDENTICAL_RUN).

Parameters:

length – Run length to flag. If 3, the third (and subsequent) identical values are flagged.

Raises:

ValueError – Run length must be 2 or greater.
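
To make the length parameter concrete, the following NumPy sketch marks the third and later values of each identical run in a single series. It is an independent illustration of the rule, not the module's code; identical_run_mask is a hypothetical helper.

```python
import numpy as np

def identical_run_mask(x, length=3):
    """Mark values that are at least the `length`-th member of an identical run."""
    if length < 2:
        raise ValueError("Run length must be 2 or greater")
    x = np.asarray(x, dtype=float)
    mask = np.ones(x.shape, dtype=bool)
    # A value qualifies if it equals each of the (length - 1) values before it.
    for n in range(1, length):
        same = np.zeros(x.shape, dtype=bool)
        same[n:] = x[n:] == x[:-n]
        mask &= same
    return mask

series = [5, 5, 5, 5, 7, 7, 8]
mask = identical_run_mask(series, length=3)
```

With length=3, only the third and fourth 5 are marked; the run of two 7s is too short.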

pudl.analysis.timeseries_cleaning.flag_global_outlier(ts: FlaggedTimeseries, medians: float = 9) FlaggedTimeseries[source]#

Flag values greater or less than n times the global median (GLOBAL_OUTLIER).

Parameters:

medians – Multiple of the global median by which a value must differ from that median to be flagged.
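
One plausible reading of this rule, sketched for a single series: a value is an outlier when its distance from the global median exceeds medians times that median. The exact comparison used by flag_global_outlier() is an assumption here, and global_outlier_mask is a hypothetical helper.

```python
import numpy as np

def global_outlier_mask(x, medians=9.0):
    """Mark values farther than `medians` times the median from the median."""
    x = np.asarray(x, dtype=float)
    median = np.nanmedian(x)
    # Assumed comparison: |value - median| > |median * medians|.
    return np.abs(x - median) > np.abs(median * medians)

demand = np.array([10.0, 11.0, 9.0, 10.0, 120.0, 10.0])
mask = global_outlier_mask(demand)
```

The median is 10, so only values more than 90 away from it (here, 120) are marked.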

pudl.analysis.timeseries_cleaning.flag_global_outlier_neighbor(ts: FlaggedTimeseries, neighbors: int = 1) FlaggedTimeseries[source]#

Flag values neighboring global outliers (GLOBAL_OUTLIER_NEIGHBOR).

Parameters:

neighbors – Number of neighbors to flag on either side of each outlier.

Raises:

ValueError – Global outliers must be flagged first.
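
The neighbors parameter can be pictured as widening an existing outlier mask by a fixed number of positions on each side. The sketch below does this for one series; neighbor_mask is a hypothetical helper and is not taken from the module.

```python
import numpy as np

def neighbor_mask(outliers, neighbors=1):
    """Mark positions within `neighbors` steps of an outlier, excluding outliers."""
    outliers = np.asarray(outliers, dtype=bool)
    near = np.zeros_like(outliers)
    for step in range(1, neighbors + 1):
        near[step:] |= outliers[:-step]   # positions after an outlier
        near[:-step] |= outliers[step:]   # positions before an outlier
    return near & ~outliers

outliers = [False, False, True, False, False]
mask = neighbor_mask(outliers, neighbors=1)
```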

pudl.analysis.timeseries_cleaning.rolling_median(ts: FlaggedTimeseries, window: int = 48) numpy.ndarray[source]#

Rolling median of values.

Parameters:

window – Number of values in the moving window.

pudl.analysis.timeseries_cleaning.rolling_median_offset(ts: FlaggedTimeseries, window: int = 48) numpy.ndarray[source]#

Values minus the rolling median.

Estimates the local cycle in cyclical data by removing long-term trends.

Parameters:

window – Number of values in the moving window.
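
A minimal sketch of the idea, assuming a centered window that shrinks at the edges (the windowing convention used by rolling_median() is not stated here, so treat it as an assumption). centered_rolling_median is a hypothetical helper.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def centered_rolling_median(x, window):
    """Rolling median over a centered window, ignoring NaN padding at the edges."""
    x = np.asarray(x, dtype=float)
    left = window // 2
    right = window - left - 1
    padded = np.pad(x, (left, right), constant_values=np.nan)
    return np.nanmedian(sliding_window_view(padded, window), axis=1)

x = np.array([1.0, 2.0, 3.0, 4.0, 100.0, 6.0, 7.0])
median = centered_rolling_median(x, window=3)
# Subtracting the rolling median removes the trend and exposes the spike.
offset = x - median
```

The spike at position 4 stands out with an offset of 94, while the trend itself contributes offsets near zero.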

pudl.analysis.timeseries_cleaning.median_of_rolling_median_offset(ts: FlaggedTimeseries, window: int = 48, shifts: collections.abc.Sequence[int] = range(-240, 241, 24)) numpy.ndarray[source]#

Median of the offset from the rolling median.

Calculated by shifting the rolling median offset (rolling_median_offset()) by different numbers of values, then taking the median at each position. Estimates the typical local cycle in cyclical data.

Parameters:
  • window – Number of values in the moving window for the rolling median.

  • shifts – Number of values to shift the rolling median offset by.
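
The shift-and-median step can be sketched as follows for one series. The example uses a toy cycle of length 4 instead of 24 hours, and median_of_shifts is a hypothetical helper, not the module's implementation.

```python
import numpy as np

def median_of_shifts(offset, shifts):
    """Median, at each position, of copies of `offset` shifted by each of `shifts`."""
    offset = np.asarray(offset, dtype=float)
    stacked = np.full((len(shifts), offset.size), np.nan)
    for row, shift in enumerate(shifts):
        if shift == 0:
            stacked[row] = offset
        elif shift > 0:
            stacked[row, shift:] = offset[:-shift]
        else:
            stacked[row, :shift] = offset[-shift:]
    # Positions shifted in from beyond the series stay NaN and are ignored.
    return np.nanmedian(stacked, axis=0)

# Toy cycle with period 4, with one anomalous value at position 9.
offset = np.tile([1.0, -1.0, 2.0, -2.0], 5)
offset[9] = 50.0
typical = median_of_shifts(offset, shifts=list(range(-8, 9, 4)))
```

Because the shifts are multiples of the cycle length, each position is compared with the same phase of neighboring cycles, so the anomaly at position 9 does not distort the typical cycle.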

pudl.analysis.timeseries_cleaning.rolling_iqr_of_rolling_median_offset(ts: FlaggedTimeseries, window: int = 48, iqr_window: int = 240) numpy.ndarray[source]#

Rolling interquartile range (IQR) of rolling median offset.

Estimates the spread of the local cycles in cyclical data.

Parameters:
  • window – Number of values in the moving window for the rolling median.

  • iqr_window – Number of values in the moving window for the rolling IQR.

pudl.analysis.timeseries_cleaning.median_prediction(ts: FlaggedTimeseries, window: int = 48, shifts: collections.abc.Sequence[int] = range(-240, 241, 24), long_window: int = 480) numpy.ndarray[source]#

Values predicted from local and regional rolling medians.

Calculated as { local median } + { median of local median offset } * { local median } / { regional median }.

Parameters:
  • window – Number of values in the moving window for the local rolling median.

  • shifts – Positions to shift the local rolling median offset by, for computing its median.

  • long_window – Number of values in the moving window for the regional (long) rolling median.
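
A worked instance of the formula above for a single value, using made-up numbers. median_prediction_value is a hypothetical helper that only restates the arithmetic.

```python
def median_prediction_value(local_median, median_offset, regional_median):
    """Local median plus the typical offset, scaled by local/regional median."""
    return local_median + median_offset * local_median / regional_median

# 100 + 8 * 100 / 80 = 110
predicted = median_prediction_value(
    local_median=100.0, median_offset=8.0, regional_median=80.0
)
```

Scaling the offset by the ratio of local to regional median enlarges the expected cycle when local demand is above its longer-term level, and shrinks it when below.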

pudl.analysis.timeseries_cleaning.flag_local_outlier(ts: FlaggedTimeseries, window: int = 48, shifts: collections.abc.Sequence[int] = range(-240, 241, 24), long_window: int = 480, iqr_window: int = 240, multiplier: tuple[float, float] = (3.5, 2.5)) FlaggedTimeseries[source]#

Flag local outliers (LOCAL_OUTLIER_HIGH, LOCAL_OUTLIER_LOW).

Flags values which are above or below the median_prediction() by more than a multiplier times the rolling_iqr_of_rolling_median_offset().

Parameters:
  • window – Number of values in the moving window for the local rolling median.

  • shifts – Positions to shift the local rolling median offset by, for computing its median.

  • long_window – Number of values in the moving window for the regional (long) rolling median.

  • iqr_window – Number of values in the moving window for the rolling interquartile range (IQR).

  • multiplier – Number of times the rolling_iqr_of_rolling_median_offset() the value must be above (HIGH) and below (LOW) the median_prediction() to be flagged.
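
The thresholds can be illustrated for a single value. This sketch assumes the first element of multiplier applies to the high side and the second to the low side, which is inferred from the order of the flag names and not confirmed here; classify_local_outlier is a hypothetical helper.

```python
def classify_local_outlier(value, prediction, iqr, multiplier=(3.5, 2.5)):
    """Return the flag a value would receive, or None if it is within bounds."""
    high, low = multiplier  # assumed order: (high, low)
    if value > prediction + high * iqr:
        return "LOCAL_OUTLIER_HIGH"
    if value < prediction - low * iqr:
        return "LOCAL_OUTLIER_LOW"
    return None

# With a prediction of 110 and an IQR of 10, the bounds are 85 and 145.
results = [classify_local_outlier(v, prediction=110.0, iqr=10.0) for v in (150.0, 90.0, 80.0)]
```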

pudl.analysis.timeseries_cleaning.diff(ts: FlaggedTimeseries, shift: int = 1) numpy.ndarray[source]#

Values minus the value of their neighbor.

Parameters:

shift – Positions to shift for calculating the difference. Positive values select a preceding (left) neighbor.

pudl.analysis.timeseries_cleaning.rolling_iqr_of_diff(ts: FlaggedTimeseries, shift: int = 1, window: int = 240) numpy.ndarray[source]#

Rolling interquartile range (IQR) of difference between neighboring values.

Parameters:
  • shift – Positions to shift for calculating the difference.

  • window – Number of values in the moving window for the rolling IQR.

pudl.analysis.timeseries_cleaning.flag_double_delta(ts: FlaggedTimeseries, iqr_window: int = 240, multiplier: float = 2) FlaggedTimeseries[source]#

Flag values very different from neighbors on either side (DOUBLE_DELTA).

Flags values whose differences from the neighbors on both sides each exceed a multiplier times the rolling interquartile range (IQR) of neighbor difference.

Parameters:
  • iqr_window – Number of values in the moving window for the rolling IQR of neighbor difference.

  • multiplier – Number of times the rolling IQR of neighbor difference the value’s difference to its neighbors must exceed for the value to be flagged.
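
The rule as worded above can be sketched for one series. For simplicity the sketch takes the IQR as a given constant instead of computing a rolling IQR, and it implements only the magnitude test stated in the text; double_delta_mask is a hypothetical helper.

```python
import numpy as np

def double_delta_mask(x, iqr, multiplier=2.0):
    """Mark values whose difference from both neighbors exceeds multiplier * iqr."""
    x = np.asarray(x, dtype=float)
    before = np.full(x.shape, np.nan)
    after = np.full(x.shape, np.nan)
    before[1:] = x[1:] - x[:-1]    # difference from the preceding value
    after[:-1] = x[:-1] - x[1:]    # difference from the following value
    threshold = multiplier * np.asarray(iqr, dtype=float)
    # Comparisons with NaN are False, so the first and last values are never marked.
    return (np.abs(before) > threshold) & (np.abs(after) > threshold)

spike = double_delta_mask([10.0, 11.0, 30.0, 12.0, 13.0], iqr=1.0)
step = double_delta_mask([10.0, 10.0, 30.0, 30.0], iqr=1.0)
```

An isolated spike is marked, while a step change is not, because each value at the step differs from only one of its neighbors.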

pudl.analysis.timeseries_cleaning.relative_median_prediction(ts: FlaggedTimeseries, **kwargs: Any) numpy.ndarray[source]#

Values divided by their value predicted from medians.

Parameters:

kwargs – Arguments to median_prediction().

pudl.analysis.timeseries_cleaning.iqr_of_diff_of_relative_median_prediction(ts: FlaggedTimeseries, shift: int = 1, **kwargs: Any) numpy.ndarray[source]#

Interquartile range of running difference of relative median prediction.

Parameters:
  • shift – Positions to shift for calculating the difference. Positive values select a preceding (left) neighbor.

  • kwargs – Arguments to relative_median_prediction().

pudl.analysis.timeseries_cleaning._find_single_delta(ts: FlaggedTimeseries, relative_median_prediction: numpy.ndarray, relative_median_prediction_long: numpy.ndarray, rolling_iqr_of_diff: numpy.ndarray, iqr_of_diff_of_relative_median_prediction: numpy.ndarray, reverse: bool = False) numpy.ndarray[source]#

pudl.analysis.timeseries_cleaning.flag_single_delta(ts: FlaggedTimeseries, window: int = 48, shifts: collections.abc.Sequence[int] = range(-240, 241, 24), long_window: int = 480, iqr_window: int = 240, multiplier: float = 5, rel_multiplier: float = 15) FlaggedTimeseries[source]#

Flag values very different from the nearest unflagged value (SINGLE_DELTA).

Flags values whose difference from the nearest unflagged value, with respect to value and relative median prediction, exceeds a multiplier times the corresponding interquartile range (IQR) of the difference: multiplier times rolling_iqr_of_diff() and rel_multiplier times iqr_of_diff_of_relative_median_prediction(), respectively.

Parameters:
  • window – Number of values in the moving window for the rolling median (for the relative median prediction).

  • shifts – Positions to shift the local rolling median offset by, for computing its median (for the relative median prediction).

  • long_window – Number of values in the moving window for the long rolling median (for the relative median prediction).

  • iqr_window – Number of values in the moving window for the rolling IQR of neighbor difference.

  • multiplier – Number of times the rolling IQR of neighbor difference the value’s difference to its neighbor must exceed for the value to be flagged.

  • rel_multiplier – Number of times the rolling IQR of relative median prediction the value’s prediction difference to its neighbor must exceed for the value to be flagged.

pudl.analysis.timeseries_cleaning.flag_anomalous_region(ts: FlaggedTimeseries, window: int = 48, threshold: float = 0.15) FlaggedTimeseries[source]#

Flag values surrounded by flagged values (ANOMALOUS_REGION).

Original null values are not considered flagged values.

Parameters:
  • window – Width of regions.

  • threshold – Fraction of flagged values required for a region to be flagged.

pudl.analysis.timeseries_cleaning.flag_bad_years(ts: FlaggedTimeseries, min_data: int = 100, min_data_fraction: float = 0.9) FlaggedTimeseries[source]#

Flag entire years that are missing a large portion of values (BAD_YEAR).

This method checks two separate thresholds to determine whether a year is “bad”. First, it finds the range from the first non-null hour to the last non-null hour for each respondent-year. If that total range is less than min_data, then the year is dropped. Next, it checks if the ratio of values within that range which are non-null is greater than min_data_fraction. If not, then the year will also be dropped. This ensures that if there is a section of the year that is mostly complete, even if the rest of the year is NULL, then it will still be included for imputation.

Parameters:
  • ts – Timeseries matrix as described in FlaggedTimeseries.

  • min_data – Minimum number of non-null hours in a year.

  • min_data_fraction – Minimum fraction of non-null hours between the first and last non-null hour in a year.
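
The two checks described above can be sketched for a single respondent-year as follows. This is a simplified illustration using NaN for missing hours; is_bad_year is a hypothetical helper and does not reproduce the module's implementation.

```python
import numpy as np

def is_bad_year(values, min_data=100, min_data_fraction=0.9):
    """Apply the span and completeness checks to one year of hourly values."""
    present = np.flatnonzero(~np.isnan(values))
    if present.size == 0:
        return True
    # Check 1: span from the first to the last non-null hour.
    span = present[-1] - present[0] + 1
    if span < min_data:
        return True
    # Check 2: share of non-null hours within that span.
    return not (present.size / span > min_data_fraction)

hours = 8760
late_start = np.full(hours, np.nan)
late_start[5000:] = 1.0   # complete once reporting begins
sparse = np.full(hours, np.nan)
sparse[::2] = 1.0         # every other hour missing
short = np.full(hours, np.nan)
short[:50] = 1.0          # only 50 hours reported
```

The year that starts late but is complete afterward is kept, while the sparse and very short years are treated as bad.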

pudl.analysis.timeseries_cleaning.flag_ruggles(timeseries_matrix: pandera.typing.DataFrame[TimeseriesMatrix], min_data: int = 100, min_data_fraction: float = 0.9) tuple[pandera.typing.DataFrame[TimeseriesMatrix], pandera.typing.DataFrame[TimeseriesMatrix]][source]#

Flag values following the method of Ruggles and others (2020).

Assumes values are hourly electricity demand.

Parameters:
  • timeseries_matrix – Aligned timeseries matrix for imputation.

  • min_data – Minimum number of non-null hours in a year.

  • min_data_fraction – Minimum fraction of non-null hours between the first and last non-null hour in a year.

Returns:

Two TimeseriesMatrix dataframes with the same shape. The first contains the input timeseries with flagged values nulled out in preparation for imputation. The second contains the flags themselves for reference.

pudl.analysis.timeseries_cleaning.summarize_flags(imputed_df: pandas.DataFrame, id_col: str, value_col: str, flag_col: str) pandas.DataFrame[source]#

Summarize flagged values by flag, count and median.

Parameters:

imputed_df – DataFrame

pudl.analysis.timeseries_cleaning.simulate_nulls(x: numpy.ndarray, lengths: collections.abc.Sequence[int] = None, padding: int = 1, intersect: bool = False, overlap: bool = False) numpy.ndarray[source]#

Find non-null values to null to match a run-length distribution.

Parameters:
  • x – Timeseries matrix as described in _prepare_timeseries_matrix() defined within impute_timeseries_asset_factory().

  • lengths – Lengths of null runs to simulate for each series. By default, uses the run lengths of null values in each series.

  • padding – Minimum number of non-null values between simulated null runs and between simulated and existing null runs.

  • intersect – Whether simulated null runs can intersect each other.

  • overlap – Whether simulated null runs can overlap existing null runs. If True, padding is ignored.

Returns:

Boolean mask of current non-null values to set to null.

Raises:

ValueError – Could not find space for run of length {length}.

Examples

>>> x = np.column_stack([[1, 2, np.nan, 4, 5, 6, 7, np.nan, np.nan]])
>>> simulate_nulls(x).ravel()
array([ True, False, False, False, True, True, False, False, False])
>>> simulate_nulls(x, lengths=[4], padding=0).ravel()
array([False, False, False, True, True, True, True, False, False])

pudl.analysis.timeseries_cleaning.fold_tensor(x: numpy.ndarray, periods: int = 24) numpy.ndarray[source]#

Fold into a 3-dimensional tensor representation.

Folds the series x (number of observations, number of series) into a 3-d tensor (number of series, number of groups, number of periods), splitting observations into groups of length periods. For example, each group may represent a day and each period the hour of the day.

Parameters:
  • x – Series array to fold.

  • periods – Number of consecutive values in each series to fold into a group.

Returns:

3-d tensor of shape (number of series, number of groups, number of periods).

Examples

>>> x = np.column_stack([[1, 2, 3, 4, 5, 6], [10, 20, 30, 40, 50, 60]])
>>> tensor = fold_tensor(x, periods=3)
>>> tensor[0]
array([[1, 2, 3],
       [4, 5, 6]])
>>> np.all(x == unfold_tensor(tensor, x.shape))
np.True_

pudl.analysis.timeseries_cleaning.unfold_tensor(tensor: numpy.ndarray, shape) numpy.ndarray[source]#

Unfold a 3-dimensional tensor representation.

Performs the reverse of fold_tensor().

pudl.analysis.timeseries_cleaning.impute(df: pandera.typing.DataFrame[TimeseriesMatrix], mask: numpy.ndarray = None, periods: int = 24, blocks: int = 1, method: str = 'tubal', **kwargs: Any) pandera.typing.DataFrame[TimeseriesMatrix][source]#

Impute null values.

Note

The imputation method requires that nulls be replaced by zeros, so the series cannot already contain zeros.

Parameters:
  • mask – Boolean mask of values to impute in addition to any null values in df.

  • periods – Number of consecutive values in each series to fold into a group. See fold_tensor(). Default of 24 is meant for hourly data with a diurnal periodicity.

  • blocks – Number of blocks into which to split the series for imputation. This has been found to reduce processing time for method=’tnn’.

  • method – Imputation method to use (‘tubal’: impute_latc_tubal(), ‘tnn’: impute_latc_tnn()).

  • kwargs – Optional arguments to method.

Returns:

Timeseries matrix of the same shape as df with all null values (and those selected by mask) replaced with imputed values.

Raises:

ValueError – Zero values present. Replace with very small value.

pudl.analysis.timeseries_cleaning.summarize_imputed(matrix: pandera.typing.DataFrame[TimeseriesMatrix], imputed_matrix: pandera.typing.DataFrame[TimeseriesMatrix], mask: numpy.ndarray) pandas.DataFrame[source]#

Summarize the fit of imputed values to actual values.

Summarizes the agreement between actual and imputed values with the following statistics:

  • mpe: Mean percent error, (actual - imputed) / actual.

  • mape: Mean absolute percent error, abs(mpe).

Parameters:
  • imputed_matrix – Matrix of same shape as matrix with imputed values. See impute().

  • mask – Boolean mask of imputed values that were not null in matrix. See simulate_nulls().

Returns:

Table of imputed value statistics for each series.
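
As a rough illustration of the two statistics, the sketch below computes them per series with pandas. It assumes that mape averages the absolute value of each percent error (the conventional definition) and that only masked positions are scored. The function name summarize_sketch and the sample data are hypothetical.

```python
import numpy as np
import pandas as pd


def summarize_sketch(
    actual: pd.DataFrame, imputed: pd.DataFrame, mask: np.ndarray
) -> pd.DataFrame:
    """Return the mpe and mape of imputed values for each column (series)."""
    # Percent error is only kept where the mask selects a simulated value.
    percent_error = ((actual - imputed) / actual).where(mask)
    return pd.DataFrame(
        {"mpe": percent_error.mean(), "mape": percent_error.abs().mean()}
    )


actual = pd.DataFrame({"a": [100.0, 100.0], "b": [200.0, 200.0]})
imputed = pd.DataFrame({"a": [90.0, 110.0], "b": [220.0, 200.0]})
mask = np.ones(actual.shape, dtype=bool)
summary = summarize_sketch(actual, imputed, mask)
```

Series "a" shows why both statistics are useful: its errors cancel out to an mpe of zero, while its mape of 0.1 still reports the typical size of the error.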

pudl.analysis.timeseries_cleaning.impute_flagged_values(df: pandera.typing.DataFrame[TimeseriesMatrix], years: list[int], method: dict[int, Literal['tubal', 'tnn']], periods: int = 24, blocks: int = 1) pandera.typing.DataFrame[TimeseriesMatrix][source]#

Impute null values in input timeseries matrix.

Imputation is performed separately for each year, with only the respondents reporting data in that year.

Note

The imputation is parallelized internally, and by default will use all available CPU cores. If you want to limit the number of cores used, you can set the OMP_NUM_THREADS environment variable to the desired number of threads.

Parameters:
  • df – Timeseries matrix as described in _prepare_timeseries_matrix() defined within impute_timeseries_asset_factory().

  • years – List of years to impute.

  • periods – Number of consecutive values in each series to fold into a group. See fold_tensor().

  • blocks – Number of blocks into which to split the series for imputation. This has been found to reduce processing time for the tnn method.

  • method – Maps each year to the appropriate imputation method. “tubal” uses impute_latc_tubal() and “tnn” uses impute_latc_tnn().

Returns:

Copy of df with imputed values.
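
A minimal sketch of preparing the inputs follows. The thread limit of 4 and the choice of years are illustrative assumptions. Setting the variable before NumPy is imported is a precaution, since threading libraries commonly read it when they are first loaded.

```python
import os

# Limit the threads used by the numerical libraries (illustrative value).
# Set this before importing NumPy or anything else that loads those libraries.
os.environ["OMP_NUM_THREADS"] = "4"

# Map every year to an imputation method, then override a single year.
years = [2019, 2020, 2021]
method = {year: "tubal" for year in years}
method[2020] = "tnn"

# The call itself would then look like this (df is a timeseries matrix):
# imputed = impute_flagged_values(df, years=years, method=method)
```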

class pudl.analysis.timeseries_cleaning.SimulateFlagsSettings[source]#

Define settings used to simulate flagged values for scoring imputation.

num_months: int = 30[source]#

The number of months of data to simulate.

min_flag_rate: float = 0.1[source]#

Minimum fraction of bad points in a section of data for it to be used as a reference.

max_flag_rate: float = 0.5[source]#

Maximum fraction of bad points in a section of data for it to be used as a reference.

output_io_manager_key: str = 'io_manager'[source]#

Specify io-manager for final simulated asset.

In some cases we use the parquet IO-manager so we can build notebooks/visualizations on simulated data.

mape_threshold: float = 0.05[source]#

Maximum allowable mean absolute percent error computed on simulated values. Will be checked in an asset check.

class pudl.analysis.timeseries_cleaning.SimulationDataFrame[source]#

Bases: pandera.pandas.DataFrameModel

Collection of months of data which will be used to simulate flagged values.

Each row in this dataframe identifies a pairing of two entity IDs and two months that can be used to evaluate the performance of the imputation. The “reference” is a month in which a high proportion of reported values were flagged for imputation, and the “simulation” is a month in which there were no values flagged for imputation. The pattern of flagged (null) values in the reference month will be used to mask the reported values found in the simulation month so they can be imputed, and then the imputed values will be compared to the originally reported data to evaluate the imputation’s performance.

reference_id_col: pandera.typing.Series[Any][source]#
reference_month: pandera.typing.Series[pandera.pandas.dtypes.DateTime][source]#
simulation_id_col: pandera.typing.Series[Any][source]#
simulation_month: pandera.typing.Series[pandera.pandas.dtypes.DateTime][source]#

pudl.analysis.timeseries_cleaning._merge_imputed(aligned_df: pandera.typing.DataFrame[AlignedTimeseriesDataFrame], matrix: pandera.typing.DataFrame[TimeseriesMatrix], flags: pandera.typing.DataFrame[TimeseriesMatrix]) pandas.DataFrame[source]#

Helper function to melt imputed timeseries matrix and merge back on input asset.

pudl.analysis.timeseries_cleaning._add_simulated_flag_col(imputed_df: pandera.typing.DataFrame[AlignedTimeseriesDataFrame], simulation_df: pandera.typing.DataFrame[SimulationDataFrame]) pandera.typing.DataFrame[AlignedTimeseriesDataFrame][source]#

Return a modified imputed_df with a column indicating which rows should be flagged for simulation.

This finds all flagged values in a reference month and applies the same flag pattern to a simulation month. The flag pattern is determined by calculating the hour of the month for each flagged value (the number of hours elapsed since the start of the month) and flagging the corresponding hour in the simulation month. Reference months are chosen by finding months with a relatively high rate of imputation, while simulation months have no values flagged for imputation.

Parameters:
  • imputed_df – Production DataFrame with imputed values, which is used to find sections with high rates of imputation.

  • simulation_df – DataFrame with reference and simulation months.

Returns:

DataFrame which contains all ID/datetime pairs that should be flagged for simulated imputation.
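
The hour-of-month transfer described above can be sketched with pandas. This is an illustration only: the function name transfer_flag_pattern is hypothetical, timestamps are assumed to be timezone-naive, and dropping hours that fall past the end of a shorter simulation month is an assumption rather than documented behavior.

```python
import pandas as pd


def transfer_flag_pattern(
    flagged_times: pd.DatetimeIndex,
    reference_month: pd.Timestamp,
    simulation_month: pd.Timestamp,
) -> pd.DatetimeIndex:
    """Map flagged hours in the reference month onto the simulation month."""
    # Hour of the month: whole hours elapsed since the reference month began.
    hour_of_month = (flagged_times - reference_month) // pd.Timedelta(hours=1)
    shifted = simulation_month + pd.to_timedelta(hour_of_month, unit="h")
    # Assumption: discard hours that overflow a shorter simulation month.
    next_month = simulation_month + pd.offsets.MonthBegin(1)
    return shifted[shifted < next_month]


flagged = pd.DatetimeIndex(["2020-01-01 05:00", "2020-01-31 23:00"])
result = transfer_flag_pattern(
    flagged, pd.Timestamp("2020-01-01"), pd.Timestamp("2020-02-01")
)
```

In this example the flag at hour 5 of January lands on hour 5 of February, while the flag on 31 January has no counterpart in February and is dropped.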

pudl.analysis.timeseries_cleaning.get_simulated_flag_mask(settings: SimulateFlagsSettings, imputed_df: pandera.typing.DataFrame[AlignedTimeseriesDataFrame], simulation_group: str) tuple[pandera.typing.DataFrame[TimeseriesMatrix], set[int]][source]#

Return a flag mask to flag values for simulated imputation.

Find months of data with a high rate of flagged values, and use these sections as a reference to flag values in otherwise good sections of data. This allows us to impute data in a realistic scenario where we have good reported data, which we can compare against to compute quantitative metrics that validate the quality of our imputation.

Parameters:
  • settings – Settings object, which contains all configurable settings for simulation.

  • imputed_df – Production DataFrame with imputed values, which is used to find sections with high rates of imputation.

  • simulation_group – Allows testing imputation performance on different groups of data like BA/subregion demand, which can be combined into a single imputation.

Returns:

Tuple of timeseries_matrix, and flag_matrix modified with simulation data.
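
One way to picture the month selection is to compute the fraction of flagged values per month and compare it with the min_flag_rate and max_flag_rate settings. The sketch below covers a single series. The function classify_months is hypothetical, and treating both bounds as inclusive is an assumption.

```python
import pandas as pd


def classify_months(
    flags: pd.Series, min_flag_rate: float = 0.1, max_flag_rate: float = 0.5
):
    """Split months into reference candidates and simulation candidates."""
    # Fraction of flagged values in each calendar month.
    flag_rate = flags.groupby(flags.index.to_period("M")).mean()
    in_range = (flag_rate >= min_flag_rate) & (flag_rate <= max_flag_rate)
    reference = list(flag_rate[in_range].index)
    # Simulation months must have no flagged values at all.
    simulation = list(flag_rate[flag_rate == 0].index)
    return reference, simulation


index = pd.date_range("2020-01-01", "2020-03-31 23:00", freq="h")
flags = pd.Series(False, index=index)
flags.loc["2020-01-01":"2020-01-07"] = True  # 7 of 31 days flagged
flags.loc["2020-03"] = True  # entirely flagged, above max_flag_rate
reference_months, simulation_months = classify_months(flags)
```

January qualifies as a reference month, February as a simulation month, and March as neither because too much of it is flagged.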

class pudl.analysis.timeseries_cleaning.ImputeTimeseriesSettings[source]#

Define settings used for timeseries imputation.

min_data_fraction: float = 0.7[source]#

Fraction of values in a year which must be non-null for imputation to be performed on that year.

min_data: int = 100[source]#

Minimum number of values which must be non-null for imputation to be performed on that year.

periods: int = 24[source]#

Number of consecutive values in each series to fold into a group.

See fold_tensor(). The default of 24 is meant for hourly data with a diurnal periodicity.

blocks: int = 1[source]#

Split timeseries matrix into equal sized blocks before running imputation.

method: Literal['tubal', 'tnn'] = 'tubal'[source]#

Imputation method to use.

method_overrides: dict[int, Literal['tubal', 'tnn']][source]#

Override stated imputation method for specific years.

simulate_flags_settings: SimulateFlagsSettings | None = None[source]#

Settings to simulate flagged values and score imputation.

Defaults to None which will not do any simulation/scoring.

pudl.analysis.timeseries_cleaning.impute_timeseries_asset_factory(input_asset_name: str, output_asset_name: str, years_from_context: collections.abc.Callable, id_col: str, value_col: str = 'demand_mwh', imputed_value_col: str = 'demand_imputed_mwh', reported_value_col: str = 'demand_reported_mwh', simulation_group_col: str | None = None, output_io_manager_key: str = 'parquet_io_manager', op_tags: dict[str, Any] | None = None, settings: ImputeTimeseriesSettings = ImputeTimeseriesSettings()) pandas.DataFrame[source]#

Produces assets to impute values for a given timeseries table/column.

This factory function produces a set of assets which perform timeseries imputation on one column in a specified table. This process is split into a series of assets to reduce peak memory usage by offloading intermediate products onto disk. The assets also correspond to the three steps that make up the timeseries imputation process:

  1. Convert datetime UTC to local datetimes and pivot dataframe to timeseries matrix

  2. Flag anomalous and missing values in timeseries

  3. Perform imputation and melt back to expected output table structure
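
The three steps above can be sketched on a toy table with pandas. This is only an illustration of the data flow: the column names respondent_id and utc_offset_hours are hypothetical, flagging is reduced to marking nulls, and a column mean stands in for the real imputation methods.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "respondent_id": ["A", "A", "B", "B"],
        "datetime_utc": pd.to_datetime(
            ["2020-01-01 06:00", "2020-01-01 07:00", "2020-01-01 08:00", "2020-01-01 09:00"]
        ),
        "utc_offset_hours": [-6, -6, -8, -8],
        "demand_mwh": [10.0, None, 30.0, 40.0],
    }
)

# Step 1: shift UTC to local time and pivot to a (datetime x series) matrix.
df["datetime"] = df["datetime_utc"] + pd.to_timedelta(df["utc_offset_hours"], unit="h")
matrix = df.pivot(index="datetime", columns="respondent_id", values="demand_mwh")

# Step 2: flag values for imputation (here, only the missing ones).
flags = matrix.isna()

# Step 3: impute (placeholder: column mean) and melt back to a long table.
imputed = matrix.fillna(matrix.mean())
out = imputed.reset_index().melt(
    id_vars="datetime", var_name="respondent_id", value_name="demand_imputed_mwh"
)
```

After the shift, both respondents share the same local timestamps, which is what lets the pivoted matrix line up their daily cycles.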

This factory also has the ability to produce a set of simulation assets. These assets mirror the production assets, but they will impute a selection of values which were not actually flagged for imputation. This means we can impute data where the reported data is actually deemed “good”, allowing us to compare the imputed values to the reported values. We then compute the Mean Absolute Percentage Error to score the imputation. We can produce these simulated assets during our nightly builds for ongoing monitoring of the imputation, or as a one-off way to validate or compare imputation methods.

Parameters:
  • input_asset_name – Name of upstream asset to perform imputation on.

  • output_asset_name – Name of final output asset with imputed column.

  • years_from_context – Function to generate the list of years on which to perform imputation.

  • id_col – Name of column identifying entities to group timeseries by.

  • value_col – Column imputation will be performed on.

  • imputed_value_col – Name of column in output asset with imputed values.

  • reported_value_col – Name of column in output asset with original reported values.

  • output_io_manager_key – IO-manager to use for final output asset.

  • simulation_group_col – In cases where we are combining multiple datasets into a single imputation run (like BA/subregion demand), this column is used to compute simulation results for each set independently. This should point to a categorical column which defines which group a row belongs to.

  • op_tags – Tags applied to every op produced by the factory. Use {"dagster/priority": N} to raise scheduling priority for assets on the critical execution path.

  • settings – Configurable options for imputation (see ImputeTimeseriesSettings).