diff --git a/miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py b/miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py new file mode 100644 index 0000000..7c221b3 --- /dev/null +++ b/miscelleaneous_tests/dask_pickles_etc/dask_failure_in_mem_param_store.py @@ -0,0 +1,52 @@ +import yt +from dask import delayed, compute +from dask.distributed import Client +import sys +import time + +def find_min_val(ds): + # just something to take some time + return ds.all_data()['gas', 'density'].min() + +yt.set_log_level(50) + +# The store_parameter_files config option controls how datasets +# are pickled. If True, an on-disk csv file is created. If False, +# an in-memory store is used, which can introduce some +# tricky state-dependence. Make sure it is False here: +assert not yt.config.ytcfg.get('yt','store_parameter_files') + +# Putting Load HERE works +# fname = 'IsolatedGalaxy/galaxy0030/galaxy0030' +# ds = yt.load(fname) + +if __name__ == "__main__": + + n_workers = int(sys.argv[1]) + tpw = int(sys.argv[2]) + + start_time = time.time() + print((n_workers, tpw)) + + c = Client(n_workers=n_workers, threads_per_worker=tpw) + + tasks = [] + + # Putting Load HERE fails with + # File "/home/chavlin/miniconda3/envs/yt_py39/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 96, in loads + # return pickle.loads(x) + # File "/home/chavlin/src/yt_general/yt/yt/data_objects/static_output.py", line 2053, in _reconstruct_ds + # ds = datasets.get_ds_hash(*args) + # File "/home/chavlin/src/yt_general/yt/yt/utilities/parameter_file_storage.py", line 91, in get_ds_hash + # return self._convert_ds(self._records[hash]) + # KeyError: 'e34a252e426f6cc81be22b03b77786ea' + + fname = 'IsolatedGalaxy/galaxy0030/galaxy0030' + ds = yt.load(fname) + + for _ in range(10): + tasks.append(delayed(find_min_val)(ds)) + + results = compute(*tasks) + print(time.time() - start_time) + c.close() \ No newline at end of file diff --git a/miscelleaneous_tests/dask_pickles_etc/exploring.py b/miscelleaneous_tests/dask_pickles_etc/exploring.py new file mode 100644 index 0000000..9981b43 --- /dev/null +++ b/miscelleaneous_tests/dask_pickles_etc/exploring.py @@ -0,0 +1,75 @@ +import yt +from dask import delayed, compute +from dask.distributed import Client +import sys +import time +import pickle + + +def indexed_operation(ds): + + + return ds * 2 + + +def separate_loads(fname): + ds = yt.load(fname) + _ = ds.index + return ds.all_data()['gas', 'density'].min() + +def pre_loaded(ds): + return ds.all_data()['gas', 'density'].min() + +yt.set_log_level(50) + +""" +important note on dask state: + +When you start the dask distributed cluster, new subprocesses are created. +Unless you use fork (&), this will involve reimporting the current script, with +a name other than __main__ . This is how the if block prevents each worker process +from also trying to create clusters-within-clusters. So, by placing your import +within the if block, you stop the workers from executing them. + +https://stackoverflow.com/questions/75837897/dask-worker-has-different-imports-than-main-thread + +so, need to: + +1. set 'store_parameter_files' config to true + +and before __name__ need to ensure the object registries are updated +""" +# yt.config.ytcfg.set('yt', 'store_parameter_files', True) +# try: +# _ = yt.load("lwkrjewlrkeja") +# except FileNotFoundError: +# pass + +if __name__ == "__main__": + + + n_workers = int(sys.argv[1]) + processes = bool(sys.argv[2]) + tpw = int(sys.argv[3]) + + start_time = time.time() + print((n_workers, processes, tpw)) + + c = Client(n_workers=n_workers, threads_per_worker=tpw) + + tasks = [] + + # case 1: load on each process (works) + # fname = 'IsolatedGalaxy/galaxy0030/galaxy0030' + # for _ in range(10): + # tasks.append(delayed(separate_loads)(fname)) + + # case 2: pre-load, pass ds + fname = 'IsolatedGalaxy/galaxy0030/galaxy0030' + ds = yt.load(fname) + for _ in range(10): + tasks.append(delayed(pre_loaded)(ds)) + + results = compute(*tasks) + print(time.time() - start_time) + c.close() \ No newline at end of file diff --git a/miscelleaneous_tests/dask_pickles_etc/pickle_from_file.py b/miscelleaneous_tests/dask_pickles_etc/pickle_from_file.py new file mode 100644 index 0000000..a3b8f86 --- /dev/null +++ b/miscelleaneous_tests/dask_pickles_etc/pickle_from_file.py @@ -0,0 +1,9 @@ +import yt +import pickle + +if __name__ == '__main__': + + with open('yt_ds.pickle', 'rb') as handle: + ds = pickle.load(handle) + + ds.index diff --git a/miscelleaneous_tests/dask_pickles_etc/pickle_to_file.py b/miscelleaneous_tests/dask_pickles_etc/pickle_to_file.py new file mode 100644 index 0000000..f555e87 --- /dev/null +++ b/miscelleaneous_tests/dask_pickles_etc/pickle_to_file.py @@ -0,0 +1,10 @@ +import yt +import pickle + +if __name__ == '__main__': + + fname = 'IsolatedGalaxy/galaxy0030/galaxy0030' + ds = yt.load(fname) + + with open('yt_ds.pickle', 'wb') as handle: + pickle.dump(ds, handle, protocol=pickle.HIGHEST_PROTOCOL) diff --git a/miscelleaneous_tests/dask_pickles_etc/pickling_test.py b/miscelleaneous_tests/dask_pickles_etc/pickling_test.py new file mode 100644 index 0000000..7ac5077 --- /dev/null +++ b/miscelleaneous_tests/dask_pickles_etc/pickling_test.py @@ -0,0 +1,13 @@ +import pickle +import yt + + +if __name__ == "__main__": + yt.config.ytcfg.set('yt', 'store_parameter_files', True) + yt.set_log_level(1) + fname = 'IsolatedGalaxy/galaxy0030/galaxy0030' + ds = yt.load(fname) + ds.index + + ds_1 = pickle.loads(pickle.dumps(ds)) + ds_1.index \ No newline at end of file diff --git a/miscelleaneous_tests/dask_pickles_etc/yt_ds.pickle b/miscelleaneous_tests/dask_pickles_etc/yt_ds.pickle new file mode 100644 index 0000000..a665abe Binary files /dev/null and b/miscelleaneous_tests/dask_pickles_etc/yt_ds.pickle differ