pudl.io_managers
================

.. py:module:: pudl.io_managers

.. autoapi-nested-parse::

   Dagster IO Managers.


Attributes
----------

.. autoapisummary::

   pudl.io_managers.logger
   pudl.io_managers.MINIMUM_SQLITE_VERSION


Classes
-------

.. autoapisummary::

   pudl.io_managers.PudlMixedFormatIOManager
   pudl.io_managers.SQLiteIOManager
   pudl.io_managers.PudlParquetIOManager
   pudl.io_managers.PudlSQLiteIOManager
   pudl.io_managers.FercSQLiteIOManager
   pudl.io_managers.FercDBFSQLiteIOManager
   pudl.io_managers.FercXBRLSQLiteIOManager
   pudl.io_managers.EpaCemsIOManager


Functions
---------

.. autoapisummary::

   pudl.io_managers.get_table_name_from_context
   pudl.io_managers.pudl_mixed_format_io_manager
   pudl.io_managers.parquet_io_manager
   pudl.io_managers.ferc1_dbf_sqlite_io_manager
   pudl.io_managers.ferc1_xbrl_sqlite_io_manager
   pudl.io_managers.ferc714_xbrl_sqlite_io_manager
   pudl.io_managers.epacems_io_manager


Module Contents
---------------

.. py:data:: logger

.. py:data:: MINIMUM_SQLITE_VERSION
   :value: '3.32.0'

.. py:function:: get_table_name_from_context(context: dagster.OutputContext) -> str

   Retrieves the table name from the context object.

.. py:class:: PudlMixedFormatIOManager(write_to_parquet: bool = False, read_from_parquet: bool = False)

   Bases: :py:obj:`dagster.IOManager`

   Format-switching IOManager that supports SQLite and Parquet.

   This IOManager allows Parquet files to be used alongside the standard SQLite
   database produced by PUDL.

   .. py:attribute:: write_to_parquet
      :type: bool

      If True, data will be written to Parquet files.

   .. py:attribute:: read_from_parquet
      :type: bool

      If True, data will be read from Parquet files instead of SQLite.

   .. py:attribute:: _sqlite_io_manager

   .. py:attribute:: _parquet_io_manager

   .. py:method:: handle_output(context: dagster.OutputContext, obj: pandas.DataFrame | str) -> pandas.DataFrame

      Passes the output to the appropriate IO manager instance.

   .. py:method:: load_input(context: dagster.InputContext) -> pandas.DataFrame

      Reads input from the appropriate IO manager instance.

.. py:class:: SQLiteIOManager(base_dir: str, db_name: str, md: sqlalchemy.MetaData | None = None, timeout: float = 1000.0)

   Bases: :py:obj:`dagster.IOManager`

   IO Manager that writes and retrieves dataframes from a SQLite database.

   .. py:attribute:: base_dir

   .. py:attribute:: db_name

   .. py:attribute:: md
      :value: None

   .. py:attribute:: engine
      :value: None

   .. py:method:: _setup_database(timeout: float = 1000.0) -> sqlalchemy.Engine

      Create the database and metadata if they don't exist.

      :param timeout: How many seconds the connection should wait before raising an exception if the database is locked by another connection. If another connection opens a transaction to modify the database, the database stays locked until that transaction is committed.

      :returns: SQLAlchemy engine that connects to a database in the base_dir.
      :rtype: engine

   .. py:method:: _get_sqlalchemy_table(table_name: str) -> sqlalchemy.Table

      Get the SQLAlchemy Table object from the metadata given a table_name.

      :param table_name: The name of the table to look up.

      :returns: Corresponding SQLAlchemy Table in the SQLiteIOManager metadata.
      :rtype: table

      :raises ValueError: if table_name does not exist in the SQLiteIOManager metadata.

   .. py:method:: _handle_pandas_output(context: dagster.OutputContext, df: pandas.DataFrame)

      Write a dataframe to the database.

      SQLite does not support concurrent writes to the database. Instead, a writer
      waits for the lock held by any other writer (up to the connection timeout), so
      write transactions execute one at a time. This lets the assets be processed in
      parallel even though their writes are serialized. See the `SQLAlchemy docs `__
      to learn more about SQLite concurrency.

      :param context: dagster keyword that provides access to output information like asset name.
      :param df: dataframe to write to the database.

   .. py:method:: _handle_str_output(context: dagster.OutputContext, query: str)

      Execute a SQL query on the database.
      This is used for creating output views in the database.

      :param context: dagster keyword that provides access to output information like asset name.
      :param query: SQL query to execute in the database.

   .. py:method:: handle_output(context: dagster.OutputContext, obj: pandas.DataFrame | str)

      Handle an op or asset output.

      If the output is a dataframe, write it to the database. If it is a string,
      execute it as a SQL query.

      :param context: dagster keyword that provides access to output information like asset name.
      :param obj: a SQL query or dataframe to add to the database.

      :raises Exception: if an asset or op returns an unsupported datatype.

   .. py:method:: load_input(context: dagster.InputContext) -> pandas.DataFrame

      Load a dataframe from a SQLite database.

      :param context: dagster keyword that provides access to output information like asset name.

.. py:class:: PudlParquetIOManager

   Bases: :py:obj:`dagster.IOManager`

   IOManager that writes PUDL tables to pyarrow Parquet files.

   .. py:method:: handle_output(context: dagster.OutputContext, df: Any) -> None

      Writes a PUDL dataframe to a Parquet file.

   .. py:method:: load_input(context: dagster.InputContext) -> pandas.DataFrame

      Loads a PUDL table from a Parquet file.

.. py:class:: PudlSQLiteIOManager(base_dir: str, db_name: str, package: pudl.metadata.classes.Package | None = None, timeout: float = 1000.0)

   Bases: :py:obj:`SQLiteIOManager`

   IO Manager that writes and retrieves dataframes from a SQLite database.

   This class extends the SQLiteIOManager class to manage database metadata and dtypes
   using the :class:`pudl.metadata.classes.Package` class.

   .. py:attribute:: package
      :value: None

   .. py:method:: _handle_str_output(context: dagster.OutputContext, query: str)

      Execute a SQL query on the database.

      This is used for creating output views in the database.

      :param context: dagster keyword that provides access to output information like asset name.
      :param query: SQL query to execute in the database.

   .. py:method:: _handle_pandas_output(context: dagster.OutputContext, df: pandas.DataFrame)

      Enforce the PUDL DB schema and write the dataframe to SQLite.

   .. py:method:: load_input(context: dagster.InputContext) -> pandas.DataFrame

      Load a dataframe from a SQLite database.

      :param context: dagster keyword that provides access to output information like asset name.

.. py:function:: pudl_mixed_format_io_manager(init_context: dagster.InitResourceContext) -> dagster.IOManager

   Create a SQLiteManager dagster resource for the PUDL database.

.. py:function:: parquet_io_manager(init_context: dagster.InitResourceContext) -> dagster.IOManager

   Create a Parquet-only IO manager.

.. py:class:: FercSQLiteIOManager(base_dir: str = None, db_name: str = None, md: sqlalchemy.MetaData = None, timeout: float = 1000.0)

   Bases: :py:obj:`SQLiteIOManager`

   IO Manager for reading tables from FERC databases.

   This class should be subclassed, and the load_input and handle_output methods
   should be implemented.

   This IOManager expects the database to already exist.

   .. py:method:: _setup_database(timeout: float = 1000.0) -> sqlalchemy.Engine

      Create the database engine and read the metadata.

      :param timeout: How many seconds the connection should wait before raising an exception if the database is locked by another connection. If another connection opens a transaction to modify the database, the database stays locked until that transaction is committed.

      :returns: SQLAlchemy engine that connects to a database in the base_dir.
      :rtype: engine

   .. py:method:: handle_output(context: dagster.OutputContext, obj)
      :abstractmethod:

      Handle an op or asset output.

   .. py:method:: load_input(context: dagster.InputContext) -> pandas.DataFrame
      :abstractmethod:

      Load a dataframe from a SQLite database.

      :param context: dagster keyword that provides access to output information like asset name.

.. py:class:: FercDBFSQLiteIOManager(base_dir: str = None, db_name: str = None, md: sqlalchemy.MetaData = None, timeout: float = 1000.0)

   Bases: :py:obj:`FercSQLiteIOManager`

   IO Manager for only reading tables from the FERC 1 database.

   This IO Manager is for reading data only. It does not handle outputs because the raw
   FERC tables are not known prior to running the ETL and are not recorded in our metadata.

   .. py:method:: handle_output(context: dagster.OutputContext, obj: pandas.DataFrame | str)
      :abstractmethod:

      Handle an op or asset output.

   .. py:method:: load_input(context: dagster.InputContext) -> pandas.DataFrame

      Load a dataframe from a SQLite database.

      :param context: dagster keyword that provides access to output information like asset name.

.. py:function:: ferc1_dbf_sqlite_io_manager(init_context) -> FercDBFSQLiteIOManager

   Create a SQLiteManager dagster resource for the ferc1 dbf database.

.. py:class:: FercXBRLSQLiteIOManager(base_dir: str = None, db_name: str = None, md: sqlalchemy.MetaData = None, timeout: float = 1000.0)

   Bases: :py:obj:`FercSQLiteIOManager`

   IO Manager for only reading tables from the XBRL database.

   This IO Manager is for reading data only. It does not handle outputs because the raw
   FERC tables are not known prior to running the ETL and are not recorded in our metadata.

   .. py:method:: refine_report_year(df: pandas.DataFrame, xbrl_years: list[int]) -> pandas.DataFrame
      :staticmethod:

      Set a fact's report year by its actual dates.

      Sometimes a fact belongs to a context which has no ReportYear associated with it;
      other times there are multiple ReportYears associated with a single filing. In
      these cases the report year of a specific fact may be associated with the other
      years in the filing.

      In many cases we can infer the actual report year from the fact's associated time
      period: either a duration or an instant.

   .. py:method:: handle_output(context: dagster.OutputContext, obj: pandas.DataFrame | str)
      :abstractmethod:

      Handle an op or asset output.

   .. py:method:: load_input(context: dagster.InputContext) -> pandas.DataFrame

      Load a dataframe from a SQLite database.

      :param context: dagster keyword that provides access to output information like asset name.

.. py:function:: ferc1_xbrl_sqlite_io_manager(init_context) -> FercXBRLSQLiteIOManager

   Create a SQLiteManager dagster resource for the ferc1 xbrl database.

.. py:function:: ferc714_xbrl_sqlite_io_manager(init_context) -> FercXBRLSQLiteIOManager

   Create a SQLiteManager dagster resource for the ferc714 xbrl database.

.. py:class:: EpaCemsIOManager(base_path: upath.UPath, schema: pyarrow.Schema)

   Bases: :py:obj:`dagster.UPathIOManager`

   An IO Manager that dumps outputs to a Parquet file.

   .. py:attribute:: extension
      :type: str
      :value: '.parquet'

   .. py:attribute:: schema

   .. py:method:: dump_to_path(context: dagster.OutputContext, obj: dask.dataframe.DataFrame, path: upath.UPath)
      :abstractmethod:

      Write a dataframe to a Parquet file.

   .. py:method:: load_from_path(context: dagster.InputContext, path: upath.UPath) -> dask.dataframe.DataFrame

      Load a directory of Parquet files into a Dask dataframe.

.. py:function:: epacems_io_manager(init_context: dagster.InitResourceContext) -> EpaCemsIOManager

   IO Manager that writes EPA CEMS partitions to individual Parquet files.
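
The behavior documented for ``SQLiteIOManager`` can be illustrated with a minimal sketch, which is not the PUDL implementation. ``handle_output`` writes a dataframe as a table, executes a string as SQL (for example a view definition), and rejects anything else, while the connection ``timeout`` governs how long a writer waits on a locked database. The sketch assumes only the standard-library ``sqlite3`` module and pandas rather than Dagster and SQLAlchemy. The class ``TinySQLiteStore`` is hypothetical, and plain table names stand in for Dagster contexts. This page does not say how ``MINIMUM_SQLITE_VERSION`` is enforced, so the version check below is also an assumption.

.. code-block:: python

   from __future__ import annotations

   import sqlite3

   import pandas as pd

   # Value documented for pudl.io_managers.MINIMUM_SQLITE_VERSION.
   MINIMUM_SQLITE_VERSION = "3.32.0"


   def version_tuple(version: str) -> tuple[int, ...]:
       """Turn a dotted version string such as '3.32.0' into (3, 32, 0)."""
       return tuple(int(part) for part in version.split("."))


   class TinySQLiteStore:
       """Hypothetical stand-in for SQLiteIOManager; table names replace Dagster contexts."""

       def __init__(self, path: str = ":memory:", timeout: float = 1000.0) -> None:
           # Assumed guard: refuse to run on an SQLite library older than the minimum.
           if version_tuple(sqlite3.sqlite_version) < version_tuple(MINIMUM_SQLITE_VERSION):
               raise RuntimeError(f"SQLite {sqlite3.sqlite_version} < {MINIMUM_SQLITE_VERSION}")
           # `timeout`: seconds to wait on a locked database before
           # sqlite3.OperationalError is raised.
           self.conn = sqlite3.connect(path, timeout=timeout)

       def handle_output(self, table_name: str, obj: pd.DataFrame | str) -> None:
           if isinstance(obj, pd.DataFrame):
               # Dataframes become tables, replacing any earlier version.
               obj.to_sql(table_name, self.conn, if_exists="replace", index=False)
           elif isinstance(obj, str):
               # Strings are run as SQL, e.g. a CREATE VIEW statement.
               self.conn.execute(obj)
               self.conn.commit()
           else:
               raise TypeError(f"Unsupported output type: {type(obj).__name__}")

       def load_input(self, table_name: str) -> pd.DataFrame:
           # Identifiers cannot be bound as parameters, so quote the table name.
           return pd.read_sql_query(f'SELECT * FROM "{table_name}"', self.conn)


   store = TinySQLiteStore()
   store.handle_output("plants", pd.DataFrame({"plant_id": [1, 2], "capacity_mw": [10.0, 25.5]}))
   store.handle_output(
       "big_plants",
       "CREATE VIEW big_plants AS SELECT * FROM plants WHERE capacity_mw > 20",
   )
   big = store.load_input("big_plants")  # only plant_id 2 passes the filter

Dispatching on the runtime type of the output keeps a single entry point for both tables and views, which mirrors the ``obj: pandas.DataFrame | str`` signature documented above.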
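
``PudlMixedFormatIOManager`` is documented as passing outputs and inputs to the appropriate underlying manager according to ``write_to_parquet`` and ``read_from_parquet``. The following sketch models that routing with two hypothetical in-memory backends (``InMemoryBackend`` and ``MixedFormatRouter`` are illustrative names, not PUDL classes). Three of its rules are assumptions chosen for illustration, not confirmed PUDL behavior: SQLite is always written, SQL strings are never copied to Parquet, and reading from Parquet requires writing to Parquet.

.. code-block:: python

   from __future__ import annotations

   from dataclasses import dataclass, field
   from typing import Any


   @dataclass
   class InMemoryBackend:
       """Hypothetical stand-in for the SQLite or Parquet IO manager."""

       name: str
       tables: dict[str, Any] = field(default_factory=dict)

       def handle_output(self, table_name: str, obj: Any) -> None:
           self.tables[table_name] = obj

       def load_input(self, table_name: str) -> Any:
           return self.tables[table_name]


   class MixedFormatRouter:
       """Sketch: route outputs and inputs between two backends using two flags."""

       def __init__(self, write_to_parquet: bool = False, read_from_parquet: bool = False):
           # Assumption: reading Parquet only makes sense if Parquet is also written.
           if read_from_parquet and not write_to_parquet:
               raise ValueError("read_from_parquet requires write_to_parquet")
           self.write_to_parquet = write_to_parquet
           self.read_from_parquet = read_from_parquet
           self._sqlite = InMemoryBackend("sqlite")
           self._parquet = InMemoryBackend("parquet")

       def handle_output(self, table_name: str, obj: Any) -> None:
           # Assumption: SQLite stays the primary store; Parquet is an optional copy.
           self._sqlite.handle_output(table_name, obj)
           # Assumption: SQL strings (e.g. view definitions) are not written to Parquet.
           if self.write_to_parquet and not isinstance(obj, str):
               self._parquet.handle_output(table_name, obj)

       def load_input(self, table_name: str) -> Any:
           backend = self._parquet if self.read_from_parquet else self._sqlite
           return backend.load_input(table_name)


   router = MixedFormatRouter(write_to_parquet=True, read_from_parquet=True)
   router.handle_output("plants", [1, 2])
   router.handle_output("plants_view", "CREATE VIEW plants_view AS SELECT 1")

Keeping the flag logic in one small router means each backend only has to implement ``handle_output`` and ``load_input``, the same two methods every IO manager on this page exposes.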
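
The ``FercXBRLSQLiteIOManager.refine_report_year`` docstring says a fact's report year can often be inferred from its time period. The following minimal pandas sketch shows that inference under stated assumptions, and it is not the PUDL method. It assumes instant facts carry a ``date`` column and duration facts carry ``start_date`` and ``end_date`` columns. It also assumes that a duration must fall within one calendar year and that facts whose inferred year is not in ``xbrl_years`` are dropped. The function name ``refine_report_year_sketch`` and all column names are hypothetical.

.. code-block:: python

   import pandas as pd


   def refine_report_year_sketch(df: pd.DataFrame, xbrl_years: list[int]) -> pd.DataFrame:
       """Hypothetical re-derivation of report_year from each fact's dates."""
       if {"start_date", "end_date"} <= set(df.columns):
           # Duration facts (assumed columns): the period must start and end in one year.
           start_years = pd.to_datetime(df["start_date"]).dt.year
           end_years = pd.to_datetime(df["end_date"]).dt.year
           if not (start_years == end_years).all():
               raise ValueError("a duration spans more than one calendar year")
           years = start_years
       elif "date" in df.columns:
           # Instant facts (assumed column): the year of the instant is the report year.
           years = pd.to_datetime(df["date"]).dt.year
       else:
           raise ValueError("expected instant (date) or duration (start/end) columns")
       # Overwrite report_year, then keep only facts in the requested years.
       out = df.assign(report_year=years)
       return out[out["report_year"].isin(xbrl_years)].reset_index(drop=True)


   facts = pd.DataFrame(
       {
           "entity_id": ["a", "a", "b"],
           "report_year": [2021, 2021, 2021],  # year claimed by the filing
           "date": ["2020-12-31", "2021-12-31", "2021-12-31"],
       }
   )
   refined = refine_report_year_sketch(facts, xbrl_years=[2020, 2021])

Here the first fact, filed with a 2021 report year but dated at the end of 2020, is reassigned to 2020, which is the kind of mismatch the docstring describes.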