Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
aa467b1
factor out common HRRR template config
aldenks Nov 14, 2025
0e17d11
test: add tests for NoaaHrrrForecast48HourTemplateConfig
aldenks Nov 14, 2025
c3d2961
feat: add test for _y_x_coordinates in NoaaHrrrTemplateConfig
aldenks Nov 14, 2025
76ac69b
test: add test for _latitude_longitude_coordinates method
aldenks Nov 14, 2025
95cb100
test: add comments and improve test readability for NOAA HRRR templat…
aldenks Nov 14, 2025
e46b5fa
test: use monkeypatch to mock dataset_id in template config fixture
aldenks Nov 14, 2025
d9d7191
fix: mock dataset_attributes in NOAA HRRR template config test
aldenks Nov 14, 2025
c1d1aa1
feat: modify template config test fixture to mock dataset_attributes …
aldenks Nov 14, 2025
351ef29
style: fix linter warnings in template_config_test.py
aldenks Nov 14, 2025
44743e5
docs: add comments explaining template config and test logic
aldenks Nov 14, 2025
1730639
feat: modify spatial reference attribute testing in template config test
aldenks Nov 14, 2025
100584b
style: format code with linter
aldenks Nov 14, 2025
1a650f6
refactor: move DatasetAttributes import to top-level
aldenks Nov 14, 2025
fd09694
refactor: Improve test coverage and spatial info validation for NOAA …
aldenks Nov 14, 2025
c9461a6
factor out base NoaaHrrrRegionJob
aldenks Nov 14, 2025
4564765
test: add tests for NOAA HRRR region job
aldenks Nov 14, 2025
98e24a8
refactor: Move base HRRR region job tests to new location
aldenks Nov 14, 2025
b14dcdf
refactor: remove redundant template_config initialization in test fun…
aldenks Nov 14, 2025
37ae17f
style: move import to top-level to fix linting error
aldenks Nov 14, 2025
3eab40a
use general NoaaHrrrTemplateConfig type annotation in general NoaaHrr…
aldenks Nov 14, 2025
7106447
undo unrelated changes
aldenks Nov 14, 2025
0003847
fix: update import for NoaaHrrrSourceFileCoord in test files
aldenks Nov 14, 2025
19bb5a4
style: reorder imports to follow PEP 8 guidelines
aldenks Nov 14, 2025
978d4c5
fix: HRRR region job tests array dimension order
aldenks Nov 14, 2025
7773884
update test paths in comment
aldenks Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
ValidationCronJob,
)
from reformatters.noaa.hrrr.hrrr_config_models import NoaaHrrrDataVar
from reformatters.noaa.hrrr.region_job import NoaaHrrrSourceFileCoord

