From ab655a3735a25174874fbd82a147172b36bcd0d6 Mon Sep 17 00:00:00 2001
From: chrishavlin
Date: Thu, 13 Jun 2024 14:58:10 -0500
Subject: [PATCH 1/2] some dask experiments

---
 .../dask_failure_in_mem_param_store.py        | 70 ++++++++++++++++
 .../dask_pickles_etc/exploring.py             | 75 ++++++++++++++++++
 .../dask_pickles_etc/pickle_from_file.py      |  9 +++
 .../dask_pickles_etc/pickle_to_file.py        | 10 +++
 .../dask_pickles_etc/pickling_test.py         | 13 +++
 .../dask_pickles_etc/yt_ds.pickle             | Bin 0 -> 103 bytes
 6 files changed, 177 insertions(+)
 create mode 100644 miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py
 create mode 100644 miscelleaneous_tests/dask_pickles_etc/exploring.py
 create mode 100644 miscelleaneous_tests/dask_pickles_etc/pickle_from_file.py
 create mode 100644 miscelleaneous_tests/dask_pickles_etc/pickle_to_file.py
 create mode 100644 miscelleaneous_tests/dask_pickles_etc/pickling_test.py
 create mode 100644 miscelleaneous_tests/dask_pickles_etc/yt_ds.pickle

diff --git a/miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py b/miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py
new file mode 100644
index 0000000..86e2248
--- /dev/null
+++ b/miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py
@@ -0,0 +1,70 @@
+import yt
+from dask import delayed, compute
+from dask.distributed import Client
+import sys
+import time
+
+def find_min_val(ds):
+    # just something to take some time
+    return ds.all_data()['gas', 'density'].min()
+
+yt.set_log_level(50)
+
+"""
+Important note on dask state:
+
+When you start the dask distributed cluster, new subprocesses are created.
+Unless you use the fork start method, this involves re-importing the current
+script under a name other than __main__. That is how the if block below keeps
+each worker process from trying to create clusters-within-clusters, and it is
+also why placing statements inside the if block stops the workers from
+executing them on import.
+
+https://stackoverflow.com/questions/75837897/dask-worker-has-different-imports-than-main-thread
+
+So, we need to:
+
+1. set the 'store_parameter_files' config option to True
+2. ensure the object registries are updated before the __name__ check
+"""
+
+# The store_parameter_files config option controls how datasets
+# are pickled. If True, an on-disk CSV file is created. If False,
+# an in-memory store is used, which can introduce some
+# tricky state-dependence. Make sure it is False here:
+assert not yt.config.ytcfg.get('yt', 'store_parameter_files')
+
+# Putting Load HERE works
+# fname = 'IsolatedGalaxy/galaxy0030/galaxy0030'
+# ds = yt.load(fname)
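+# (a guess at why the module-level load works: each spawned worker process
+# re-imports this script, so a load placed here re-runs in every worker, and
+# each worker's in-memory parameter file store then already holds this
+# dataset's hash by the time _reconstruct_ds looks it up)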
+
+if __name__ == "__main__":
+
+    n_workers = int(sys.argv[1])
+    tpw = int(sys.argv[2])
+
+    start_time = time.time()
+    print((n_workers, tpw))
+
+    c = Client(n_workers=n_workers, threads_per_worker=tpw)
+
+    tasks = []
+
+    # Putting Load HERE fails with
+    # File "/home/chavlin/miniconda3/envs/yt_py39/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 96, in loads
+    #     return pickle.loads(x)
+    # File "/home/chavlin/src/yt_general/yt/yt/data_objects/static_output.py", line 2053, in _reconstruct_ds
+    #     ds = datasets.get_ds_hash(*args)
+    # File "/home/chavlin/src/yt_general/yt/yt/utilities/parameter_file_storage.py", line 91, in get_ds_hash
+    #     return self._convert_ds(self._records[hash])
+    # KeyError: 'e34a252e426f6cc81be22b03b77786ea'
+
+    fname = 'IsolatedGalaxy/galaxy0030/galaxy0030'
+    ds = yt.load(fname)
+
+    for _ in range(10):
+        tasks.append(delayed(find_min_val)(ds))
+
+    results = compute(*tasks)
+    print(time.time() - start_time)
+    c.close()
\ No newline at end of file
diff --git a/miscelleaneous_tests/dask_pickles_etc/exploring.py b/miscelleaneous_tests/dask_pickles_etc/exploring.py
new file mode 100644
index 0000000..9981b43
--- /dev/null
+++ b/miscelleaneous_tests/dask_pickles_etc/exploring.py
@@ -0,0 +1,75 @@
+import yt
+from dask import delayed, compute
+from dask.distributed import Client
+import sys
+import time
+import pickle
+
+
+def indexed_operation(ds):
+    # placeholder operation (currently unused)
+    return ds * 2
+
+
+def separate_loads(fname):
+    ds = yt.load(fname)
+    _ = ds.index
+    return ds.all_data()['gas', 'density'].min()
+
+def pre_loaded(ds):
+    return ds.all_data()['gas', 'density'].min()
+
+yt.set_log_level(50)
+
+"""
+Important note on dask state:
+
+When you start the dask distributed cluster, new subprocesses are created.
+Unless you use the fork start method, this involves re-importing the current
+script under a name other than __main__. That is how the if block below keeps
+each worker process from trying to create clusters-within-clusters, and it is
+also why placing statements inside the if block stops the workers from
+executing them on import.
+
+https://stackoverflow.com/questions/75837897/dask-worker-has-different-imports-than-main-thread
+
+So, we need to:
+
+1. set the 'store_parameter_files' config option to True
+2. ensure the object registries are updated before the __name__ check
+"""
+# yt.config.ytcfg.set('yt', 'store_parameter_files', True)
+# try:
+#     _ = yt.load("lwkrjewlrkeja")
+# except FileNotFoundError:
+#     pass
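+# (the commented block above is the registry-update trick from the docstring:
+# switch to the on-disk parameter file store, then attempt a load -- even of
+# a nonsense filename -- so that the store is initialized at import time in
+# the main process and, presumably, in each re-imported worker as well)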
+
+if __name__ == "__main__":
+
+    n_workers = int(sys.argv[1])
+    # note: bool() of any non-empty string (including "False") is True,
+    # so parse the flag explicitly
+    processes = sys.argv[2].lower() in ("1", "true")
+    tpw = int(sys.argv[3])
+
+    start_time = time.time()
+    print((n_workers, processes, tpw))
+
+    c = Client(n_workers=n_workers, threads_per_worker=tpw)
+
+    tasks = []
+
+    # case 1: load on each process (works)
+    # fname = 'IsolatedGalaxy/galaxy0030/galaxy0030'
+    # for _ in range(10):
+    #     tasks.append(delayed(separate_loads)(fname))
+
+    # case 2: pre-load, pass ds
+    fname = 'IsolatedGalaxy/galaxy0030/galaxy0030'
+    ds = yt.load(fname)
+    for _ in range(10):
+        tasks.append(delayed(pre_loaded)(ds))
+
+    results = compute(*tasks)
+    print(time.time() - start_time)
+    c.close()
\ No newline at end of file
diff --git a/miscelleaneous_tests/dask_pickles_etc/pickle_from_file.py b/miscelleaneous_tests/dask_pickles_etc/pickle_from_file.py
new file mode 100644
index 0000000..a3b8f86
--- /dev/null
+++ b/miscelleaneous_tests/dask_pickles_etc/pickle_from_file.py
@@ -0,0 +1,9 @@
+import yt
+import pickle
+
+if __name__ == '__main__':
+
+    with open('yt_ds.pickle', 'rb') as handle:
+        ds = pickle.load(handle)
+
+    ds.index  # accessing .index triggers index construction
diff --git a/miscelleaneous_tests/dask_pickles_etc/pickle_to_file.py b/miscelleaneous_tests/dask_pickles_etc/pickle_to_file.py
new file mode 100644
index 0000000..f555e87
--- /dev/null
+++ b/miscelleaneous_tests/dask_pickles_etc/pickle_to_file.py
@@ -0,0 +1,10 @@
+import yt
+import pickle
+
+if __name__ == '__main__':
+
+    fname = 'IsolatedGalaxy/galaxy0030/galaxy0030'
+    ds = yt.load(fname)
+
+    with open('yt_ds.pickle', 'wb') as handle:
+        pickle.dump(ds, handle, protocol=pickle.HIGHEST_PROTOCOL)
diff --git a/miscelleaneous_tests/dask_pickles_etc/pickling_test.py b/miscelleaneous_tests/dask_pickles_etc/pickling_test.py
new file mode 100644
index 0000000..7ac5077
--- /dev/null
+++ b/miscelleaneous_tests/dask_pickles_etc/pickling_test.py
@@ -0,0 +1,13 @@
+import pickle
+import yt
+
+
+if __name__ == "__main__":
+    yt.config.ytcfg.set('yt', 'store_parameter_files', True)
+    yt.set_log_level(1)
+    fname = 'IsolatedGalaxy/galaxy0030/galaxy0030'
+    ds = yt.load(fname)
+    ds.index
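+
+    # round-trip the dataset through pickle in-process; with
+    # store_parameter_files set to True above, unpickling goes through
+    # _reconstruct_ds, which should be able to find the dataset hash in the
+    # on-disk store (compare the KeyError in dask_failure_in_mem_param_store.py
+    # for the in-memory case)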
+    ds_1 = pickle.loads(pickle.dumps(ds))
+    ds_1.index
\ No newline at end of file
diff --git a/miscelleaneous_tests/dask_pickles_etc/yt_ds.pickle b/miscelleaneous_tests/dask_pickles_etc/yt_ds.pickle
new file mode 100644
index 0000000000000000000000000000000000000000..a665abeba5d41b16d7acf28fe89bc1d948159f76
GIT binary patch
literal 103
zcmZo*of^Xc0X?#nC3-1|C5iF*Nm;4MCB=HhC5a`O$?^H6B?YA=Q+oL0i&B&G^NLG~
zN|Q_CQ;Mfdp3WNvP5VV0UWrFBZs6g>b2
Cs3S=L

literal 0
HcmV?d00001

From 76a759b834d29e3b6ec1bb9a3ffb0c85886664b6 Mon Sep 17 00:00:00 2001
From: chrishavlin
Date: Thu, 13 Jun 2024 14:59:34 -0500
Subject: [PATCH 2/2] update comment

---
 .../dask_failure_in_mem_param_store.py        | 18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py b/miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py
index 86e2248..7c221b3 100644
--- a/miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py
+++ b/miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py
@@ -10,24 +10,6 @@ def find_min_val(ds):
 
 yt.set_log_level(50)
 
-"""
-Important note on dask state:
-
-When you start the dask distributed cluster, new subprocesses are created.
-Unless you use the fork start method, this involves re-importing the current
-script under a name other than __main__. That is how the if block below keeps
-each worker process from trying to create clusters-within-clusters, and it is
-also why placing statements inside the if block stops the workers from
-executing them on import.
-
-https://stackoverflow.com/questions/75837897/dask-worker-has-different-imports-than-main-thread
-
-So, we need to:
-
-1. set the 'store_parameter_files' config option to True
-2. ensure the object registries are updated before the __name__ check
-"""
-
 # The store_parameter_files config option controls how datasets
 # are pickled. If True, an on-disk CSV file is created. If False,
 # an in-memory store is used, which can introduce some