pudl.workspace.datastore#

Datastore manages file retrieval for PUDL datasets.

Attributes#

Exceptions#

ChecksumMismatchError

Resource checksum (md5) does not match.

Classes#

DatapackageDescriptor

A simple wrapper providing access to datapackage.json contents.

ZenodoDoiSettings

Digital Object Identifiers pointing to currently used Zenodo archives.

ZenodoFetcher

API for fetching datapackage descriptors and resource contents from Zenodo.

Datastore

Handle connections and downloading of Zenodo Source archives.

Functions#

get_zenodo_dois_path(→ importlib.resources.abc.Traversable)

Return the canonical packaged Zenodo DOI settings path.

validate_cache(→ None)

Validate elements in the datastore cache.

fetch_resources(→ None)

Retrieve all matching resources and store them in the cache.

Module Contents#

pudl.workspace.datastore.logger[source]#
pudl.workspace.datastore.ZenodoDoi[source]#
pudl.workspace.datastore.get_zenodo_dois_path() importlib.resources.abc.Traversable[source]#

Return the canonical packaged Zenodo DOI settings path.

exception pudl.workspace.datastore.ChecksumMismatchError[source]#

Bases: ValueError

Resource checksum (md5) does not match.

class pudl.workspace.datastore.DatapackageDescriptor(datapackage_json: dict, dataset: str, doi: ZenodoDoi)[source]#

A simple wrapper providing access to datapackage.json contents.

datapackage_json[source]#
dataset[source]#
doi[source]#
_get_resource_metadata(name: str) dict[source]#
get_resource_path(name: str) str[source]#

Returns the Zenodo URL that holds the contents of the named resource.

get_download_size() int[source]#

Returns the total download size of all the resources in MB.

validate_checksum(name: str, content: str) bool[source]#

Returns True if content matches checksum for given named resource.
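As a rough illustration of the check described above, the sketch below compares an MD5 hex digest against an expected value using only the standard library. The function name `md5_matches` and the sample payload are hypothetical and not part of this module; taking the content as bytes is also an assumption, and the real method looks up the expected hash from the datapackage metadata by resource name.

```python
import hashlib


def md5_matches(content: bytes, expected_md5: str) -> bool:
    """Return True if the MD5 hex digest of ``content`` equals ``expected_md5``."""
    actual = hashlib.md5(content).hexdigest()
    # Hex digests are case-insensitive, so normalize the expected value.
    return actual == expected_md5.lower()


# Hypothetical payload standing in for downloaded resource contents.
payload = b"example resource contents"
expected = hashlib.md5(payload).hexdigest()
is_valid = md5_matches(payload, expected)
```

A caller that wants a hard failure could raise `ChecksumMismatchError` when such a check returns False.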

_matches(res: dict, **filters: Any)[source]#
_match_from_partition(parts: dict[str, str], k: str, v: str | list[str, str])[source]#
get_resources(name: str = None, **filters: Any) collections.abc.Iterator[pudl.workspace.resource_cache.PudlResourceKey][source]#

Returns series of PudlResourceKey identifiers for matching resources.

Parameters:
  • name – if specified, find resource(s) with this name.

  • filters (dict) – if specified, find resource(s) matching these key=value constraints. The constraints are matched against the ‘parts’ field of the resource entry in the datapackage.json.
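The filter matching described above can be sketched roughly as follows. This is a simplified illustration, not the library's implementation: the helper `matches_parts`, the sample resource entries, and the choice to compare values as strings are all assumptions made for the example.

```python
from typing import Any


def matches_parts(resource: dict, **filters: Any) -> bool:
    """Return True if every filter equals the matching entry in resource["parts"]."""
    parts = resource.get("parts", {})
    # Compare as strings so that year=2020 also matches a stored "2020".
    return all(
        key in parts and str(parts[key]) == str(value)
        for key, value in filters.items()
    )


# Hypothetical resource entries shaped like those in a datapackage.json.
resources = [
    {"name": "a-2019.zip", "parts": {"year": 2019, "form": "x"}},
    {"name": "a-2020.zip", "parts": {"year": 2020, "form": "x"}},
]
selected = [r["name"] for r in resources if matches_parts(r, year=2020)]
```

With no filters the helper accepts every resource, which mirrors the "if specified" wording of the parameters above.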

get_partitions(name: str = None) dict[str, set[str]][source]#

Return mapping of known partition keys to their allowed known values.
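To make the shape of the returned mapping concrete, here is a minimal sketch that builds a `dict[str, set[str]]` from a list of resource entries. The helper name `collect_partitions`, the sample data, and the handling of list-valued partitions are assumptions for illustration only.

```python
from collections import defaultdict


def collect_partitions(resources: list[dict]) -> dict[str, set[str]]:
    """Map each partition key to the set of values seen across resources."""
    partitions: dict[str, set[str]] = defaultdict(set)
    for resource in resources:
        for key, value in resource.get("parts", {}).items():
            # A partition value may be a single item or a list of items.
            values = value if isinstance(value, list) else [value]
            partitions[key].update(str(v) for v in values)
    return dict(partitions)


# Hypothetical resource entries with a "parts" field.
known = collect_partitions(
    [
        {"parts": {"year": 2019}},
        {"parts": {"year": [2020, 2021], "state": "co"}},
    ]
)
```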

get_partition_filters(**filters: Any) collections.abc.Iterator[dict[str, str]][source]#

Returns an iterator over all known partition mappings.

This can be used to iterate over all resources, because each mapping can be passed directly as filters and should identify a unique resource.

Parameters:

filters – additional constraints for selecting relevant partitions.

_validate_datapackage(datapackage_json: dict)[source]#

Checks the correctness of datapackage.json metadata.

Raises ValueError if the metadata is invalid.

get_json_string() str[source]#

Exports the underlying JSON as a normalized (sorted, indented) JSON string.
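A normalized JSON string of this kind can be produced with the standard library, as in the sketch below. The helper name `to_normalized_json` and the indent width of 4 are assumptions; the source only states that the output is sorted and indented.

```python
import json


def to_normalized_json(data: dict) -> str:
    """Serialize ``data`` with sorted keys and fixed indentation."""
    # Sorting keys makes the output independent of dict insertion order.
    return json.dumps(data, sort_keys=True, indent=4)


first = to_normalized_json({"b": 1, "a": 2})
second = to_normalized_json({"a": 2, "b": 1})
```

Because the key order is fixed, two descriptors with the same contents serialize to identical strings, which makes them easy to compare or diff.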

class pudl.workspace.datastore.ZenodoDoiSettings(**data: Any)[source]#

Bases: pydantic_settings.BaseSettings

Digital Object Identifiers pointing to currently used Zenodo archives.

censusdp1tract: ZenodoDoi[source]#
censuspep: ZenodoDoi[source]#
eia176: ZenodoDoi[source]#
eia191: ZenodoDoi[source]#
eia757a: ZenodoDoi[source]#
eia860: ZenodoDoi[source]#
eia860m: ZenodoDoi[source]#
eia861: ZenodoDoi[source]#
eia923: ZenodoDoi[source]#
eia930: ZenodoDoi[source]#
eiaaeo: ZenodoDoi[source]#
eiaapi: ZenodoDoi[source]#
epacamd_eia: ZenodoDoi[source]#
epacems: ZenodoDoi[source]#
ferc1: ZenodoDoi[source]#
ferc2: ZenodoDoi[source]#
ferc6: ZenodoDoi[source]#
ferc60: ZenodoDoi[source]#
ferc714: ZenodoDoi[source]#
ferceqr: ZenodoDoi[source]#
ferccid: ZenodoDoi[source]#
gridpathratoolkit: ZenodoDoi[source]#
nrelatb: ZenodoDoi[source]#
phmsagas: ZenodoDoi[source]#
rus7: ZenodoDoi[source]#
rus12: ZenodoDoi[source]#
sec10k: ZenodoDoi[source]#
vcerare: ZenodoDoi[source]#
model_config[source]#

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

get_doi(dataset: str) ZenodoDoi[source]#

Look up configured DOI by dataset.

Raises a KeyError if the dataset is not configured.

classmethod from_yaml(path: str | pathlib.Path) ZenodoDoiSettings[source]#

Create a ZenodoDoiSettings instance from a YAML file path.

Parameters:

path – Path to a YAML file.

Returns:

A ZenodoDoiSettings object with DOIs loaded from the YAML file.

class pudl.workspace.datastore.ZenodoFetcher(zenodo_dois: ZenodoDoiSettings | None = None, timeout: float = 100.0)[source]#

API for fetching datapackage descriptors and resource contents from Zenodo.

_descriptor_cache: dict[str, DatapackageDescriptor][source]#
zenodo_dois: ZenodoDoiSettings[source]#
timeout: float[source]#
http[source]#
get_doi(dataset: str) ZenodoDoi[source]#

Returns DOI for given dataset.

get_known_datasets() list[str][source]#

Returns list of supported datasets.

_get_url(doi: ZenodoDoi) pydantic.HttpUrl[source]#

Construct a Zenodo deposition URL based on its Zenodo DOI.
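The general idea of turning a DOI into a URL can be sketched as below. This is not the library's implementation: the helper `record_url`, the regular expression, the URL template, and the example DOI are all assumptions chosen for illustration, and the real method may target a different Zenodo endpoint.

```python
import re

# Production Zenodo DOIs have the form 10.5281/zenodo.<numeric id>.
DOI_PATTERN = re.compile(r"^10\.5281/zenodo\.(\d+)$")


def record_url(
    doi: str, template: str = "https://zenodo.org/records/{record_id}"
) -> str:
    """Build a URL from the numeric ID embedded in a Zenodo DOI."""
    match = DOI_PATTERN.match(doi)
    if match is None:
        raise ValueError(f"Not a recognized Zenodo DOI: {doi!r}")
    # The template is a placeholder; swap in whichever endpoint is needed.
    return template.format(record_id=match.group(1))


# Placeholder DOI, not a real PUDL archive.
url = record_url("10.5281/zenodo.1234567")
```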

_fetch_from_url(url: pydantic.HttpUrl) requests.Response[source]#
get_descriptor(dataset: str) DatapackageDescriptor[source]#

Returns the DatapackageDescriptor for the given dataset.

get_resource(res: pudl.workspace.resource_cache.PudlResourceKey) bytes[source]#

Given a resource key, retrieve the contents of the file from Zenodo.

class pudl.workspace.datastore.Datastore(local_cache_path: str | pathlib.Path | upath.UPath | None = None, cloud_cache_path: str | upath.UPath | None = 's3://pudl.catalyst.coop/zenodo', timeout: float = 15.0, zenodo_dois: ZenodoDoiSettings | None = None)[source]#

Handle connections and downloading of Zenodo Source archives.

_cache[source]#
_datapackage_descriptors: dict[str, DatapackageDescriptor][source]#
temporary_extraction_dir[source]#
_zenodo_fetcher[source]#
property zenodo_dois: ZenodoDoiSettings[source]#

Expose the DOI settings used by this datastore instance.

get_doi(dataset: str) ZenodoDoi[source]#

Return the configured DOI for a dataset.

get_known_datasets() list[str][source]#

Returns list of supported datasets.

get_datapackage_descriptor(dataset: str) DatapackageDescriptor[source]#

Fetch datapackage descriptor for dataset either from cache or Zenodo.

get_resources(dataset: str, cached_only: bool = False, skip_optimally_cached: bool = False, **filters: Any) collections.abc.Iterator[tuple[pudl.workspace.resource_cache.PudlResourceKey, bytes]][source]#

Return content of the matching resources.

Parameters:
  • dataset – name of the dataset to query.

  • cached_only – if True, only retrieve resources that are present in the cache.

  • skip_optimally_cached – if True, only retrieve resources that are not optimally cached. This triggers an attempt to optimally cache these resources.

  • filters (key=val) – only return resources that match the key-value mapping in their metadata["parts"].

Yields:

(PudlResourceKey, bytes) holding the content of each matching resource

remove_from_cache(res: pudl.workspace.resource_cache.PudlResourceKey) None[source]#

Remove given resource from the associated cache.

get_unique_resource(dataset: str, **filters: Any) bytes[source]#

Returns content of a resource assuming there is exactly one that matches.
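The "exactly one match" assumption can be enforced over any iterator with a small helper like the one sketched here. The name `exactly_one` and the choice of `KeyError` are assumptions for illustration; the source does not state which exception the real method raises.

```python
from collections.abc import Iterable
from typing import TypeVar

T = TypeVar("T")


def exactly_one(items: Iterable[T]) -> T:
    """Return the single item in ``items``; fail if there are zero or several."""
    iterator = iter(items)
    try:
        first = next(iterator)
    except StopIteration:
        raise KeyError("No matching resource found.") from None
    # Any second item means the filters were not specific enough.
    for _ in iterator:
        raise KeyError("Multiple matching resources found.")
    return first


content = exactly_one([b"only match"])
```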

get_zipfile_resource(dataset: str, **filters: Any) zipfile.ZipFile[source]#

Retrieves unique resource and opens it as a ZipFile.

get_zipfile_resources(dataset: str, **filters: Any) collections.abc.Iterator[tuple[pudl.workspace.resource_cache.PudlResourceKey, zipfile.ZipFile]][source]#

Iterates over resources that match filters and opens each as ZipFile.

get_zipfile_file_names(zip_file: zipfile.ZipFile)[source]#

Given a zipfile, return a list of the file names in it.
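The zipfile helpers above build on the standard library, and the underlying pattern can be sketched without PUDL. In this example the helper `open_zip_from_bytes` and the in-memory archive are hypothetical stand-ins for a resource retrieved from the datastore.

```python
import io
import zipfile


def open_zip_from_bytes(content: bytes) -> zipfile.ZipFile:
    """Open raw archive bytes as a ZipFile without writing to disk."""
    return zipfile.ZipFile(io.BytesIO(content))


# Build a small archive in memory to stand in for a downloaded resource.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("data/table.csv", "id,value\n1,10\n")
    archive.writestr("README.txt", "example archive")

with open_zip_from_bytes(buffer.getvalue()) as zf:
    # namelist() returns member names in the order they were written.
    names = zf.namelist()
    first_member = zf.read("data/table.csv")
```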

pudl.workspace.datastore.validate_cache(dstore: Datastore, datasets: list[str], partition: dict[str, int | str]) None[source]#

Validate elements in the datastore cache.

Delete invalid entries from cache.
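The validate-then-delete behavior can be illustrated with a plain dictionary standing in for the cache. This is only a sketch under stated assumptions: `prune_invalid`, the dictionary-based cache, and the expected-checksum mapping are hypothetical, whereas the real function works through a Datastore and its datapackage descriptors.

```python
import hashlib


def prune_invalid(cache: dict[str, bytes], expected_md5: dict[str, str]) -> list[str]:
    """Delete entries whose MD5 digest differs from the expected one."""
    # Collect names first so the dict is not modified while iterating.
    invalid = [
        name
        for name, content in cache.items()
        if hashlib.md5(content).hexdigest() != expected_md5.get(name)
    ]
    for name in invalid:
        del cache[name]
    return invalid


# Hypothetical cache in which entry "b" no longer matches its checksum.
cache = {"a": b"good", "b": b"corrupt"}
expected = {
    "a": hashlib.md5(b"good").hexdigest(),
    "b": hashlib.md5(b"original").hexdigest(),
}
removed = prune_invalid(cache, expected)
```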

pudl.workspace.datastore.fetch_resources(dstore: Datastore, datasets: list[str], partition: dict[str, int | str], cloud_cache_path: str, bypass_local_cache: bool) None[source]#

Retrieve all matching resources and store them in the cache.