[OBSOLETE but still functional for testing] Test optimal kerchunk #186
Open
valeriupredoi wants to merge 67 commits into main from test_optimal_kerchunk
Changes from all commits
67 commits (all authored by valeriupredoi):

c9812c7 add big file test
4afb750 run big file test
2b0c7c4 use timeit see memory
935bcaf one single process Vasili
40f6e6e back to all Pythons
f8f930f toss some timings
b1a1aeb save file and toss some timings
6030040 fail a test and add v0 test
6423a14 GA workflow new test names
c2d426c turn off storage caching
67ce586 kluge to get a Group
748c266 kluge to get a Group
16e5656 fix module
fb5c5d2 add code change suggested for Kerchunk
256cb65 allow for cases
c3775cb allow for cases
e3184a7 correct file url for kerchunk and pass storage options
6dd3a32 add function that readds the correct compressor to the reductionist
0677ca4 just print chunk selections to look at them
f3dc818 finally made it work all through
d20cdc2 streamline
0425e3c cleanup prints
f52693c handle exception
e9a8409 revert to JSON tempfile before I die of not realizing WTF was going o…
ecc203d added comments
8f92ae7 unfail test
dbae564 add info about PytestUnraisableError
ddd38fa Merge branch 'main' into test_optimal_kerchunk
40f110b cleanup GA flow
91d3f57 pin kerchunk
8cbf1f0 pin kerchunk
fcefd23 use correct attributes compression and shuffle
a4aadce Revert "use correct attributes compression and shuffle"
8f9e9ab fix case filters is None
ea20e16 use two procs for pytest
f6b1189 Merge branch 'main' into test_optimal_kerchunk
151284d replace s3 test file because I am a pillock and I overwrote the other
8c82fb8 fix SegFault
aa30b0b add extension to truncate to the other kerchunking block and lots of …
0a6bd1d add valid storage options for file opening in Kerchunk
2f75873 reinstance variable in test
6a4206d del pointer
23cc9d5 separate calls to Active
86a22d8 reduce test
3d50db8 reduce test
2582d96 don't run exploratory tests
662a69f cleanup test
3ee8732 reinstate test; we need to figure out the segfaulting not brushit und…
48df2cd cleanup and restore test
30d435f reinstate full workflow
110b96a try create new variable
8b9c890 same var name again
0079a8b try returning after each conditional
96c9ba4 comment out Dataset/group selection when preset S3 options are used
03c11b3 add note
d064319 comment out heavy printing to stdout
730f6ec turn off some printing to stdout
d82b916 Merge branch 'main' into test_optimal_kerchunk
d88cd99 Merge branch 'main' into test_optimal_kerchunk
3292612 Merge branch 'main' into test_optimal_kerchunk
a46c59a Merge branch 'main' into test_optimal_kerchunk
66a566e Merge branch 'main' into test_optimal_kerchunk
c21d02d Merge branch 'main' into test_optimal_kerchunk
319220a Merge branch 'main' into test_optimal_kerchunk
a1f54c4 Merge branch 'main' into test_optimal_kerchunk
9429fae Merge branch 'main' into test_optimal_kerchunk
726ebe0 Merge branch 'main' into test_optimal_kerchunk
```diff
@@ -9,6 +9,64 @@
 from activestorage.config import *
 from kerchunk.hdf import SingleHdf5ToZarr
 
+import time
+import h5py
+
+
+def _correct_compressor_and_filename(content, varname, bryan_bucket=False):
+    """
+    Correct the compressor type as it comes out of Kerchunk.
+    Also correct the file name, as Kerchunk now prefixes it with "s3://",
+    and for special buckets like Bryan's bnl the correct file is bnl/file.nc,
+    not s3://bnl/file.nc.
+    """
+    new_content = content.copy()
+
+    # preliminary assembly
+    try:
+        new_zarray = ujson.loads(new_content['refs'][f"{varname}/.zarray"])
+        group = False
+    except KeyError:
+        new_zarray = ujson.loads(new_content['refs'][f"{varname} /{varname}/.zarray"])
+        group = True
+
+    # re-add the correct compressor if it's in the "filters" list
+    if new_zarray["compressor"] is None and new_zarray["filters"]:
+        for zfilter in new_zarray["filters"]:
+            if zfilter["id"] == "zlib":
+                new_zarray["compressor"] = zfilter
+                new_zarray["filters"].remove(zfilter)
+
+    if not group:
+        new_content['refs'][f"{varname}/.zarray"] = ujson.dumps(new_zarray)
+    else:
+        new_content['refs'][f"{varname} /{varname}/.zarray"] = ujson.dumps(new_zarray)
+
+    # FIXME TODO this is an absolute nightmare: the type of bucket on UOR ACES.
+    # This is a HACK and it works only with the crazy Bryan S3 bucket "bnl/file.nc".
+    # The problem: the filename gets written to the JSON as "s3://bnl/file.nc" but Reductionist doesn't
+    # find it, since it needs url=bnl/file.nc, with the endpoint URL being extracted from the
+    # endpoint_url of storage_options. BAH!
+    if bryan_bucket:
+        for key in new_content['refs'].keys():
+            if varname in key and isinstance(new_content['refs'][key], list) and "s3://" in new_content['refs'][key][0]:
+                new_content['refs'][key][0] = new_content['refs'][key][0].replace("s3://", "")
+
+    return new_content
+
+
+def _return_zcomponents(content, varname):
+    """Return zarr array and attributes."""
+    # account for both Group and Dataset
+    try:
+        zarray = ujson.loads(content['refs'][f"{varname}/.zarray"])
+        zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"])
+    except KeyError:
+        zarray = ujson.loads(content['refs'][f"{varname} /{varname}/.zarray"])
+        zattrs = ujson.loads(content['refs'][f"{varname} /{varname}/.zattrs"])
+
+    return zarray, zattrs
+
+
 def _correct_compressor_and_filename(content, varname, bryan_bucket=False):
     """
```
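For context, here is a minimal, self-contained sketch of the relocation that the compressor fix performs. The `.zarray` values are invented for illustration (they echo the `lat/.zarray` entry in the reference dict removed further down, but with zlib deliberately misplaced into "filters" to show the problem), and plain dict manipulation stands in for the ujson round trip:

```python
# Invented example of a Kerchunk ".zarray" entry where newer Kerchunk versions
# report zlib among the "filters" and leave "compressor" empty.
zarray = {
    "chunks": [324],
    "compressor": None,
    "dtype": "<f4",
    "filters": [{"elementsize": 4, "id": "shuffle"}, {"id": "zlib", "level": 1}],
    "order": "C",
    "shape": [324],
    "zarr_format": 2,
}

# Core of the fix, mirroring the loop in _correct_compressor_and_filename:
# promote the zlib entry from "filters" to "compressor" so chunks are
# decompressed correctly downstream.
if zarray["compressor"] is None and zarray["filters"]:
    for zfilter in zarray["filters"]:
        if zfilter["id"] == "zlib":
            zarray["compressor"] = zfilter
            zarray["filters"].remove(zfilter)

print(zarray["compressor"])  # {'id': 'zlib', 'level': 1}
print(zarray["filters"])     # [{'elementsize': 4, 'id': 'shuffle'}]
```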
```diff
@@ -64,8 +122,26 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
                                )
         fs2 = fsspec.filesystem('')
         with fs.open(file_url, 'rb') as s3file:
+            # this block allows for Dataset/Group selection but is causing
+            # SegFaults in S3/Minio tests; h5py backend very brittle: see below for reasoning behind this
+            # since this case is only for the S3/Minio tests, it's OK to not have it, test files are small
+            # with fs.open(file_url, 'rb') as s3file_o_1:  # -> best to have unique name
+            #     s3file_r_1 = h5py.File(s3file_o_1, mode="r")
+            #     s3file_w_1 = h5py.File(s3file_o_1, mode="w")
+            #     if isinstance(s3file_r_1[varname], h5py.Dataset):
+            #         print("Looking only at a single Dataset", s3file_r_1[varname])
+            #         s3file_w_1.create_group(varname + " ")
+            #         s3file_w_1[varname + " "][varname] = s3file_w_1[varname]
+            #         s3file = s3file_w_1[varname + " "]
+            #     elif isinstance(s3file_r_1[varname], h5py.Group):
+            #         print("Looking only at a single Group", s3file_r_1[varname])
+            #         s3file = s3file_r_1[varname]
+            # storage_options = {"key": S3_ACCESS_KEY,
+            #                    "secret": S3_SECRET_KEY,
+            #                    "client_kwargs": {'endpoint_url': S3_URL}}
             h5chunks = SingleHdf5ToZarr(s3file, file_url,
                                         inline_threshold=0)
+                                        # storage_options=storage_options)
 
             # TODO absolute crap, this needs to go
             # see comments in _correct_compressor_and_filename
```
```diff
@@ -79,6 +155,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
                                                            varname,
                                                            bryan_bucket=bryan_bucket)
                 f.write(ujson.dumps(content).encode())
+        zarray, zattrs = _return_zcomponents(content, varname)
+        return outf, zarray, zattrs
 
     # S3 passed-in configuration
     elif storage_type == "s3" and storage_options is not None:
```
```diff
@@ -87,7 +165,36 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
         storage_options['default_cache_type'] = "first"  # best for HDF5
         fs = s3fs.S3FileSystem(**storage_options)
         fs2 = fsspec.filesystem('')
-        with fs.open(file_url, 'rb') as s3file:
+        tk1 = time.time()
+        with fs.open(file_url, 'rb') as s3file_o:
+            # Restrict the s3file HDF5 file to the Group/Dataset the varname belongs to;
+            # this saves 4-5x time in Kerchunk.
+            # This bit extracts the Dataset or Group of interest (depending on what type of
+            # object the varname is). It is the best we can do with the non-breaking h5py API.
+            # This is a touchy bit of said API and, depending on the way things are coded,
+            # can easily throw SegFaults. Explanations:
+            # - an s3fs File object with HDF5 structure is passed in
+            # - h5py allows structural edits (creating/adding a Group)
+            #   only if the file is opened in WRITE mode
+            # - there is a clear distinction between said File open in W mode as opposed to
+            #   said file open in R(B) mode
+            # - the reason we open it in R mode is that we can only truncate it (select on key) in R mode,
+            #   and then migrate the extracted data to the file open in W mode
+            # - operations like copy or selection/truncation will always throw SegFaults
+            #   if not operating with two open Files: W and R
+            # - this block cannot be extracted into a function because we need to dealloc each instance of
+            #   s3file_o, s3file_r and s3file_w (hence the naming is different in the step above)
+            s3file_r = h5py.File(s3file_o, mode="r")
+            s3file_w = h5py.File(s3file_o, mode="w")
+            if isinstance(s3file_r[varname], h5py.Dataset):
+                print("Looking only at a single Dataset", s3file_r[varname])
+                s3file_w.create_group(varname + " ")
+                s3file_w[varname + " "][varname] = s3file_w[varname]
+                s3file = s3file_w[varname + " "]
+            elif isinstance(s3file_r[varname], h5py.Group):
+                print("Looking only at a single Group", s3file_r[varname])
+                s3file = s3file_r[varname]
 
             # Kerchunk wants the correct file name in S3 format
             if not file_url.startswith("s3://"):
```
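The Dataset-versus-Group dispatch that this block relies on can be exercised safely against a small local HDF5 file. The sketch below only illustrates the `isinstance` checks; it deliberately does not reproduce the double read/write open of the same S3 file object, and the file and variable names are invented:

```python
import h5py
import numpy as np

# Build a small throwaway HDF5 file with one root-level Dataset and one Group.
with h5py.File("demo.h5", "w") as f:
    f.create_dataset("tas", data=np.arange(12.0).reshape(3, 4))
    grp = f.create_group("model_level")
    grp.create_dataset("ua", data=np.ones((2, 2)))

# Open read-only and dispatch on the object type, mirroring the checks above.
with h5py.File("demo.h5", "r") as f:
    for varname in ("tas", "model_level"):
        obj = f[varname]
        if isinstance(obj, h5py.Dataset):
            print(f"{varname!r} is a Dataset with shape {obj.shape}")
        elif isinstance(obj, h5py.Group):
            print(f"{varname!r} is a Group with members {list(obj)}")
```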
```diff
@@ -100,13 +207,21 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
                 bryan_bucket = True
 
             h5chunks = SingleHdf5ToZarr(s3file, file_url,
-                                        inline_threshold=0)
+                                        inline_threshold=0,
+                                        storage_options=storage_options)
+            tk2 = time.time()
             with fs2.open(outf, 'wb') as f:
                 content = h5chunks.translate()
                 content = _correct_compressor_and_filename(content,
                                                            varname,
                                                            bryan_bucket=bryan_bucket)
                 f.write(ujson.dumps(content).encode())
+            tk3 = time.time()
+            print("Time to Kerchunk and write JSON file", tk3 - tk2)
+
+            zarray, zattrs = _return_zcomponents(content, varname)
+            return outf, zarray, zattrs
 
     # not S3
     else:
         fs = fsspec.filesystem('')
```
```diff
@@ -132,10 +247,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
                                                        bryan_bucket=False)
             f.write(ujson.dumps(content).encode())
 
-    zarray = ujson.loads(content['refs'][f"{varname}/.zarray"])
-    zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"])
-
-    return outf, zarray, zattrs
+    zarray, zattrs = _return_zcomponents(content, varname)
+    return outf, zarray, zattrs
 
 
 def open_zarr_group(out_json, varname):
```
```diff
@@ -150,14 +263,20 @@ def open_zarr_group(out_json, varname):
     mapper = fs.get_mapper("")  # local FS mapper
     #mapper.fs.reference has the kerchunk mapping, how does this propagate into the Zarr array?
     zarr_group = zarr.open_group(mapper)
 
+    not_group = False
     try:
-        zarr_array = getattr(zarr_group, varname)
-    except AttributeError as attrerr:
-        print(f"Zarr Group does not contain variable {varname}. "
-              f"Zarr Group info: {zarr_group.info}")
-        raise attrerr
-    #print("Zarr array info:", zarr_array.info)
+        zarr_array = getattr(zarr_group, varname + " ")
+    except AttributeError:
+        not_group = True
+    if not_group:
+        try:
+            zarr_array = getattr(zarr_group, varname)
+        except AttributeError:
+            print(f"Zarr Group does not contain variable {varname}. "
+                  f"Zarr Group info: {zarr_group.info}")
+            raise
 
     return zarr_array
```
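The lookup order above (try the padded `varname + " "` group first, then fall back to the plain name) can be illustrated with an in-memory group, assuming zarr 2.x as used by the module; the group layout here is invented, and the only point is that a missing member surfaces as an `AttributeError`, which is what drives the fallback:

```python
import numpy as np
import zarr  # zarr 2.x assumed

# Invented layout: a root group holding one array called "tas".
root = zarr.group()
root.create_dataset("tas", data=np.arange(10.0), chunks=(5,))

def pick_array(zarr_group, varname):
    """Mirror the fallback in open_zarr_group: try the padded 'varname ' member
    (used when a Dataset was wrapped in a Group) first, then the plain name."""
    try:
        return getattr(zarr_group, varname + " ")
    except AttributeError:
        return getattr(zarr_group, varname)

print(pick_array(root, "tas"))  # e.g. <zarr.core.Array '/tas' (10,) float64>
```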
```diff
@@ -179,17 +298,3 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, bu
     ref_ds = open_zarr_group(out_json.name, varname)
 
     return ref_ds, zarray, zattrs
-
-
-#d = {'version': 1,
-#     'refs': {
-#      '.zgroup': '{"zarr_format":2}',
-#      '.zattrs': '{"Conventions":"CF-1.6","access-list":"grenvillelister simonwilson jeffcole","awarning":"**** THIS SUITE WILL ARCHIVE NON-DUPLEXED DATA TO MOOSE. FOR CRITICAL MODEL RUNS SWITCH TO DUPLEXED IN: postproc --> Post Processing - common settings --> Moose Archiving --> non_duplexed_set. Follow guidance in http:\\/\\/www-twiki\\/Main\\/MassNonDuplexPolicy","branch-date":"1950-01-01","calendar":"360_day","code-version":"UM 11.6, NEMO vn3.6","creation_time":"2022-10-28 12:28","decription":"Initialised from EN4 climatology","description":"Copy of u-ar696\\/trunk@77470","email":"[email protected]","end-date":"2015-01-01","experiment-id":"historical","forcing":"AA,BC,CO2","forcing-info":"blah, blah, blah","institution":"NCAS","macro-parent-experiment-id":"historical","macro-parent-experiment-mip":"CMIP","macro-parent-variant-id":"r1i1p1f3","model-id":"HadGEM3-CG31-MM","name":"\\/work\\/n02\\/n02\\/grenvill\\/cylc-run\\/u-cn134\\/share\\/cycle\\/19500101T0000Z\\/3h_","owner":"rosalynhatcher","project":"Coupled Climate","timeStamp":"2022-Oct-28 12:20:33 GMT","title":"[CANARI] GC3.1 N216 ORCA025 UM11.6","uuid":"51e5ef20-d376-4aa6-938e-4c242886b7b1"}',
-#      'lat/.zarray': '{"chunks":[324],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":null,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[324],"zarr_format":2}', 'lat/.zattrs': '{"_ARRAY_DIMENSIONS":["lat"],"axis":"Y","long_name":"Latitude","standard_name":"latitude","units":"degrees_north"}',
-#      'lat/0': ['/home/david/Downloads/3h__19500101-19500110.nc', 26477, 560],
-#      'lon/.zarray': '{"chunks":[432],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":null,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[432],"zarr_format":2}',
-#      'lon/.zattrs': '{"_ARRAY_DIMENSIONS":["lon"],"axis":"X","long_name":"Longitude","standard_name":"longitude","units":"degrees_east"}',
-#      'lon/0': ['/home/david/Downloads/3h__19500101-19500110.nc', 27037, 556],
-#      'm01s00i507_10/.zarray': '{"chunks":[1,324,432],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":-1073741824.0,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[80,324,432],"zarr_format":2}',
-#      'm01s00i507_10/.zattrs': '{"_ARRAY_DIMENSIONS":["time_counter","lat","lon"],"cell_methods":"time: mean (interval: 900 s)","coordinates":"time_centered","interval_offset":"0ts","interval_operation":"900 s","interval_write":"3 h","long_name":"OPEN SEA SURFACE TEMP AFTER TIMESTEP","missing_value":-1073741824.0,"online_operation":"average","standard_name":"surface_temperature","units":"K"}',
-#      }}
```
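The commented-out reference dict deleted above is a useful reminder of what gen_json() actually writes and what open_zarr_group() consumes. Below is a tiny, hand-rolled equivalent, opened the same way (fsspec reference filesystem, then a mapper, then a zarr group); the array name, file names, offsets and the zarr 2.x assumption are all illustrative, not taken from the PR:

```python
import os

import fsspec
import numpy as np
import ujson
import zarr  # zarr 2.x assumed, matching the module

# One raw, uncompressed chunk of four int32 values in a local file.
data = np.arange(4, dtype="<i4")
with open("chunk.bin", "wb") as binfile:
    binfile.write(data.tobytes())

# Hand-written Kerchunk-style reference set (names and byte ranges invented):
# the same shape of JSON that gen_json() writes to its output file.
refs = {
    "version": 1,
    "refs": {
        ".zgroup": ujson.dumps({"zarr_format": 2}),
        "tas/.zarray": ujson.dumps({
            "chunks": [4], "compressor": None, "dtype": "<i4",
            "fill_value": None, "filters": None, "order": "C",
            "shape": [4], "zarr_format": 2}),
        "tas/.zattrs": ujson.dumps({"_ARRAY_DIMENSIONS": ["x"]}),
        "tas/0": [os.path.abspath("chunk.bin"), 0, 16],
    },
}

# Same opening pattern as open_zarr_group(): reference fs -> mapper -> zarr group.
fs = fsspec.filesystem("reference", fo=refs)
zarr_group = zarr.open_group(fs.get_mapper(""))
print(zarr_group.tas[:])  # [0 1 2 3]
```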
@bnlawrence here is the fix for the new compressor-in-filters feature of Kerchunk