from .region_job import (
NoaaHrrrForecast48HourRegionJob,
NoaaHrrrSourceFileCoord,
)
from .region_job import NoaaHrrrForecast48HourRegionJob
from .template_config import NoaaHrrrForecast48HourTemplateConfig
from .validators import (
check_data_is_current,
Expand Down
187 changes: 7 additions & 180 deletions src/reformatters/noaa/hrrr/forecast_48_hour/region_job.py
Original file line number Diff line number Diff line change
@@ -1,129 +1,41 @@
from collections.abc import Callable, Mapping, Sequence
from pathlib import Path
from collections.abc import Mapping, Sequence

import numpy as np
import pandas as pd
import rasterio # type: ignore[import-untyped]
import xarray as xr
import zarr

from reformatters.common.binary_rounding import round_float32_inplace
from reformatters.common.deaccumulation import deaccumulate_to_rates_inplace
from reformatters.common.download import http_download_to_disk
from reformatters.common.iterating import digest, group_by, item
from reformatters.common.iterating import item
from reformatters.common.logging import get_logger
from reformatters.common.region_job import (
CoordinateValueOrRange,
RegionJob,
SourceFileCoord,
)
from reformatters.common.time_utils import whole_hours
from reformatters.common.types import (
AppendDim,
ArrayFloat32,
DatetimeLike,
Dim,
Timedelta,
Timestamp,
)
from reformatters.noaa.hrrr.hrrr_config_models import (
NoaaHrrrDataVar,
NoaaHrrrDomain,
NoaaHrrrFileType,
)
from reformatters.noaa.noaa_grib_index import grib_message_byte_ranges_from_index
from reformatters.noaa.hrrr.region_job import NoaaHrrrRegionJob, NoaaHrrrSourceFileCoord
from reformatters.noaa.noaa_utils import has_hour_0_values

log = get_logger(__name__)


class NoaaHrrrSourceFileCoord(SourceFileCoord):
"""Source file coordinate for HRRR forecast data."""

init_time: Timestamp
lead_time: Timedelta
domain: NoaaHrrrDomain
file_type: NoaaHrrrFileType
data_vars: Sequence[NoaaHrrrDataVar]

def get_url(self) -> str:
"""Return the URL for this HRRR file."""
lead_time_hours = whole_hours(self.lead_time)
init_date_str = self.init_time.strftime("%Y%m%d")
init_hour_str = self.init_time.strftime("%H")

return f"https://noaa-hrrr-bdp-pds.s3.amazonaws.com/hrrr.{init_date_str}/{self.domain}/hrrr.t{init_hour_str}z.wrf{self.file_type}f{int(lead_time_hours):02d}.grib2"

def get_idx_url(self) -> str:
"""Return the URL for the GRIB index file."""
return f"{self.get_url()}.idx"

class NoaaHrrrForecast48HourSourceFileCoord(NoaaHrrrSourceFileCoord):
def out_loc(self) -> Mapping[Dim, CoordinateValueOrRange]:
"""Return the output location for this file's data in the dataset."""
# Map to the standard dimension names used in the template
return {
"init_time": self.init_time,
"lead_time": self.lead_time,
}


class NoaaHrrrForecast48HourRegionJob(
RegionJob[NoaaHrrrDataVar, NoaaHrrrSourceFileCoord]
):
class NoaaHrrrForecast48HourRegionJob(NoaaHrrrRegionJob):
"""Region job for HRRR 48-hour forecast data processing."""

max_vars_per_download_group = 5

@classmethod
def source_groups(
cls,
data_vars: Sequence[NoaaHrrrDataVar],
) -> Sequence[Sequence[NoaaHrrrDataVar]]:
return group_by(
data_vars, lambda v: (v.internal_attrs.hrrr_file_type, has_hour_0_values(v))
)

@classmethod
def operational_update_jobs(
cls,
primary_store: zarr.abc.store.Store,
tmp_store: Path,
get_template_fn: Callable[[DatetimeLike], xr.Dataset],
append_dim: AppendDim,
all_data_vars: Sequence[NoaaHrrrDataVar],
reformat_job_name: str,
) -> tuple[
Sequence[RegionJob[NoaaHrrrDataVar, NoaaHrrrSourceFileCoord]],
xr.Dataset,
]:
"""Generate operational update jobs for HRRR forecast data."""
# For operational updates, we want to process recent forecast data
# HRRR provides forecasts every hour, but 48-hour forecasts are only available
# every 6 hours (00, 06, 12, 18 UTC)

existing_ds = xr.open_zarr(primary_store, chunks=None, decode_timedelta=True)
append_dim_start = cls._update_append_dim_start(existing_ds)

append_dim_end = cls._update_append_dim_end()
template_ds = get_template_fn(append_dim_end)

jobs = cls.get_jobs(
kind="operational-update",
tmp_store=tmp_store,
template_ds=template_ds,
append_dim=append_dim,
all_data_vars=all_data_vars,
reformat_job_name=reformat_job_name,
filter_start=append_dim_start,
filter_end=append_dim_end,
)
return jobs, template_ds

def generate_source_file_coords(
self,
processing_region_ds: xr.Dataset,
data_var_group: Sequence[NoaaHrrrDataVar],
) -> Sequence[NoaaHrrrSourceFileCoord]:
) -> Sequence[NoaaHrrrForecast48HourSourceFileCoord]:
"""Generate source file coordinates for the processing region."""
init_times = pd.to_datetime(processing_region_ds["init_time"].values)
lead_times = pd.to_timedelta(processing_region_ds["lead_time"].values)
Expand All @@ -134,7 +46,7 @@ def generate_source_file_coords(
file_type = item({var.internal_attrs.hrrr_file_type for var in data_var_group})

return [
NoaaHrrrSourceFileCoord(
NoaaHrrrForecast48HourSourceFileCoord(
init_time=init_time,
lead_time=lead_time,
domain="conus",
Expand All @@ -144,88 +56,3 @@ def generate_source_file_coords(
for init_time in init_times
for lead_time in lead_times
]

def download_file(self, coord: NoaaHrrrSourceFileCoord) -> Path:
"""Download a subset of variables from a HRRR file and return the local path."""
idx_url = coord.get_idx_url()
idx_local_path = http_download_to_disk(idx_url, self.dataset_id)

byte_range_starts, byte_range_ends = grib_message_byte_ranges_from_index(
idx_local_path, coord.data_vars, coord.init_time, coord.lead_time
)
vars_suffix = digest(
f"{s}-{e}" for s, e in zip(byte_range_starts, byte_range_ends, strict=True)
)

return http_download_to_disk(
coord.get_url(),
self.dataset_id,
byte_ranges=(byte_range_starts, byte_range_ends),
local_path_suffix=f"-{vars_suffix}",
)

def read_data(
self,
coord: NoaaHrrrSourceFileCoord,
data_var: NoaaHrrrDataVar,
) -> ArrayFloat32:
"""Read data from a HRRR file for a specific variable."""
assert coord.downloaded_path is not None # for type check, system guarantees it
grib_description = data_var.internal_attrs.grib_description

grib_element = data_var.internal_attrs.grib_element
# grib element has the accumulation window as a suffix in the grib file attributes, but not in the .idx file
if (reset_freq := data_var.internal_attrs.window_reset_frequency) is not None:
grib_element = f"{grib_element}{whole_hours(reset_freq):02d}"

with rasterio.open(coord.downloaded_path) as reader:
matching_bands = [
rasterio_band_i
for band_i in range(reader.count)
if reader.descriptions[band_i] == grib_description
and reader.tags(rasterio_band_i := band_i + 1)["GRIB_ELEMENT"]
== grib_element
]

assert len(matching_bands) == 1, (
f"Expected exactly 1 matching band, found {len(matching_bands)}: {matching_bands}. "
f"{grib_element=}, {grib_description=}, {coord.downloaded_path=}"
)
rasterio_band_index = item(matching_bands)

result: ArrayFloat32 = reader.read(
rasterio_band_index, out_dtype=np.float32
)
return result

def apply_data_transformations(
self, data_array: xr.DataArray, data_var: NoaaHrrrDataVar
) -> None:
"""Apply in-place data transformations to the output data array for a given data variable."""
if data_var.internal_attrs.deaccumulate_to_rate:
assert data_var.internal_attrs.window_reset_frequency is not None
log.info(f"Converting {data_var.name} from accumulations to rates")
try:
deaccumulate_to_rates_inplace(
data_array,
dim="lead_time",
reset_frequency=data_var.internal_attrs.window_reset_frequency,
)
except ValueError:
# Log exception so we are notified if deaccumulation errors are larger than expected.
log.exception(f"Error deaccumulating {data_var.name}")

keep_mantissa_bits = data_var.internal_attrs.keep_mantissa_bits
if isinstance(keep_mantissa_bits, int):
round_float32_inplace(data_array.values, keep_mantissa_bits)

@classmethod
def _update_append_dim_end(cls) -> pd.Timestamp:
"""Get the end time for operational updates."""
return pd.Timestamp.now()

@classmethod
def _update_append_dim_start(cls, existing_ds: xr.Dataset) -> pd.Timestamp:
"""Get the start time for operational updates based on existing data."""
ds_max_time = existing_ds["init_time"].max().item()
return pd.Timestamp(ds_max_time)
Loading
Loading