Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
c9d3a19
Add optional imports following the polars optional import method (sim…
charles-turner-1 May 5, 2025
0e82529
Move optional import stuff out to a separate module to avoid circular…
charles-turner-1 May 6, 2025
f77bad1
WIP
charles-turner-1 May 6, 2025
df46b2d
Basic working example
charles-turner-1 May 7, 2025
48dc74d
Fix broken test - needed mock
charles-turner-1 May 7, 2025
0ba6afe
Merge branch 'main' into to-iris
charles-turner-1 May 7, 2025
da7658e
Fix another broken test (updated function signature in to_iris,
charles-turner-1 May 7, 2025
5961ec0
Facets now read from esmvalcore intake configuration file instead of …
charles-turner-1 May 9, 2025
43c6e34
WIP (has polars change broken require_all?)
charles-turner-1 May 12, 2025
410a388
Search history memory on `esm_datastores`. Probably won't work in sit…
charles-turner-1 May 12, 2025
93daea8
Fixed bug where pyarrow conversions were causing string accessor to f…
charles-turner-1 May 8, 2025
df051d1
Merge branch 'main' into to-iris
charles-turner-1 May 12, 2025
065b29f
Merge branch 'main' into to-iris
charles-turner-1 May 13, 2025
24c6803
Rebase doing weird things (!)
charles-turner-1 May 13, 2025
4618138
Track search history to build esmvalcore facets, rather than passing …
charles-turner-1 May 13, 2025
73f150e
Move `_merge_search_history` into esmvalcore
charles-turner-1 May 13, 2025
08be40d
Removed irrelevant change
charles-turner-1 May 13, 2025
3b961d6
rename `to_iris` => `to_esmvalcore`
charles-turner-1 May 14, 2025
f424ad6
Merge branch 'main' into to-iris
charles-turner-1 Aug 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dependencies:
- cftime
- codecov
- dask >=2024.12
- esmvalcore >=2.0.0
- fastprogress >=1.0.0
- flaky >=3.8.0
- fsspec >=2024.12
Expand Down
32 changes: 32 additions & 0 deletions intake_esm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,43 @@

# Import intake first to avoid circular imports during discovery.
import intake
import importlib


from intake_esm import tutorial
from intake_esm.core import esm_datastore
from intake_esm.derived import DerivedVariableRegistry, default_registry
from intake_esm.utils import set_options, show_versions
from intake_esm._imports import _to_opt_import_flag, _from_opt_import_flag
from intake_esm import _imports as _import_module

from intake_esm._version import __version__

# Availability-flag attribute names, one per entry in the optional-import registry.
import_flags = list(map(_to_opt_import_flag, _import_module._optional_imports))

# Public API: the eagerly imported names plus the lazily resolved availability flags.
__all__ = [
    'esm_datastore',
    'DerivedVariableRegistry',
    'default_registry',
    'set_options',
    'show_versions',
    'tutorial',
    '__version__',
    *import_flags,
]


def __getattr__(attr: str) -> object:
    """
    Lazily resolve package attributes that are not defined eagerly.

    Anything not found in this module's globals is looked up on
    ``_import_module``, which is where the optional-import availability
    flags are computed on demand.

    Parameters
    ----------
    attr : str
        Name of the attribute being requested from the package.

    Returns
    -------
    object
        The matching global, or the attribute resolved on ``_import_module``.

    Raises
    ------
    AttributeError
        If the attribute cannot be resolved in either place.
    """

    # Only reachable when this hook is called directly: normal attribute access
    # consults the module globals before falling back to ``__getattr__``.
    if attr in (gl := globals()):
        return gl[attr]

    try:
        return getattr(_import_module, attr)
    except AttributeError:
        # Suppress the inner error so the traceback names this package rather
        # than showing a chained "During handling of the above exception" pair.
        raise AttributeError(
            f"Module '{__name__}' has no attribute '{attr}'. "
            f'Did you mean one of {", ".join(import_flags)}?'
        ) from None
41 changes: 41 additions & 0 deletions intake_esm/_imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import importlib

_optional_imports: dict[str, bool | None] = {'esmvalcore': None}


def _to_opt_import_flag(name: str) -> str:
    """Build the availability-flag attribute name for an optional import.

    For example, ``'esmvalcore'`` becomes ``'_ESMVALCORE_AVAILABLE'``.
    """
    upper_name = name.upper()
    return '_' + upper_name + '_AVAILABLE'


def _from_opt_import_flag(name: str) -> str:
    """Recover the optional import name from its availability flag.

    The leading underscore and the ``_AVAILABLE`` suffix are stripped and the
    remainder is lower-cased, e.g. ``'_ESMVALCORE_AVAILABLE'`` gives
    ``'esmvalcore'``.

    Parameters
    ----------
    name : str
        Flag name of the form ``_<import_name>_AVAILABLE``.

    Returns
    -------
    str
        The lower-cased import name embedded in the flag.

    Raises
    ------
    ValueError
        If ``name`` lacks the expected prefix/suffix, or if nothing is left
        between them (e.g. ``'_AVAILABLE'``).
    """
    prefix, suffix = '_', '_AVAILABLE'
    if name.startswith(prefix) and name.endswith(suffix):
        # Slice by the affix lengths rather than a hard-coded offset.
        import_name = name[len(prefix) : -len(suffix)]
        # An empty remainder means the flag carried no import name at all.
        if import_name:
            return import_name.lower()
    raise ValueError(
        f"Invalid optional import flag '{name}'. Expected format: '_<import_name>_AVAILABLE'."
    )


def __getattr__(attr: str) -> object:
    """
    Lazily compute availability flags for optional imports.

    The first request for a flag such as ``_ESMVALCORE_AVAILABLE`` checks
    whether the corresponding package can be found and caches the boolean in
    ``_optional_imports``; later requests return the cached value.

    Parameters
    ----------
    attr : str
        Name of the attribute being requested from this module.

    Returns
    -------
    object
        The matching module global, or a ``bool`` telling whether the optional
        package behind the requested flag is importable.

    Raises
    ------
    AttributeError
        If ``attr`` is neither a module global nor a known availability flag.
    """
    # ``import importlib`` alone does not guarantee the ``util`` submodule is
    # loaded, so import it explicitly before calling ``find_spec``.
    import importlib.util

    # Only reachable when this hook is called directly: normal attribute access
    # consults the module globals before falling back to ``__getattr__``.
    if attr in (gl := globals()):
        return gl[attr]

    import_flags = [_to_opt_import_flag(name) for name in _optional_imports]

    if attr not in import_flags:
        raise AttributeError(
            f"Module '{__name__}' has no attribute '{attr}'. "
            f'Did you mean one of {", ".join(import_flags)}?'
        )

    import_name = _from_opt_import_flag(attr)
    # ``None`` marks "not checked yet"; resolve once and cache the result.
    if _optional_imports.get(import_name) is None:
        _optional_imports[import_name] = importlib.util.find_spec(import_name) is not None
    return _optional_imports[import_name]
33 changes: 33 additions & 0 deletions intake_esm/cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class ESMCatalogModel(pydantic.BaseModel):
id: str = ''
catalog_dict: list[dict] | None = None
catalog_file: pydantic.StrictStr | None = None
fhandle: pydantic.StrictStr | None = None
description: pydantic.StrictStr | None = None
title: pydantic.StrictStr | None = None
last_updated: datetime.datetime | datetime.date | None = None
Expand Down Expand Up @@ -269,6 +270,7 @@ def load(
df=pl.DataFrame(cat.catalog_dict).to_pandas(),
)

cat.fhandle = json_file
cat._cast_agg_columns_with_iterables()
return cat

Expand Down Expand Up @@ -496,6 +498,37 @@ def validate_query(cls, model):
model.query = _query
return model

def _extend_search_history(
    cls, search_hist: list[dict[str, typing.Any]]
) -> list[dict[str, typing.Any]]:
    """
    Return the search history extended with the current query. Note this
    doesn't yet handle cases where we have set `require_all_on`.

    The input list is left untouched and a new list is returned, so a
    catalog's history is not modified when a search result derives its own
    history from it.

    Parameters
    ----------
    search_hist : list[dict]
        The search history accumulated so far.

    Returns
    -------
    list[dict[str, typing.Any]]
        A new list holding the previous entries followed by one
        single-key dict per queried column, or by one empty dict when the
        query is empty.
    """

    # ``cls`` is whatever object the method is invoked on; only its ``query``
    # attribute is read here.
    _query = cls.query

    if not _query:
        # An empty query is still recorded, as an empty entry.
        return [*search_hist, {}]

    return [
        *search_hist,
        *({colname: search_terms} for colname, search_terms in _query.items()),
    ]


class FramesModel(pydantic.BaseModel):
"""A Pydantic model to represent our collection of dataframes - pandas, polars,
Expand Down
95 changes: 94 additions & 1 deletion intake_esm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import warnings
from copy import deepcopy

if typing.TYPE_CHECKING:
import esmvalcore
import esmvalcore.dataset

import dask
import packaging.version
import xarray as xr
Expand All @@ -24,7 +28,8 @@
from fastprogress.fastprogress import progress_bar
from intake.catalog import Catalog

from .cat import ESMCatalogModel
from ._imports import _ESMVALCORE_AVAILABLE
from .cat import ESMCatalogModel, QueryModel
from .derived import DerivedVariableRegistry, default_registry
from .source import ESMDataSource
from .utils import MinimalExploder
Expand Down Expand Up @@ -397,6 +402,16 @@ def __dir__(self) -> list[str]:
def _ipython_key_completions_(self):
return self.__dir__()

@property
def search_history(self) -> list[dict[str, typing.Any]]:
    """Return this catalog's search history, creating an empty one on first access."""

    if not hasattr(self, '_search_history'):
        # Nothing recorded yet: start with an empty history and keep it on the instance.
        self._search_history = []
    return self._search_history

@pydantic.validate_call
def search(
self,
Expand Down Expand Up @@ -458,6 +473,14 @@ def search(
4 landCoverFrac
"""

_search_hist = (
query
if isinstance(query, QueryModel)
else QueryModel(
query=query, require_all_on=require_all_on, columns=self.df.columns.tolist()
)._extend_search_history(self.search_history)
)

# step 1: Search in the base/main catalog
esmcat_results = self.esmcat.search(require_all_on=require_all_on, query=query)

Expand Down Expand Up @@ -507,6 +530,8 @@ def search(
cat.derivedcat._registry.update(derived_cat_subset)
else:
cat.derivedcat = self.derivedcat

cat._search_history = _search_hist
return cat

@pydantic.validate_call
Expand Down Expand Up @@ -893,6 +918,74 @@ def to_dask(self, **kwargs) -> xr.Dataset:
_, ds = res.popitem()
return ds

def to_esmvalcore(
    self,
    cmorizer: typing.Any | None = None,
    **kwargs,
) -> 'esmvalcore.dataset.Dataset':
    """
    Build an ESMValCore ``Dataset`` from this datastore.

    The datastore must contain exactly one dataset. Facets are taken from the
    merged search history and translated through the facet mapping that
    ESMValCore's intake configuration returns for this catalog's file handle.

    Parameters
    ----------
    cmorizer : Any, optional
        Not used by the current implementation.
    **kwargs
        Not used by the current implementation.

    Returns
    -------
    esmvalcore.dataset.Dataset
        Dataset built from the mapped facets, with ``files`` set from the
        unique paths of this datastore and facets augmented.

    Raises
    ------
    ImportError
        If esmvalcore is not available.
    ValueError
        If the datastore does not contain exactly one dataset.
    """
    if not _ESMVALCORE_AVAILABLE:
        raise ImportError(
            '`to_esmvalcore()` requires the esmvalcore package to be installed. '
            'To proceed please install esmvalcore using: '
            ' `python -m pip install esmvalcore` or `conda install -c conda-forge esmvalcore`.'
        )

    # Cheap guard so we bail out before touching esmvalcore when there are many results.
    if len(self) != 1:
        raise ValueError(
            f'Expected exactly one dataset. Received {len(self)} datasets. Please refine your search.'
        )

    # esmvalcore owns the intake configuration that tells us how catalog columns
    # map onto its facets, so it is imported only once we know it is needed.
    from esmvalcore.config._intake import _read_facets, load_intake_config
    from esmvalcore.data import merge_intake_search_history as merge_search_history
    from esmvalcore.dataset import Dataset

    facet_map, project = _read_facets(load_intake_config(), self.esmcat.fhandle)
    merged_search = merge_search_history(self.search_history)

    # Keep only the facets that the search history actually provides a value for.
    facets = {
        facet: value
        for facet, column in facet_map.items()
        if (value := merged_search.get(column)) is not None
    }

    facets.pop('version', None)  # If there's a version, chuck it
    facets['project'] = project

    ds = Dataset(**facets)
    ds.files = self.unique().path
    ds.augment_facets()

    return ds

def _create_derived_variables(self, datasets, skip_on_error):
if len(self.derivedcat) > 0:
datasets = self.derivedcat.update_datasets(
Expand Down
13 changes: 13 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import pytest

import intake_esm

here = os.path.abspath(os.path.dirname(__file__))


Expand All @@ -13,3 +15,14 @@ def sample_cmip6():
@pytest.fixture
def sample_bad_input():
return os.path.join(here, 'sample-catalogs/bad.json')


@pytest.fixture
def cleanup_init():
    """
    Reset the `_optional_imports` dictionary in intake_esm to its default state
    both before and after a test, so lazy loading can be exercised from scratch.
    """

    def _reset():
        intake_esm._imports._optional_imports = {'esmvalcore': None}

    _reset()
    yield
    _reset()
75 changes: 75 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,55 @@ def test_catalog_contains():
)
def test_catalog_search(path, query, expected_size):
cat = intake.open_esm_datastore(path)
assert cat.search_history == []
new_cat = cat.search(**query)
assert len(new_cat) == expected_size


@pytest.mark.parametrize(
    'path, query, expected',
    [
        (cdf_cat_sample_cesmle, {'experiment': 'CTRL'}, [{'experiment': ['CTRL']}]),
        (cdf_cat_sample_cesmle, {'experiment': ['CTRL', '20C']}, [{'experiment': ['CTRL', '20C']}]),
        (cdf_cat_sample_cesmle, {}, [{}]),
        (
            cdf_cat_sample_cesmle,
            {'variable': 'SHF', 'time_range': ['200601-210012']},
            [{'variable': ['SHF']}, {'time_range': ['200601-210012']}],
        ),
    ],
)
def test_catalog_search_history(path, query, expected):
    """A single search records one history entry per queried column."""
    datastore = intake.open_esm_datastore(path)
    # A freshly opened datastore has no history yet.
    assert cat_history_is_empty(datastore) if False else datastore.search_history == []
    result = datastore.search(**query)
    assert result.search_history == expected


@pytest.mark.parametrize(
    'path, queries, expected',
    [
        (cdf_cat_sample_cesmle, [{'experiment': 'CTRL'}, {}], [{'experiment': ['CTRL']}, {}]),
        (
            cdf_cat_sample_cesmle,
            [{'variable': 'SHF'}, {'time_range': ['200601-210012']}],
            [{'variable': ['SHF']}, {'time_range': ['200601-210012']}],
        ),
        (
            cdf_cat_sample_cesmle,
            [{'experiment': ['CTRL', '20C']}, {'variable': 'SHF'}],
            [{'experiment': ['CTRL', '20C']}, {'variable': ['SHF']}],
        ),
    ],
)
def test_catalog_search_history_sequential(path, queries, expected):
    """Chained searches accumulate their history entries in call order."""
    datastore = intake.open_esm_datastore(path)
    assert datastore.search_history == []
    first_query, second_query = queries
    first_result = datastore.search(**first_query)
    final_result = first_result.search(**second_query)
    assert final_result.search_history == expected


@pytest.mark.parametrize(
'path, columns_with_iterables, query, expected_size',
[
Expand Down Expand Up @@ -704,3 +749,33 @@ def test__get_threaded(mock_get_env, threaded, ITK_ESM_THREADING, expected):
intake_esm.core._get_threaded(threaded)
else:
assert intake_esm.core._get_threaded(threaded) == expected


@mock.patch('intake_esm.core._ESMVALCORE_AVAILABLE', False)
def test_to_esmvalcore_unavailable():
    """`to_esmvalcore` raises ImportError when the availability flag is False."""
    datastore = intake.open_esm_datastore(zarr_cat_pangeo_cmip6)
    subset = datastore.search(
        variable_id=['pr'],
        experiment_id='ssp370',
        activity_id='AerChemMIP',
        source_id='BCC-ESM1',
        table_id='Amon',
        grid_label='gn',
    )
    open_kwargs = {
        'consolidated': True,
        'backend_kwargs': {'storage_options': {'token': 'anon'}},
    }
    with pytest.raises(ImportError, match=r'`to_esmvalcore\(\)` requires the esmvalcore package'):
        _ = subset.to_esmvalcore(
            search={
                'variable_id': ['pr'],
                'experiment_id': 'ssp370',
                'activity_id': 'AerChemMIP',
                'source_id': 'BCC-ESM1',
                'table_id': 'Amon',
                'grid_label': 'gn',
            },
            xarray_open_kwargs=open_kwargs,
        )
Loading
Loading