From 210dd4301caae3cd7b21fc6b40fe883633640ec8 Mon Sep 17 00:00:00 2001 From: William Ronchetti Date: Fri, 8 Sep 2023 13:26:46 -0400 Subject: [PATCH 1/6] in progress --- dcicutils/transfer_utils.py | 93 +++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 dcicutils/transfer_utils.py diff --git a/dcicutils/transfer_utils.py b/dcicutils/transfer_utils.py new file mode 100644 index 000000000..e3f34a79c --- /dev/null +++ b/dcicutils/transfer_utils.py @@ -0,0 +1,93 @@ +import os +from ff_utils import search_metadata +import subprocess +import concurrent.futures +from env_utils import is_cgap_env +from creds_utils import CGAPKeyManager, SMaHTKeyManager + + +class TransferUtilsError(Exception): + pass + + +class Downloader: + CURL = 'curl' + WGET = 'wget' + RCLONE = 'rclone' + GLOBUS = 'globus' + VALID_DOWNLOADERS = [ + CURL, WGET, RCLONE, GLOBUS + ] + + +class TransferUtils: + """ Utility class for downloading files to a local system """ + + def __init__(self, *, ff_env, num_processes=8, download_path, downloader=Downloader.CURL): + """ Builds the TransferUtils object, initializing Auth etc """ + self.num_processes = num_processes + self.download_path = download_path + if downloader not in Downloader.VALID_DOWNLOADERS: + raise TransferUtilsError(f'Passed invalid/unsupported downloader to TransferUtils: {downloader}') + self.downloader = downloader.lower() + self.key = (CGAPKeyManager().get_keydict_for_env(ff_env) if is_cgap_env else + SMaHTKeyManager().get_keydict_for_env(ff_env)) + + def initialize_download_path(self): + """ Creates dirs down to the path if they do not exist """ + if not os.path.exists(self.download_path): + os.makedirs(self.download_path) + + def extract_file_download_urls_from_search(self, search: str) -> dict: + """ Returns dictionary mapping file names to URLs from a File search """ + mapping = {} + for file_item in search_metadata(search, key=self.key): + filename = file_item['accession'] + download_url = 
f'{self.key.get("server", "invalid-keyfile")}/{file_item["@id"]}/@@download' + mapping[filename] = download_url + return mapping + + @staticmethod + def download_curl(url: str, filename: str) -> None: + """ Downloads from url under filename at the download path using curl """ + subprocess.run(['curl', '-L', url, '-o', filename], check=True) + + @staticmethod + def download_wget(url: str, filename): + """ Downloads from url under filename at the download path using wget """ + subprocess.run(['wget', '-q', url, '-O', filename], check=True) + + @staticmethod + def download_rclone(url, filename): + """ Downloads from url under filename at the download path using rclone """ + subprocess.run(['rclone', 'copy', url, filename], check=True) + + @staticmethod + def download_globus(url, filename): + """ Downloads from url under filename at the download path using curl """ + subprocess.run(['globus', 'transfer', 'download', url, filename], check=True) + + def download_file(self, url, filename): + """ Entrypoint for general download, will select appropriate downloader depending on what was + passed to init + """ + if self.downloader == Downloader.CURL: + return self.download_curl(url, filename) + elif self.downloader == Downloader.WGET: + return self.download_wget(url, filename) + elif self.downloader == Downloader.GLOBUS: + return self.download_globus(url, filename) + else: # rclone + return self.download_rclone(url, filename) + + def parallel_download(self, filename_to_url_mapping): + """ Executes a parallel download given the result of extract_file_download_urls_from_search """ + download_files = [] + with concurrent.futures.ProcessPoolExecutor(max_workers=self.num_processes) as executor: + for filename, download_url in filename_to_url_mapping.items(): + results = list(executor.map(self.download_file, download_url, filename)) + + for result in results: + if result is not None: + download_files.append(result) + return download_files From 
ac980675d4ca161ab3698e4402ddbcac3dc31359 Mon Sep 17 00:00:00 2001 From: William Ronchetti Date: Fri, 8 Sep 2023 13:53:44 -0400 Subject: [PATCH 2/6] version, type hinting --- CHANGELOG.rst | 7 +++++++ dcicutils/transfer_utils.py | 21 +++++++++++++++------ pyproject.toml | 2 +- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a92be15a1..886766943 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -6,6 +6,13 @@ dcicutils Change Log ---------- +7.11.0 +====== + +* New module ``transfer_utils``: + + * Creates new utilities for downloading files and patching a location back to the portal + 7.10.0 ====== diff --git a/dcicutils/transfer_utils.py b/dcicutils/transfer_utils.py index e3f34a79c..070cbfcab 100644 --- a/dcicutils/transfer_utils.py +++ b/dcicutils/transfer_utils.py @@ -47,30 +47,39 @@ def extract_file_download_urls_from_search(self, search: str) -> dict: mapping[filename] = download_url return mapping + def patch_location_to_portal(self, atid, file_path): + """ Patches a special field to atid indicating it is redundantly stored at file_path """ + pass # implement me after data model is in + @staticmethod - def download_curl(url: str, filename: str) -> None: + def download_curl(url: str, filename: str) -> str: """ Downloads from url under filename at the download path using curl """ subprocess.run(['curl', '-L', url, '-o', filename], check=True) + return filename @staticmethod - def download_wget(url: str, filename): + def download_wget(url: str, filename: str) -> str: """ Downloads from url under filename at the download path using wget """ subprocess.run(['wget', '-q', url, '-O', filename], check=True) + return filename @staticmethod - def download_rclone(url, filename): + def download_rclone(url: str, filename: str) -> str: """ Downloads from url under filename at the download path using rclone """ subprocess.run(['rclone', 'copy', url, filename], check=True) + return filename @staticmethod - def 
download_globus(url, filename): + def download_globus(url: str, filename: str) -> str: """ Downloads from url under filename at the download path using curl """ subprocess.run(['globus', 'transfer', 'download', url, filename], check=True) + return filename - def download_file(self, url, filename): + def download_file(self, url: str, filename: str) -> str: """ Entrypoint for general download, will select appropriate downloader depending on what was passed to init """ + filename = os.path.join(self.download_path, filename) if self.downloader == Downloader.CURL: return self.download_curl(url, filename) elif self.downloader == Downloader.WGET: @@ -80,7 +89,7 @@ def download_file(self, url, filename): else: # rclone return self.download_rclone(url, filename) - def parallel_download(self, filename_to_url_mapping): + def parallel_download(self, filename_to_url_mapping: dict) -> list: """ Executes a parallel download given the result of extract_file_download_urls_from_search """ download_files = [] with concurrent.futures.ProcessPoolExecutor(max_workers=self.num_processes) as executor: diff --git a/pyproject.toml b/pyproject.toml index 1078a57ea..65dba0353 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dcicutils" -version = "7.10.0" +version = "7.11.0" description = "Utility package for interacting with the 4DN Data Portal and other 4DN resources" authors = ["4DN-DCIC Team "] license = "MIT" From c59fa0dc33fbff8d1e6fce1c9b87b65228dddfdc Mon Sep 17 00:00:00 2001 From: William Ronchetti Date: Fri, 29 Sep 2023 13:37:33 -0400 Subject: [PATCH 3/6] finish initial pass + test --- dcicutils/ff_utils.py | 16 ++++++++++++++++ dcicutils/transfer_utils.py | 12 ++++++++++-- test/test_ff_utils.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/dcicutils/ff_utils.py b/dcicutils/ff_utils.py index 280bdc0df..bbf5cdf85 100644 --- a/dcicutils/ff_utils.py +++ b/dcicutils/ff_utils.py @@ -299,6 +299,22 
@@ def get_metadata(obj_id, key=None, ff_env=None, check_queue=False, add_on=''): return get_response_json(response) +def get_download_url(obj_id: str, key: Optional[dict] = None, ff_env: str = None) -> str: + """ + Function to get a download URL for a file without following the redirect so that such results can + be accumulated for retrieval later. Note that by default download URLs expire after 24 hours + + :param obj_id: resource path to object we would like to download + :param key: relevant auth + :param ff_env: if using admin keys, name of env to resolve + :return: url to s3 for download + """ + auth = get_authentication_with_server(key, ff_env) + get_url = '/'.join([auth['server'], _sls(obj_id)]).rstrip('/') + '/@@download' + response = authorized_request(get_url, auth=auth, verb='GET', allow_redirects=False) + return response.headers['Location'] + + def patch_metadata(patch_item, obj_id='', key=None, ff_env=None, add_on=''): """ Patch metadata given the patch body and an optional obj_id (if not provided, diff --git a/dcicutils/transfer_utils.py b/dcicutils/transfer_utils.py index 070cbfcab..2f7250ef9 100644 --- a/dcicutils/transfer_utils.py +++ b/dcicutils/transfer_utils.py @@ -1,9 +1,10 @@ import os -from ff_utils import search_metadata import subprocess import concurrent.futures from env_utils import is_cgap_env from creds_utils import CGAPKeyManager, SMaHTKeyManager +from ff_utils import search_metadata, get_download_url +from misc_utils import PRINT class TransferUtilsError(Exception): @@ -43,7 +44,14 @@ def extract_file_download_urls_from_search(self, search: str) -> dict: mapping = {} for file_item in search_metadata(search, key=self.key): filename = file_item['accession'] - download_url = f'{self.key.get("server", "invalid-keyfile")}/{file_item["@id"]}/@@download' + try: + download_url = get_download_url(file_item['@id']) + except Exception as e: + PRINT(f'Could not retrieve download link for {filename} - is it a file type?') + mapping[filename] = e 
+ continue + if '.s3.amazonaws.com' not in download_url: + PRINT(f'Potentially bad URL retrieved back from application: {download_url} - continuing') mapping[filename] = download_url return mapping diff --git a/test/test_ff_utils.py b/test/test_ff_utils.py index 16413a519..92dc077be 100644 --- a/test/test_ff_utils.py +++ b/test/test_ff_utils.py @@ -726,6 +726,37 @@ def test_get_metadata_integrated(integrated_ff): assert isinstance(res_obj['individual'], str) +@pytest.mark.integratedx +def test_get_download_url_integrated(integrated_ff): + """ Tests in an integrated fashion the download URL API ie: it should not follow redirects and tolerate + various different item type formats + """ + valid_formats = [ + '4DNFIH6Z2ZD5', # accession + 'files-processed/4DNFIH6Z2ZD5/', # variations on actual resource path + '/files-processed/4DNFIH6Z2ZD5/', + '//files-processed/4DNFIH6Z2ZD5/', + 'files-processed/4DNFIH6Z2ZD5//', + '//files-processed/4DNFIH6Z2ZD5//', + 'b9930e7a-49e5-4c33-afab-9ec90d65faf3', # variations on uuid + '/b9930e7a-49e5-4c33-afab-9ec90d65faf3', + '/b9930e7a-49e5-4c33-afab-9ec90d65faf3/', + '//b9930e7a-49e5-4c33-afab-9ec90d65faf3', + 'b9930e7a-49e5-4c33-afab-9ec90d65faf3//' + ] + for format in valid_formats: + assert 's3.amazonaws.com' in ff_utils.get_download_url(format, key=integrated_ff['ff_key']) + + invalid_formats = [ + 'not-a-uuid', + 'b9930e7a-49e5-4c33-afab-9ec90d65faf4', # non-existent uuid + '986b362f-4eb6-4a9c-8173-3ab267307e3a' # uuid of a user (no download) + ] + for format in invalid_formats: + with pytest.raises(Exception): + ff_utils.get_download_url(format, key=integrated_ff['ff_key']) + + @pytest.mark.integrated @pytest.mark.flaky def test_patch_metadata_integrated(integrated_ff): From ca58011898bf30f6493254dd553a28b7f7ac6a22 Mon Sep 17 00:00:00 2001 From: William Ronchetti Date: Fri, 29 Sep 2023 14:03:05 -0400 Subject: [PATCH 4/6] fix module decl --- docs/source/dcicutils.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git 
a/docs/source/dcicutils.rst b/docs/source/dcicutils.rst index f15307d0e..6329a5aa7 100644 --- a/docs/source/dcicutils.rst +++ b/docs/source/dcicutils.rst @@ -309,6 +309,13 @@ trace_utils :members: +transfer_utils +^^^^^^^^^^^^^^ + +.. automodule:: dcicutils.transfer_utils + :members: + + variant_utils ^^^^^^^^^^^ From b6247ff88dd18fd9fd4088010e32d1de7baa8063 Mon Sep 17 00:00:00 2001 From: William Ronchetti Date: Wed, 22 Nov 2023 09:02:10 -0500 Subject: [PATCH 5/6] build out some remaining functionality --- dcicutils/transfer_utils.py | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/dcicutils/transfer_utils.py b/dcicutils/transfer_utils.py index 2f7250ef9..e08983d21 100644 --- a/dcicutils/transfer_utils.py +++ b/dcicutils/transfer_utils.py @@ -3,7 +3,7 @@ import concurrent.futures from env_utils import is_cgap_env from creds_utils import CGAPKeyManager, SMaHTKeyManager -from ff_utils import search_metadata, get_download_url +from ff_utils import search_metadata, get_download_url, get_metadata, patch_metadata from misc_utils import PRINT @@ -23,6 +23,7 @@ class Downloader: class TransferUtils: """ Utility class for downloading files to a local system """ + O2_PATH_FIELD = 'o2_path' # set this field on files to indicate download location on o2 def __init__(self, *, ff_env, num_processes=8, download_path, downloader=Downloader.CURL): """ Builds the TransferUtils object, initializing Auth etc """ @@ -50,14 +51,42 @@ def extract_file_download_urls_from_search(self, search: str) -> dict: PRINT(f'Could not retrieve download link for {filename} - is it a file type?') mapping[filename] = e continue + # this check may need to be revised in case we start sending folks other places if '.s3.amazonaws.com' not in download_url: PRINT(f'Potentially bad URL retrieved back from application: {download_url} - continuing') mapping[filename] = download_url return mapping - def patch_location_to_portal(self, atid, file_path): + def
get_exiting_o2_path(self, atid: str) -> Optional[str, None]: + """ Does a GET for a file and checks if there is an existing O2 path, if so return the path, else + return None + """ + # ensure minimal but up to date view + meta = get_metadata(atid, key=self.key, add_on='?datastore=database&frame=raw') + if self.O2_PATH_FIELD in meta: + return True + return False + + def delete_existing_o2_path(self, atid: str) -> dict: + """ Deletes an existing download path from a file - do this if it has been removed and you want it + tracked or you need to re-trigger a download """ + return patch_metadata({}, f'{atid}?delete_fields={self.O2_PATH_FIELD}') + + def patch_location_to_portal(self, atid: str, file_path: str) -> bool: """ Patches a special field to atid indicating it is redundantly stored at file_path """ - pass # implement me after data model is in + path = self.get_exiting_o2_path(atid) + if path and file_path != path: + PRINT(f'WARNING: patching a new path for file {atid} - ensure file is not present at {path}\n' + f'new path: {file_path}\n' + f'Delete o2_path from existing file in order to proceed') + return False + elif path and file_path == path: + PRINT(f'WARNING: potentially triggering duplicate download of {atid} - double check it has not already ' + f'been downloaded to {path} - if it has remove the o2_path before calling again.') + return False + else: + patch_metadata({self.O2_PATH_FIELD: file_path}, atid) + return True @staticmethod def download_curl(url: str, filename: str) -> str: From 8eb492ba9eb9675c942551cee46be2449c0fe0b2 Mon Sep 17 00:00:00 2001 From: William Ronchetti Date: Wed, 29 Nov 2023 09:27:20 -0500 Subject: [PATCH 6/6] basic test --- dcicutils/transfer_utils.py | 13 +++++++------ test/test_transfer_utils.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 6 deletions(-) create mode 100644 test/test_transfer_utils.py diff --git a/dcicutils/transfer_utils.py b/dcicutils/transfer_utils.py index e08983d21..d548e02a4
100644 --- a/dcicutils/transfer_utils.py +++ b/dcicutils/transfer_utils.py @@ -1,10 +1,11 @@ import os import subprocess import concurrent.futures -from env_utils import is_cgap_env -from creds_utils import CGAPKeyManager, SMaHTKeyManager -from ff_utils import search_metadata, get_download_url, get_metadata, patch_metadata -from misc_utils import PRINT +from typing import Optional +from .env_utils import is_cgap_env +from .creds_utils import CGAPKeyManager, SMaHTKeyManager +from .ff_utils import search_metadata, get_download_url, get_metadata, patch_metadata +from .misc_utils import PRINT class TransferUtilsError(Exception): @@ -25,7 +26,7 @@ class TransferUtils: """ Utility class for downloading files to a local system """ O2_PATH_FIELD = 'o2_path' # set this field on files to indicate download location on o2 - def __init__(self, *, ff_env, num_processes=8, download_path, downloader=Downloader.CURL): + def __init__(self, *, ff_env, num_processes=8, download_path='./', downloader=Downloader.CURL): """ Builds the TransferUtils object, initializing Auth etc """ self.num_processes = num_processes self.download_path = download_path @@ -57,7 +58,7 @@ def extract_file_download_urls_from_search(self, search: str) -> dict: mapping[filename] = download_url return mapping - def get_exiting_o2_path(self, atid: str) -> Optional[str, None]: + def get_exiting_o2_path(self, atid: str) -> Optional[str]: """ Does a GET for a file and checks if there is an existing O2 path, if so return the path, else return None """ diff --git a/test/test_transfer_utils.py b/test/test_transfer_utils.py new file mode 100644 index 000000000..9718c0259 --- /dev/null +++ b/test/test_transfer_utils.py @@ -0,0 +1,35 @@ +import pytest +from contextlib import contextmanager +from dcicutils import transfer_utils +from dcicutils.transfer_utils import TransferUtils +from unittest import mock + + +# TODO: centralize below utilities elsewhere +def create_dummy_keydict(): + return {'cgap-dummy': { + 'key': 
'dummy', 'secret': 'dummy', + 'server': 'cgap-test.com' + }} + + +class CGAPKeyManager: + def get_keydict_for_env(self, keys_file=None): + return create_dummy_keydict()['cgap-dummy'] + + +@contextmanager +def mock_key_manager(): + with mock.patch.object(transfer_utils, 'CGAPKeyManager', new=CGAPKeyManager): + yield + + +class TestTransferUtils: + """ Tests some basic functionality with mocks for TransferUtils """ + + def test_transfer_utils_basic(self): + """ Tests that we can instantiate an object with defaults """ + with mock_key_manager(): + ts = TransferUtils(ff_env='cgap-dummy') + assert ts.downloader == 'curl' + assert ts.download_path == './'