diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c7a7202c0..52b576850 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -6,6 +6,59 @@ dcicutils Change Log ---------- +7.5.0 +===== + +* In new module ``bucket_utils.py``: + + * ``parse_s3_object_name`` + +* In ``common.py``: + + * New glacier-related constants: + + * ``STANDARD`` + * ``REDUCED_REDUNDANCY`` + * ``STANDARD_IA`` + * ``ONEZONE_IA`` + * ``INTELLIGENT_TIERING`` + * ``GLACIER`` + * ``DEEP_ARCHIVE`` + * ``OUTPOSTS`` + * ``GLACIER_IR`` + + * New type hint ``S3ObjectNameSpec`` + +* In ``glacier_utils.py``: + + * Allow a ``version_id=`` argument to ``GlacierUtils.is_restore_finished`` + + * Some improved error messages. + + * Some small code refactors. + +* In ``misc_utils.py``: + + * Make ``make_counter`` threadsafe so that threaded functionality can call it. + +* In ``qa_utils.py``: + + * Support for mock glacier testing in ``MockBotoS3Client`` for methods: + + * ``create_multipart_upload`` + * ``upload_part_copy`` + * ``complete_multipart_upload`` + + * Revamp the abstractions for managing MockFileSystem to allow for centralized + changes that might be needed to handle new file content types, such as + + * ``MockAbstractContent`` + + * ``MockBigContent`` for mocking large files quickly and space-efficiently. + + * ``MockPartableBytes`` for mocking small content that still wants to test + piecewise-copying in support of the multipart upload protocol. + 7.4.1 ===== diff --git a/dcicutils/bucket_utils.py b/dcicutils/bucket_utils.py new file mode 100644 index 000000000..76f9c98df --- /dev/null +++ b/dcicutils/bucket_utils.py @@ -0,0 +1,32 @@ +import re + +from dcicutils.common import S3ObjectNameDict +from typing import Optional + + +# NOTE: This could be done with urllib's parsing tech, but it accepts a variety of things we don't want, +# so the error-checking would be more complicated. The documentation says particular string formats +# are accepted, so that's what we're using for now. -kmp 16-May-2023 +LOCATION_STRING_PATTERN = re.compile("^([^/?]+)/([^?]+)(?:[?]versionId=([^&]*))?$") + + +def parse_s3_object_name(object_name, ignore_errors=False) -> Optional[S3ObjectNameDict]: + """ + Parses a string of the form bucket/key or bucket/key?versionId=version, yielding a dictionary form + {"Bucket": bucket, "Key": key} or {"Bucket": bucket, "Key": key, "VersionId": version_id} + + :param object_name: a string specifying a bucket, key, and optionally a version + :return: a dictionary + """ + location_data = LOCATION_STRING_PATTERN.match(object_name) + if not location_data: + if ignore_errors: + return None + else: + raise ValueError(f"Not a valid S3 object name: {object_name!r}." 
+ f" Format must be bucket/key or bucket/key?versionId=version") + bucket, key, version_id = location_data.groups() + result: S3ObjectNameDict = {'Bucket': bucket, 'Key': key} + if version_id: + result['VersionId'] = version_id + return result diff --git a/dcicutils/common.py b/dcicutils/common.py index d8ee2d5cf..33660ac7d 100644 --- a/dcicutils/common.py +++ b/dcicutils/common.py @@ -1,6 +1,11 @@ import os -from typing import Dict, Union, Tuple, List, Any +from typing import ( + Any, Dict, List, Optional, Tuple, Union, + # Notes on use of Final and TypedDict available at: https://peps.python.org/pep-0589/ + # TODO: Available in Python 3.8 (i.e., when we drop Python 3.7 support) + # Final, TypedDict, +) from typing_extensions import Literal @@ -8,6 +13,11 @@ REGION = 'us-east-1' +# TODO: Available in Python 3.8 (i.e., when we drop Python 3.7 support) +# +# APP_CGAP: Final = 'cgap' +# APP_FOURFRONT: Final = 'fourfront' + APP_CGAP = 'cgap' APP_FOURFRONT = 'fourfront' @@ -18,6 +28,11 @@ ORCHESTRATED_APPS = [APP_CGAP, APP_FOURFRONT] +# TODO: Available in Python 3.8 (i.e., when we drop Python 3.7 support) +# +# CHALICE_STAGE_DEV: Final = 'dev' +# CHALICE_STAGE_PROD: Final = 'prod' + CHALICE_STAGE_DEV = 'dev' CHALICE_STAGE_PROD = 'prod' @@ -30,7 +45,14 @@ # Nicknames for enumerated sets of symbols. Note that these values must be syntactic literals, # so they can't use the variables defined above. +# TODO: Available in Python 3.8 (i.e., when we drop Python 3.7 support) +# ChaliceStage = Literal[CHALICE_STAGE_DEV, CHALICE_STAGE_PROD] + ChaliceStage = Literal['dev', 'prod'] + +# TODO: Available in Python 3.8 (i.e., when we drop Python 3.7 support) +# OrchestratedApp = Literal[APP_CGAP, APP_FOURFRONT] + OrchestratedApp = Literal['cgap', 'fourfront'] LIBRARY_DIR = os.path.dirname(__file__) @@ -39,8 +61,24 @@ AuthStr = str + +# TODO: Available in Python 3.8 (i.e., when we drop Python 3.7 support) +# class SimpleAuthDict(TypedDict): +# key: str +# secret: str + SimpleAuthDict = Dict[Literal['key', 'secret'], str] + + +# TODO: Available in Python 3.8 (i.e., when we drop Python 3.7 support) +# class ServerAuthDict(TypedDict): +# key: str +# secret: str +# server: str + ServerAuthDict = Dict[Literal['key', 'secret', 'server'], str] + + AuthDict = Union[SimpleAuthDict, ServerAuthDict] LegacyAuthDict = Dict[Literal['default'], AuthDict] @@ -55,6 +93,12 @@ AnyJsonData = Union[Dict[str, 'AnyJsonData'], List['AnyJsonData'], str, bool, int, float, None] + +# TODO: Available in Python 3.8 (i.e., when we drop Python 3.7 support) +# class KeyValueDict(TypedDict): +# Key: str +# Value: Any + KeyValueDict = Dict[Literal['Key', 'Value'], Any] KeyValueDictList = List[KeyValueDict] @@ -81,6 +125,18 @@ # plus the intelligent tiering. Most of the others have a latency issue or are otherwise # fragile. In practice, we just want to not overly warn about normal kinds of storage. 
+# Commonly used storage classes +STANDARD = 'STANDARD' +REDUCED_REDUNDANCY = 'REDUCED_REDUNDANCY' +STANDARD_IA = 'STANDARD_IA' +ONEZONE_IA = 'ONEZONE_IA' +INTELLIGENT_TIERING = 'INTELLIGENT_TIERING' +GLACIER = 'GLACIER' +DEEP_ARCHIVE = 'DEEP_ARCHIVE' +OUTPOSTS = 'OUTPOSTS' +GLACIER_IR = 'GLACIER_IR' + + ALL_S3_STORAGE_CLASSES = [ 'STANDARD', 'REDUCED_REDUNDANCY', 'STANDARD_IA', 'ONEZONE_IA', 'INTELLIGENT_TIERING', 'GLACIER', 'DEEP_ARCHIVE', 'OUTPOSTS', 'GLACIER_IR', @@ -117,6 +173,16 @@ ] +# TODO: Available in Python 3.8 (i.e., when we drop Python 3.7 support) +# class S3ObjectNameSpec(TypedDict): +# Bucket: str +# Key: str +# VersionId: Optional[str] + +S3ObjectNameDict = Dict[Literal['Bucket', 'Key', 'VersionId'], Optional[str]] +S3ObjectNameSpec = Union[str, S3ObjectNameDict] + + # This constant is used in our Lifecycle management system to automatically transition objects ENCODED_LIFECYCLE_TAG_KEY = 'Lifecycle' diff --git a/dcicutils/ff_mocks.py b/dcicutils/ff_mocks.py index 76292372e..c1ee66172 100644 --- a/dcicutils/ff_mocks.py +++ b/dcicutils/ff_mocks.py @@ -135,7 +135,8 @@ def mocked_s3utils(environments=None, require_sse=False, other_access_key_names= def write_config(config_name, record): record_string = json.dumps(record) - s3_client.s3_files.files[f"{LEGACY_GLOBAL_ENV_BUCKET}/{config_name}"] = bytes(record_string.encode('utf-8')) + s3_client.s3_files.set_file_content_for_testing(f"{LEGACY_GLOBAL_ENV_BUCKET}/{config_name}", + record_string.encode('utf-8')) ecosystem_file = "main.ecosystem" for environment in environments: @@ -200,7 +201,7 @@ def mocked_s3utils_with_sse(beanstalks=None, environments=None, require_sse=True s3 = mock_boto3.client('s3') assert isinstance(s3, MockBotoS3Client) for filename, string in (files or {}).items(): - s3.s3_files.files[filename] = string.encode('utf-8') + s3.s3_files.set_file_content_for_testing(filename, string.encode('utf-8')) yield mock_boto3 diff --git a/dcicutils/glacier_utils.py b/dcicutils/glacier_utils.py index 53f51582e..f0f18e461 100644 --- a/dcicutils/glacier_utils.py +++ b/dcicutils/glacier_utils.py @@ -1,13 +1,15 @@ import boto3 -from typing import Union, List, Tuple + from concurrent.futures import ThreadPoolExecutor from tqdm import tqdm +from typing import Union, List, Tuple, Optional from .common import ( - S3_GLACIER_CLASSES, S3StorageClass, MAX_MULTIPART_CHUNKS, MAX_STANDARD_COPY_SIZE, + S3_GLACIER_CLASSES, S3StorageClass, STANDARD, MAX_MULTIPART_CHUNKS, MAX_STANDARD_COPY_SIZE, ENCODED_LIFECYCLE_TAG_KEY ) from .command_utils import require_confirmation -from .misc_utils import PRINT +from .lang_utils import n_of +from .misc_utils import PRINT, get_error_message from .ff_utils import get_metadata, search_metadata, get_health_page, patch_metadata from .creds_utils import CGAPKeyManager @@ -160,23 +162,27 @@ def restore_s3_from_glacier(self, bucket: str, key: str, days: int = 7, args['VersionId'] = version_id response = self.s3.restore_object(**args) PRINT(f'Object {bucket}/{key} restored from Glacier storage class and will be available in S3' - f' for {days} days after restore has been processed (24 hours)') + f' for {n_of(days, "day")} after restore has been processed (24 hours)') return response except Exception as e: - PRINT(f'Error restoring object {key} from Glacier storage class: {str(e)}') + PRINT(f'Error restoring object {key} from Glacier storage class: {get_error_message(e)}') return None - def is_restore_finished(self, bucket: str, key: str) -> bool: + def is_restore_finished(self, bucket: str, key: str, 
version_id: Optional[str] = None) -> bool: """ Heads the object to see if it has been restored - note that from the POV of the API, the object is still in Glacier, but it has been restored to its original location and can be downloaded immediately :param bucket: bucket of original file location :param key: key of original file location + :param version_id: (optional) a VersionId string for the file :return: boolean whether the restore was successful yet """ try: # extract temporary location by heading object - response = self.s3.head_object(Bucket=bucket, Key=key) + maybe_version_id = {} + if version_id: + maybe_version_id['VersionId'] = version_id + response = self.s3.head_object(Bucket=bucket, Key=key, **maybe_version_id) restore = response.get('Restore') if restore is None: PRINT(f'Object {bucket}/{key} is not currently being restored from Glacier') @@ -186,7 +192,7 @@ def is_restore_finished(self, bucket: str, key: str) -> bool: return False return True except Exception as e: - PRINT(f'Error checking restore status of object {bucket}/{key} in S3: {str(e)}') + PRINT(f'Error checking restore status of object {bucket}/{key} in S3: {get_error_message(e)}') return False def patch_file_lifecycle_status(self, atid: str, status: str = 'uploaded', @@ -227,7 +233,7 @@ def non_glacier_versions_exist(self, bucket: str, key: str) -> bool: return True return False except Exception as e: - PRINT(f'Error checking versions for object {bucket}/key: {str(e)}') + PRINT(f'Error checking versions for object {bucket}/{key}: {get_error_message(e)}') return False def delete_glaciered_object_versions(self, bucket: str, key: str, delete_all_versions: bool = False) -> bool: @@ -236,7 +242,7 @@ def delete_glaciered_object_versions(self, bucket: str, key: str, delete_all_ver :param bucket: bucket location containing key :param key: file name in s3 to delete - :param delete_all_versions: whether to delete all glacier versions or rather than just the most recent one + :param delete_all_versions: whether to delete all glacier versions, rather than just the most recent one :return: True if success or False if failed """ try: @@ -246,16 +252,16 @@ def delete_glaciered_object_versions(self, bucket: str, key: str, delete_all_ver for v in versions: if v.get('StorageClass') in S3_GLACIER_CLASSES: response = self.s3.delete_object(Bucket=bucket, Key=key, VersionId=v.get('VersionId')) - PRINT(f'Object {bucket}/{key} Glacier version {v.get("VersionId")} deleted:\n{response}') + PRINT(f'Object {bucket}/{key} VersionId={v.get("VersionId")!r} deleted:\n{response}') deleted = True if not delete_all_versions: break if not deleted: - PRINT(f'No Glacier version found for object {bucket}/{key}') + PRINT(f"No Glacier version found for object {bucket}/{key}.") return False return True except Exception as e: - PRINT(f'Error deleting Glacier versions of object {bucket}/{key}: {str(e)}') + PRINT(f'Error deleting Glacier versions of object {bucket}/{key}: {get_error_message(e)}') return False @staticmethod @@ -268,8 +274,10 @@ def _format_tags(tags: List[dict]) -> str: """ return '&'.join([f'{tag["Key"]}={tag["Value"]}' for tag in tags]) + ALLOW_PART_UPLOAD_ATTEMPTS = 3 + def _do_multipart_upload(self, bucket: str, key: str, total_size: int, part_size: int = 200, - storage_class: str = 'STANDARD', tags: str = '', + storage_class: str = STANDARD, tags: str = '', version_id: Union[str, None] = None) -> Union[dict, None]: """ Helper function for copy_object_back_to_original_location, not intended to be called directly, will arrange for a 
multipart copy of large updates @@ -290,62 +298,54 @@ def _do_multipart_upload(self, bucket: str, key: str, total_size: int, part_size if num_parts > MAX_MULTIPART_CHUNKS: raise GlacierRestoreException(f'Must user a part_size larger than {part_size}' f' that will result in fewer than {MAX_MULTIPART_CHUNKS} chunks') - cmu = { - 'Bucket': bucket, 'Key': key, 'StorageClass': storage_class - } + cmu_args = {'Bucket': bucket, 'Key': key, 'StorageClass': storage_class} if tags: - cmu['Tagging'] = tags - mpu = self.s3.create_multipart_upload(**cmu) + cmu_args['Tagging'] = tags + mpu = self.s3.create_multipart_upload(**cmu_args) mpu_upload_id = mpu['UploadId'] except Exception as e: - PRINT(f'Error creating multipart upload for {bucket}/{key} : {str(e)}') + PRINT(f'Error creating multipart upload for {bucket}/{key}: {get_error_message(e)}') return None + + copy_source = {'Bucket': bucket, 'Key': key} + copy_target = {'Bucket': bucket, 'Key': key} + if version_id: + copy_source['VersionId'] = version_id + copy_target['CopySourceVersionId'] = version_id + + shared_part_args = {'UploadId': mpu_upload_id, 'CopySource': copy_source, **copy_target} + parts = [] for i in range(num_parts): + part_number = i + 1 start = i * part_size end = min(start + part_size, total_size) - part = { - 'PartNumber': i + 1 - } - copy_source = {'Bucket': bucket, 'Key': key} - copy_target = { - 'Bucket': bucket, 'Key': key, - } - if version_id: - copy_source['VersionId'] = version_id - copy_target['CopySourceVersionId'] = version_id + part = {'PartNumber': part_number} + source_range = f'bytes={start}-{end-1}' # retry upload a few times - for _ in range(3): + for attempt in range(self.ALLOW_PART_UPLOAD_ATTEMPTS): + PRINT(f"{'Trying' if attempt == 0 else 'Retrying'} upload of part {part_number} ...") try: - response = self.s3.upload_part_copy( - CopySource=copy_source, **copy_target, - PartNumber=i + 1, - CopySourceRange=f'bytes={start}-{end-1}', - UploadId=mpu_upload_id - ) + response = self.s3.upload_part_copy(PartNumber=part_number, CopySourceRange=source_range, + **shared_part_args) break except Exception as e: - PRINT(f'Failed to upload part {i+1}, potentially retrying: {str(e)}') + PRINT(f'Failed to upload {bucket}/{key} PartNumber={part_number}: {get_error_message(e)}') else: - PRINT(f'Fatal error arranging multipart upload of {bucket}/{key},' - f' see previous output') + PRINT(f"Fatal error arranging multipart upload of {bucket}/{key}" + f" after {n_of(self.ALLOW_PART_UPLOAD_ATTEMPTS, 'try')}." 
+ f" For details, see previous output.") return None part['ETag'] = response['CopyPartResult']['ETag'] parts.append(part) # mark upload as completed # exception should be caught by caller - return self.s3.complete_multipart_upload( - Bucket=bucket, - Key=key, - MultipartUpload={ - 'Parts': parts - }, - UploadId=mpu_upload_id - ) + return self.s3.complete_multipart_upload(Bucket=bucket, Key=key, MultipartUpload={'Parts': parts}, + UploadId=mpu_upload_id) - def copy_object_back_to_original_location(self, bucket: str, key: str, storage_class: str = 'STANDARD', + def copy_object_back_to_original_location(self, bucket: str, key: str, storage_class: S3StorageClass = STANDARD, part_size: int = 200, # MB preserve_lifecycle_tag: bool = False, version_id: Union[str, None] = None) -> Union[dict, None]: @@ -371,7 +371,7 @@ def copy_object_back_to_original_location(self, bucket: str, key: str, storage_c else: tags = '' except Exception as e: - PRINT(f'Could not retrieve metadata on file {bucket}/{key} : {str(e)}') + PRINT(f'Could not retrieve metadata on file Bucket={bucket!r}, Key={key!r} : {get_error_message(e)}') return None try: if multipart: @@ -390,10 +390,11 @@ def copy_object_back_to_original_location(self, bucket: str, key: str, storage_c copy_target['Tagging'] = tags response = self.s3.copy_object(CopySource=copy_source, **copy_target) PRINT(f'Response from boto3 copy:\n{response}') - PRINT(f'Object {bucket}/{key} copied back to its original location in S3') + PRINT(f'Object {bucket}/{key} copied back to its original location in S3.') return response except Exception as e: - PRINT(f'Error copying object {bucket}/{key} back to its original location in S3: {str(e)}') + PRINT(f'Error copying object {bucket}/{key}' + f' back to its original location in S3: {get_error_message(e)}') return None def restore_glacier_phase_one_restore(self, atid_list: List[Union[dict, str]], versioning: bool = False, @@ -426,7 +427,7 @@ def restore_glacier_phase_one_restore(self, atid_list: List[Union[dict, str]], v return success, errors def restore_glacier_phase_two_copy(self, atid_list: List[Union[str, dict]], versioning: bool = False, - storage_class: S3StorageClass = 'STANDARD', + storage_class: S3StorageClass = STANDARD, parallel: bool = False, num_threads: int = 4) -> (List[str], List[str]): """ Triggers a copy operation for all restored objects passed in @id list @@ -510,7 +511,7 @@ def restore_glacier_phase_three_patch(self, atid_list: List[Union[str, dict]], self.patch_file_lifecycle_status(atid, status=status) success.append(atid) except Exception as e: - PRINT(f'Error encountered patching @id {atid}, error: {str(e)}') + PRINT(f'Error encountered patching @id {atid}, error: {get_error_message(e)}') errors.append(atid) return success, errors @@ -551,7 +552,7 @@ def restore_glacier_phase_four_cleanup(self, atid_list: List[str], @require_confirmation def restore_all_from_search(self, *, search_query: str, page_limit: int = 50, search_generator: bool = False, restore_length: int = 7, new_status: str = 'uploaded', - storage_class: S3StorageClass = 'STANDARD', versioning: bool = False, + storage_class: S3StorageClass = STANDARD, versioning: bool = False, parallel: bool = False, num_threads: int = 4, delete_all_versions: bool = False, phase: int = 1) -> (List[str], List[str]): """ Overarching method that will take a search query and loop through all files in the diff --git a/dcicutils/misc_utils.py b/dcicutils/misc_utils.py index fd0747d43..660b29439 100644 --- a/dcicutils/misc_utils.py +++ 
b/dcicutils/misc_utils.py @@ -15,6 +15,7 @@ import re import rfc3986.validators import rfc3986.exceptions +import threading import time import warnings import webtest # importing the library makes it easier to mock testing @@ -1424,6 +1425,9 @@ def __init__(self, initial_value=None): self.value = initial_value +COUNTER_LOCK = threading.Lock() + + def make_counter(start=0, step=1): """ Creates a counter that generates values counting from a given start (default 0) by a given step (default 1). @@ -1431,9 +1435,10 @@ def make_counter(start=0, step=1): storage = StorageCell(start) def counter(): - old_value = storage.value - storage.value += step - return old_value + with COUNTER_LOCK: + old_value = storage.value + storage.value += step + return old_value return counter diff --git a/dcicutils/qa_utils.py b/dcicutils/qa_utils.py index d5601fb03..a06d6f42e 100644 --- a/dcicutils/qa_utils.py +++ b/dcicutils/qa_utils.py @@ -10,6 +10,7 @@ import functools import hashlib import io +import json import logging import os import pytest @@ -24,17 +25,21 @@ from botocore.exceptions import ClientError from collections import defaultdict from json import dumps as json_dumps, loads as json_loads -from typing import Any, Optional, List, DefaultDict, Union, Type, Dict +from typing import Any, DefaultDict, Dict, Iterable, List, Optional, Type, Union from typing_extensions import Literal from unittest import mock +from urllib.parse import parse_qsl from . import misc_utils as misc_utils_module, command_utils as command_utils_module -from .common import S3StorageClass +from .bucket_utils import parse_s3_object_name +from .common import ( + S3StorageClass, S3ObjectNameDict, S3ObjectNameSpec, STANDARD, KeyValuestringDictList, KeyValuestringDict, +) from .env_utils import short_env_name from .exceptions import ExpectedErrorNotSeen, WrongErrorSeen, UnexpectedErrorAfterFix, WrongErrorSeenAfterFix from .glacier_utils import GlacierUtils from .lang_utils import there_are from .misc_utils import ( - PRINT, INPUT, ignored, Retry, remove_prefix, REF_TZ, builtin_print, + PRINT, INPUT, ignorable, ignored, Retry, remove_prefix, REF_TZ, builtin_print, make_counter, environ_bool, exported, override_environ, override_dict, local_attrs, full_class_name, find_associations, get_error_message, remove_suffix, format_in_radix, future_datetime, _mockable_input, # noQA - need this to keep mocking consistent @@ -47,6 +52,13 @@ exported(QA_EXCEPTION_PATTERN, find_uses, confirm_no_uses, VersionChecker, ChangeLogChecker) +def make_unique_token(monotonic=False): # effectively a guid but for things that don't promise specifically a guid + if monotonic: + return format_in_radix(time.time_ns(), radix=36) + else: + return str(uuid.uuid4()).replace('-', '.').lower() + + def show_elapsed_time(start, end): """ Helper method for below that is the default - just prints the elapsed time. """ PRINT('Elapsed: %s' % (end - start)) @@ -412,25 +424,248 @@ def __exit__(self, exc_type, exc_val, exc_tb): file_system.prepare_for_overwrite(self.file) if FILE_SYSTEM_VERBOSE: # pragma: no cover - Debugging option. Doesn't need testing. 
PRINT(f"Writing {content!r} to {self.file}.") - file_system.files[self.file] = content if isinstance(content, bytes) else content.encode(self.encoding) + file_system.set_file_content_for_testing(self.file, + (content + if isinstance(content, bytes) + else content.encode(self.encoding))) + # file_system.files[self.file] = content if isinstance(content, bytes) else content.encode(self.encoding) + + +class MockAbstractContent: + pass + + +class MockPartableContent(MockAbstractContent): + + @classmethod + def start_cloning_from(cls, content): + if isinstance(content, bytes): + return MockPartableBytes(content_to_copy=content, empty=True) + elif isinstance(content, MockBigContent): + return content.start_cloning() + else: + raise ValueError(f"No method defined for cloning: {content!r}") + + ID_COUNTER = make_counter() + CONTENT_ID_SIZE = {} + + def __init__(self, size, empty=False, content_id=None): + self.size = size + content_id = content_id or str(self.new_counter_id()) + declared_size = self.CONTENT_ID_SIZE.get(content_id) + if declared_size is not None and declared_size != size: + # This is just a consistency check. + raise RuntimeError(f"The MockPartableContent id {content_id} (size={size!r})" + f" is already taken with a different size, {declared_size!r}.") + + self.CONTENT_ID_SIZE[content_id] = size + self._content_id = content_id + self.coverage = [[0, 0]] if empty else [[0, size]] + + @classmethod + def new_counter_id(cls): + return cls.ID_COUNTER() + + def __str__(self): + if self.coverage == [[0, self.size]]: + return f"<{self.__class__.__name__} content_id={self._content_id} size={self.size}>" + else: + return f"<{self.__class__.__name__} content_id={self._content_id} coverage={self.coverage}>" + + def __repr__(self): + if self.coverage == [[0, self.size]]: + return f"{full_class_name(self)}(content_id={self._content_id}, size={self.size})" + else: + return f"<{full_class_name(self)} content_id={self._content_id} coverage={self.coverage}>" + + def __eq__(self, other): + if not isinstance(other, MockBigContent): + return False + return self.coverage == other.coverage and self._content_id == other._content_id + + ETAG_PREFIX = "etag." + BYTES_PATTERN_STRING = f"bytes=([0-9]+)-(-?[0-9]+)" + BYTES_PATTERN = re.compile(BYTES_PATTERN_STRING) + + @property + def etag(self): + return f"{self.ETAG_PREFIX}{self._content_id}" + + def part_etag(self, range_spec): + match = self.BYTES_PATTERN.match(range_spec) + if not match: + raise ValueError(f"{range_spec} does not match pattern {self.BYTES_PATTERN_STRING}") + lower_inclusive, upper_inclusive = match.groups() + lower_inclusive = int(lower_inclusive) - 1 + upper_exclusive = int(upper_inclusive) + result = f"{self.ETAG_PREFIX}{self._content_id}.{lower_inclusive}.{upper_exclusive}" + if FILE_SYSTEM_VERBOSE: # pragma: no cover - Debugging option. Doesn't need testing. 
+ print(f"Issuing part_etag {result} for {self} range_spec {range_spec}") + return result + + @classmethod + def part_etag_byte_range(cls, spec: str) -> [int, int]: + lower_inclusive, upper_exclusive = spec.split('.')[2:] + return [int(lower_inclusive), int(upper_exclusive)] + + @classmethod + def part_etag_parent_id(cls, spec: str) -> str: + return spec.split('.')[1] + + @classmethod + def validate_parts_complete(cls, parts_etags: List[str]): + assert parts_etags, f"There must be at least one part: {parts_etags}" + parent_ids = list(map(cls.part_etag_parent_id, parts_etags)) + parts_parent_id = parent_ids[0] + parts_parent_size = cls.CONTENT_ID_SIZE[parts_parent_id] + assert parts_parent_size is not None, f"Bookkeeping error. No source size for content_id {parts_parent_id}." + assert all(parts_parent_id == parent_id for parent_id in parent_ids[1:]), ( + f"Some parts came from unrelated uploads: {parts_etags}") + coverage = simplify_coverage(list(map(cls.part_etag_byte_range, parts_etags))) or [[0, 0]] + assert len(coverage) == 1, f"Parts did not resolve: {coverage}" + [[lo, hi]] = coverage + assert lo == 0, "Parts do not start from 0." + assert parts_parent_size == hi, f"Coverage is {coverage} but expected size was {parts_parent_size}" + + @classmethod + def part_etag_range(cls, part_etag): + start, end = part_etag.split('.')[2:] + return [start, end] + + def start_partial_copy(self): + return self.__class__(size=self.size, empty=True, content_id=self._content_id) + + def copy_portion(self, start, end, target): + if not isinstance(target, MockBigContent): # or self._content_id != target._content_id: + raise Exception(f"You cannot copy part of {self} into {target}.") + target.coverage = add_coverage(coverage=target.coverage, start=start, end=end) + + def copied_content(self): + raise NotImplementedError("Method copied_content must be customized in subclasses of MockPartableContent.") + + +class MockPartableBytes(MockPartableContent): + + def __init__(self, content_to_copy, empty=False): + size = len(content_to_copy) + super().__init__(size=size, empty=empty) + self.byte_string = content_to_copy + + def copied_content(self): + return self.byte_string + + +class MockBigContent(MockPartableContent): + + def start_cloning(self): + return MockBigContent(size=self.size, empty=True, content_id=self._content_id) + + def copied_content(self): + return self + + +IntPair = List[int] # it's a list of ints, but for now we know no way to type hint a list of exactly 2 integers + + +# I'm not sure we need this function. We're doing this a different way. 
-kmp 15-May-2023 +def add_coverage(coverage: List[IntPair], start: int, end: int): + return simplify_coverage(coverage + [[start, end]]) + + +def simplify_coverage(coverage: Iterable[IntPair]) -> List[IntPair]: + current = [0, 0] + result = [] + for item in sorted(coverage): + [s, e] = item + if e < s: + raise ValueError(f"Consistency problem: {item} is out of order.") + elif s > current[1]: + if current[0] != current[1]: + result.append(current) + current = [s, e] + elif e > current[1]: + current[1] = e + if current[0] != current[1]: + result.append(current) + return result + + +def is_abstract_content(content): + return isinstance(content, MockAbstractContent) class MockFileSystem: """Extremely low-tech mock file system.""" def __init__(self, files=None, default_encoding='utf-8', auto_mirror_files_for_read=False, do_not_auto_mirror=()): + files = files or {} self.default_encoding = default_encoding # Setting this dynamically will make things inconsistent self._auto_mirror_files_for_read = auto_mirror_files_for_read self._do_not_auto_mirror = set(do_not_auto_mirror or []) - self.files = {filename: content.encode(default_encoding) for filename, content in (files or {}).items()} - for filename in self.files: + # self.files = {filename: content.encode(default_encoding) for filename, content in (files or {}).items()} + for filename in files: self._do_not_mirror(filename) + self.files = {} + for filename, content in files.items(): + self.set_file_content_for_testing(filename, content.encode(default_encoding)) IO_OPEN = staticmethod(io.open) OS_PATH_EXISTS = staticmethod(os.path.exists) OS_REMOVE = staticmethod(os.remove) + def assert_file_count(self, n): + assert len(self.files) == n + + def _set_file_content_for_testing(self, filename, content): + # This is subprimitive to set_file_content_for_testing or others that need to do auxiliary actions as well. + self.files[filename] = content + + def set_file_content_for_testing(self, filename, content): + # We might at some future point want to consider whether callers of this function should + # see auto-mirroring or versioning. + self._set_file_content_for_testing(filename, content) + + def restore_file_content_for_testing(self, filename, content): + # This interface is for things like undelete and restore that are restoring prior state and don't want to + # get caught up in mirroring or versioning. In the future, this might need information about versioning + # that needs to be threaded together. -kmp 26-Apr-2023 + self._set_file_content_for_testing(filename, content) + + def initialize_file_entry_testing(self, filename): + # This interface is for things like undelete and restore that are restoring prior state and don't want to + # get caught up in mirroring or versioning. In the future, this might need information about versioning + # that needs to be threaded together. -kmp 26-Apr-2023 + self._set_file_content_for_testing(filename, None) + + def remove_file_entry_for_testing(self, filename): + # This interface is for things like undelete and restore that are restoring prior state and don't want to + # get caught up in mirroring or versioning. In the future, this might need information about versioning + # that needs to be threaded together. 
-kmp 26-Apr-2023 + del self.files[filename] + + def get_file_content_for_testing(self, filename, required=False): + content = self.files.get(filename) + if required and content is None: + raise Exception(f"Mocked file not found: {filename}") + elif is_abstract_content(content): + raise Exception(f"Mock for file {filename} cannot be opened for specific content: {content}") + return content + + def assert_file_content(self, filename, expected_content): + assert filename in self.files, f"Mock file {filename} not found in {self}." + actual_content = self.files[filename] + assert actual_content == expected_content, ( + f"Mock file {filename} does not have the expected content." + f" Actual={actual_content} Expected={expected_content}" + ) + + def assert_file_system_state(self, expected_file_dictionary): + actual_files = self.files + assert self.files == expected_file_dictionary, ( + f"Mock file system not in expected state. Actual={actual_files} Expected={expected_file_dictionary}" + ) + def _do_not_mirror(self, file): if self._auto_mirror_files_for_read: self._do_not_auto_mirror.add(file) @@ -440,9 +675,10 @@ def _maybe_auto_mirror_file(self, file): if file not in self._do_not_auto_mirror: if (self.OS_PATH_EXISTS(file) # file might be in files if someone has been manipulating the file structure directly - and file not in self.files): + and not self._file_is_mocked(file)): # file not in self.files with open(file, 'rb') as fp: - self.files[file] = fp.read() + self.set_file_content_for_testing(file, fp.read()) + # self.files[file] = fp.read() self._do_not_mirror(file) def prepare_for_overwrite(self, file): @@ -451,6 +687,13 @@ def prepare_for_overwrite(self, file): def exists(self, file): self._maybe_auto_mirror_file(file) + # return self.files.get(file) is not None # don't want an empty file to pass for missing + return self._file_is_mocked(file) + + def _file_is_mocked(self, file): + """ + This checks the state of the file now, independent of auto-mirroring. + """ return self.files.get(file) is not None # don't want an empty file to pass for missing def remove(self, file): @@ -458,6 +701,12 @@ def remove(self, file): if self.files.pop(file, None) is None: raise FileNotFoundError("No such file or directory: %s" % file) + def all_filenames_for_testing(self): + return sorted(self.files.keys()) + + def all_filenames_with_content_for_testing(self): + return self.files.items() + def open(self, file, mode='r', encoding=None): if FILE_SYSTEM_VERBOSE: # pragma: no cover - Debugging option. Doesn't need testing. PRINT("Opening %r in mode %r." % (file, mode)) @@ -474,7 +723,8 @@ def open(self, file, mode='r', encoding=None): def _open_for_read(self, file, binary=False, encoding=None): self._maybe_auto_mirror_file(file) - content = self.files.get(file) + content = self.get_file_content_for_testing(file) + # content = self.files.get(file) if content is None: raise FileNotFoundError("No such file or directory: %s" % file) if FILE_SYSTEM_VERBOSE: # pragma: no cover - Debugging option. Doesn't need testing. @@ -677,7 +927,7 @@ def mock_action_handler(self, wrapped_action, *args, **kwargs): texts = remove_suffix('\n', text).split('\n') last_text = texts[-1] result = wrapped_action(text, **kwargs) # noQA - This call to print is low-level implementation - # This only captures non-file output output. + # This only captures non-file output. 
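For reference, a minimal test sketch (not part of the diff) of the revamped ``MockFileSystem`` accessors that replace direct pokes at the ``.files`` dict, mirroring the substitutions made in ``ff_mocks.py`` above:

# Minimal sketch of the new MockFileSystem test helpers; not part of the diff.
from dcicutils.qa_utils import MockFileSystem

mfs = MockFileSystem()
mfs.set_file_content_for_testing("some-bucket/some-key", b"hello")
mfs.assert_file_count(1)
mfs.assert_file_content("some-bucket/some-key", b"hello")
assert mfs.get_file_content_for_testing("some-bucket/some-key") == b"hello"
assert mfs.all_filenames_for_testing() == ["some-bucket/some-key"]
mfs.remove_file_entry_for_testing("some-bucket/some-key")
mfs.assert_file_system_state({})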
file = kwargs.get('file') if file is None: file = sys.stdout @@ -810,7 +1060,7 @@ def __init__(self, *, region_name=None, boto3=None, **kwargs): self._aws_secret_access_key = kwargs.get("aws_secret_access_key") self._aws_region = region_name - # These is specific for testing. + # This is specific to testing. self._aws_credentials_dir = None # FYI: Some things to note about how boto3 (and probably any AWS client) reads AWS credentials/region. @@ -872,7 +1122,7 @@ def put_credentials_for_testing(self, self._aws_secret_access_key = aws_secret_access_key self._aws_region = region_name - # These is specific for testing. + # This is specific to testing. self._aws_credentials_dir = aws_credentials_dir @staticmethod @@ -2021,9 +2271,11 @@ def is_active(self, now=None): def hurry_restoration(self): self._available_after = datetime.datetime.now() + PRINT(f"The restoration availability of {self} has been hurried.") def hurry_restoration_expiry(self): self._available_until = datetime.datetime.now() + PRINT(f"The restoration expiry of {self} has been hurried.") class MockObjectBasicAttributeBlock: @@ -2047,18 +2299,15 @@ def __str__(self): @classmethod def _generate_version_id(cls): - if cls.MONTONIC_VERSIONS: - return format_in_radix(time.time_ns(), radix=36) - else: - return str(uuid.uuid4()).replace('-', '.').lower() + return make_unique_token(monotonic=cls.MONTONIC_VERSIONS) @property def storage_class(self) -> S3StorageClass: raise NotImplementedError(f"The method 'storage_class' is expected to be implemented" f" in subclasses of MockObjectBasicAttributeBlock: {self}") - def initialize_storage_class(self, value: S3StorageClass): - raise NotImplementedError(f"The method 'initialize_storage_class' is expected to be implemented" + def set_storage_class(self, value: S3StorageClass): + raise NotImplementedError(f"The method 'set_storage_class' is expected to be implemented" f" in subclasses of MockObjectBasicAttributeBlock: {self}") @property @@ -2083,7 +2332,7 @@ class MockObjectDeleteMarker(MockObjectBasicAttributeBlock): def storage_class(self) -> S3StorageClass: raise Exception(f"Attempt to find storage class for mock-deleted S3 filename {self.filename}") - def initialize_storage_class(self, value: S3StorageClass): + def set_storage_class(self, value: S3StorageClass): raise Exception(f"Attempt to initialize storage class for mock-deleted S3 filename {self.filename}") @property @@ -2126,13 +2375,19 @@ def storage_class(self): restoration = self.restoration return restoration.storage_class if restoration else self._storage_class - def initialize_storage_class(self, value): - if self.restoration: - # It's ambiguous what is intended, but also this interface is really intended only for initialization - # and should not be used in the middle of a mock operation. The storage class is not dynamically mutable. - # You need to use the mock operations to make new versions with the right storage class after that. 
- raise Exception("Tried to set storage class in an attribute block" - " while a temporary restoration is ongoing.") + DISCARD_RESTORATIONS_ON_STORAGE_OVERWRITE = True + + def set_storage_class(self, value): + restoration = self.restoration + if restoration and self._storage_class != value: + if self.DISCARD_RESTORATIONS_ON_STORAGE_OVERWRITE: + PRINT(f"Storage class changed in attribute block {self} while a temporary restoration is ongoing.") + PRINT(f"The temporary block {restoration} will be discarded.") + self.restoration.hurry_restoration() + self.restoration.hurry_restoration_expiry() + else: + raise Exception("Tried to set storage class in an attribute block" + " while a temporary restoration is ongoing.") self._storage_class = value _RESTORATION_LOCK = threading.Lock() @@ -2176,6 +2431,126 @@ def restore_temporarily(self, delay_seconds: Union[int, float], duration_days: U storage_class=storage_class) +class MockMultiPartUpload: + + STATE_LOCK = threading.Lock() + + ALL_UPLOADS = {} + + def __init__(self, *, s3, bucket: str, key: str, storage_class: S3StorageClass = STANDARD, + version_id: Optional[str] = None, tagging: Optional[KeyValuestringDictList] = None): + self.initiated = datetime.datetime.now() + self.s3 = s3 + self.upload_id = upload_id = make_unique_token(monotonic=True) + self.parts = [] + # .source is set and .target is reset later (since target might acquire a VersionId) + self.source: Optional[S3ObjectNameDict] = None # initialized on first part upload + self.target: S3ObjectNameDict = { # re-initialized on first part upload + 'Bucket': bucket, + 'Key': key, + 'VersionId': version_id # the version_id isn't actually known until first part upload + } + self.source_attribute_block: Optional[MockObjectAttributeBlock] = None # initialized on first part upload + self._data: Optional[MockPartableContent] = None # initialized on first part upload + self.storage_class = storage_class + self.tagging = tagging or [] + self.action: Optional[callable] = None + self.ALL_UPLOADS[upload_id]: MockMultiPartUpload = self + self.is_complete = False + + @property + def data(self): + data = self._data + if data is None: + raise ValueError("No upload attempt has yet been made.") + data: MockPartableContent + return data + + def initialize_source_attribute_block(self, attribute_block): + if self.source_attribute_block is not None and self.source_attribute_block != attribute_block: + raise RuntimeError(f"You're already copying to a different location." + f" Previously: {self.source_attribute_block} Attempted: {attribute_block}") + self.source_attribute_block = attribute_block + content = attribute_block.content or self.s3.s3_files.files.get(attribute_block.filename) + if self._data is None: + self._data = MockPartableContent.start_cloning_from(content) + + @classmethod + def set_action(cls, upload_id, action: callable): + """ + As an example, in a test, doing + def testing_hook(*, part_number, **_): + if part_number > 1: + raise Exception("Simulated error on any part other than the first one.") + MockMultipartAction.set_action(upload_id, testing_hook) + allows the testing_hook to run on each part_upload attempt, so that in this case it raises an + error on any part other than part 1. 
+ + Keyword args that all action functions will receive are: + * source: an S3ObjectNameSpec + * target: an S3ObjectNameSpec + * part_number: a 1-based int index + * lower: a 0-based lower-inclusive index + * upper: a 0-based upper-exclusive index + The action function is called from check_part_consistency, + which is subprimitive to MockBotoS3Client.upload_part_copy. + """ + upload = cls.lookup(upload_id) + upload.action = action + + @classmethod + def lookup(cls, upload_id): + found: Optional[MockMultiPartUpload] = cls.ALL_UPLOADS.get(upload_id) + if found is None: + ValueError(f"Unknown UploadId: {upload_id}") + return found + + def part_etag(self, range_spec): + return self.data.part_etag(range_spec) + + def check_part_consistency(self, source: S3ObjectNameDict, target: S3ObjectNameDict, + part_number: int, range_spec: str): + ignored(target) + if self.source is None: + self.source = source # Initialize on first use + else: + assert source == self.source, ( + f"A MultiPartUpload must always transfer from the same source. First={self.source} Later={source}") + assert target['Bucket'] == self.target['Bucket'] and target['Key'] == self.target['Key'], ( + f"A MultiPartUpload must always transfer from the same source. Promised={self.target} Actual={target}") + part_etag = self.part_etag(range_spec) + lower_inclusive, upper_exclusive = self.data.part_etag_byte_range(part_etag) + if self.action is not None: + self.action(source=source, target=target, part_number=part_number, + lower=lower_inclusive, upper=upper_exclusive) + return part_etag + + def check_upload_complete(self, target: S3ObjectNameDict, etags: List[str]): + assert target == self.target, f"Filename when completing upload didn't match: {target}" + self.data.validate_parts_complete(parts_etags=etags) + self.is_complete = True + + def move_content(self, s3) -> S3ObjectNameDict: + assert isinstance(s3, MockBotoS3Client) + assert self.is_complete, ( + f"Upload {self.upload_id} tried to .move_content() before calling .check_upload_complete().") + source_s3_filename = f"{self.source['Bucket']}/{self.source['Key']}" + target_s3_filename = f"{self.target['Bucket']}/{self.target['Key']}" + if not self.target.get('VersionId'): + # If a VersionId was supplied, we are copying in-place only to change the storage type, so there's + # no actual change to mock content that's needed. We only change it if we're generating a new version. + s3.maybe_archive_current_version(bucket=self.target['Bucket'], key=self.target['Key'], + replacement_class=MockObjectAttributeBlock) + s3.s3_files.set_file_content_for_testing(target_s3_filename, self.data.copied_content()) + attribute_block = s3._object_attribute_block( # noQA - access to protected member, but this is easiest way + source_s3_filename, version_id=self.source.get('VersionId')) + assert isinstance(attribute_block, MockObjectAttributeBlock), "The referenced file is deleted." 
+ attribute_block.set_storage_class(self.storage_class) + attribute_block.set_tagset(self.tagging) + # raise NotImplementedError(f"Just need to copy {self.data} for into {s3} at {self.target}.") + return self.target + + @MockBoto3.register_client(kind='s3') class MockBotoS3Client(MockBoto3Client): """ @@ -2215,8 +2590,11 @@ def __init__(self, *, self.other_required_arguments = other_required_arguments self.storage_class: S3StorageClass = storage_class or self.DEFAULT_STORAGE_CLASS - def check_for_kwargs_required_by_mock(self, operation, Bucket, Key, **kwargs): - ignored(Bucket, Key) + def check_for_kwargs_required_by_mock(self, operation, Bucket, Key, ExtraArgs=None, VersionId=None, **kwargs): + ignored(Bucket, Key, ExtraArgs, VersionId) + # Some SS3-related required args we're looking for might be in ExtraArgs, but this mock is not presently + # complex enough to decode that. We could add such checks here later, using a more sophisticated check + # than a simple "!=" test, but for now this test is conservative. -kmp 11-May-2023 if kwargs != self.other_required_arguments: raise MockKeysNotImplemented(operation, self.other_required_arguments.keys()) @@ -2224,29 +2602,42 @@ def create_object_for_testing(self, object_content: str, *, Bucket: str, Key: st assert isinstance(object_content, str) self.upload_fileobj(Fileobj=io.BytesIO(object_content.encode('utf-8')), Bucket=Bucket, Key=Key) - def upload_fileobj(self, Fileobj, Bucket, Key, **kwargs): # noqa - Uppercase argument names are chosen by AWS - self.check_for_kwargs_required_by_mock("upload_fileobj", Bucket=Bucket, Key=Key, **kwargs) + def upload_fileobj(self, Fileobj, Bucket, Key, *, ExtraArgs=None, **kwargs): # noQA - AWS CamelCase args + self.check_for_kwargs_required_by_mock("upload_fileobj", Bucket=Bucket, Key=Key, ExtraArgs=ExtraArgs, **kwargs) + # See ALLOWED_UPLOAD_ARGS + # https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/customizations/s3.html + ExtraArgs = ExtraArgs or {} + storage_class = ExtraArgs.get('StorageClass', self.DEFAULT_STORAGE_CLASS) data = Fileobj.read() PRINT("Uploading %s (%s bytes) to bucket %s key %s" % (Fileobj, len(data), Bucket, Key)) - with self.s3_files.open(os.path.join(Bucket, Key), 'wb') as fp: + s3_filename = f"{Bucket}/{Key}" + with self.s3_files.open(s3_filename, 'wb') as fp: fp.write(data) + if storage_class != self.DEFAULT_STORAGE_CLASS: + attribute_block = self._object_attribute_block(filename=s3_filename) + attribute_block.set_storage_class(storage_class) - def upload_file(self, Filename, Bucket, Key, **kwargs): # noqa - Uppercase argument names are chosen by AWS - self.check_for_kwargs_required_by_mock("upload_file", Bucket=Bucket, Key=Key, **kwargs) + def upload_file(self, Filename, Bucket, Key, *, ExtraArgs=None, **kwargs): # noQA - AWS CamelCase args + self.check_for_kwargs_required_by_mock("upload_file", Bucket=Bucket, Key=Key, ExtraArgs=ExtraArgs, **kwargs) with io.open(Filename, 'rb') as fp: - self.upload_fileobj(Fileobj=fp, Bucket=Bucket, Key=Key) - - def download_fileobj(self, Bucket, Key, Fileobj, **kwargs): # noqa - Uppercase argument names are chosen by AWS - self.check_for_kwargs_required_by_mock("download_fileobj", Bucket=Bucket, Key=Key, **kwargs) + self.upload_fileobj(Fileobj=fp, Bucket=Bucket, Key=Key, ExtraArgs=ExtraArgs) + + def download_fileobj(self, Bucket, Key, Fileobj, *, ExtraArgs=None, **kwargs): # noQA - AWS CamelCase args + self.check_for_kwargs_required_by_mock("download_fileobj", Bucket=Bucket, Key=Key, ExtraArgs=ExtraArgs, + **kwargs) + ExtraArgs = 
ExtraArgs or {} + version_id = ExtraArgs.get('VersionId') + if version_id: + raise ValueError(f"VersionId is not supported by this mock: {version_id}") with self.s3_files.open(os.path.join(Bucket, Key), 'rb') as fp: data = fp.read() PRINT("Downloading bucket %s key %s (%s bytes) to %s" % (Bucket, Key, len(data), Fileobj)) Fileobj.write(data) - def download_file(self, Bucket, Key, Filename, **kwargs): # noqa - Uppercase argument names are chosen by AWS - self.check_for_kwargs_required_by_mock("download_file", Bucket=Bucket, Key=Key, **kwargs) + def download_file(self, Bucket, Key, Filename, *, ExtraArgs=None, **kwargs): # noQA - AWS CamelCase args + self.check_for_kwargs_required_by_mock("download_file", Bucket=Bucket, Key=Key, ExtraArgs=ExtraArgs, **kwargs) with io.open(Filename, 'wb') as fp: self.download_fileobj(Bucket=Bucket, Key=Key, Fileobj=fp) @@ -2268,7 +2659,7 @@ def get_object(self, Bucket, Key, **kwargs): # noqa - Uppercase argument names "binary/octet-stream": [".fo"], } - def put_object(self, *, Bucket, Key, Body, ContentType=None, **kwargs): # noqa - Uppercase argument names are chosen by AWS + def put_object(self, *, Bucket, Key, Body, ContentType=None, **kwargs): # noQA - AWS CamelCase args # TODO: This is not mocking other args like ACL that we might use ACL='public-read' or ACL='private' # grep for ACL= in tibanna, tibanna_ff, or dcicutils for examples of these values. # TODO: Shouldn't this be checking for required arguments (e.g., for SSE)? -kmp 9-May-2022 @@ -2280,9 +2671,11 @@ def put_object(self, *, Bucket, Key, Body, ContentType=None, **kwargs): # noqa assert not kwargs, "put_object mock doesn't support %s." % kwargs if isinstance(Body, str): Body = Body.encode('utf-8') # we just assume utf-8, which AWS seems to as well - self.s3_files.files[Bucket + "/" + Key] = Body + self.s3_files.set_file_content_for_testing(Bucket + "/" + Key, Body) + # self.s3_files.files[Bucket + "/" + Key] = Body + etag = self._content_etag(Body) return { - 'ETag': self._content_etag(Body) + 'ETag': etag } @staticmethod @@ -2291,26 +2684,50 @@ def _content_etag(content): # doublequotes, so an example from # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_object_versions.html # shows: 'ETag': '"6805f2cfc46c0f04559748bb039d69ae"', - return f'"{hashlib.md5(content).hexdigest()}"' + if isinstance(content, bytes): + res = f'"{hashlib.md5(content).hexdigest()}"' + elif isinstance(content, MockPartableContent): + res = content.etag + else: + raise ValueError(f"Cannot compute etag for {content!r}.") + # print(f"content={content} ETag={res}") + return res + + @staticmethod + def _content_len(content): + # For reasons known only to AWS, the ETag, though described as an MD5 hash, begins and ends with + # doublequotes, so an example from + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_object_versions.html + # shows: 'ETag': '"6805f2cfc46c0f04559748bb039d69ae"', + if isinstance(content, bytes): + res = len(content) + elif isinstance(content, MockPartableContent): + res = content.size + else: + raise ValueError(f"Cannot compute length of {content!r}.") + # print(f"content={content} ETag={res}") + return res def Bucket(self, name): # noQA - AWS function naming style return MockBotoS3Bucket(s3=self, name=name) - def head_object(self, Bucket, Key, **kwargs): # noQA - AWS argument naming style - self.check_for_kwargs_required_by_mock("head_object", Bucket=Bucket, Key=Key, **kwargs) + def head_object(self, Bucket, Key, 
VersionId=None, **kwargs): # noQA - AWS argument naming style + self.check_for_kwargs_required_by_mock("head_object", Bucket=Bucket, Key=Key, VersionId=VersionId, **kwargs) pseudo_filename = os.path.join(Bucket, Key) if self.s3_files.exists(pseudo_filename): - content = self.s3_files.files[pseudo_filename] - attribute_block = self._object_attribute_block(filename=pseudo_filename) + content = self.s3_files.get_file_content_for_testing(pseudo_filename, required=True) + # content = self.s3_files.files[pseudo_filename] + attribute_block = self._object_attribute_block(filename=pseudo_filename, version_id=VersionId) assert isinstance(attribute_block, MockObjectAttributeBlock) # if file exists, should be normal block result = { 'Bucket': Bucket, 'Key': Key, 'ETag': self._content_etag(content), - 'ContentLength': len(content), + 'ContentLength': self._content_len(content), 'StorageClass': attribute_block.storage_class, # self._object_storage_class(filename=pseudo_filename) + 'VersionId': attribute_block.version_id or '', # it should never be null, but still be careful of type # Numerous others, but this is enough to make the dictionary non-empty and to satisfy some of our tools } restoration = attribute_block.restoration @@ -2328,18 +2745,19 @@ def head_object(self, Bucket, Key, **kwargs): # noQA - AWS argument naming styl # since it might be a 404 (not found) or a 403 (permissions), depending on various details. # For now, just fail in any way since maybe our code doesn't care. raise Exception(f"Mock File Not Found: {pseudo_filename}." - f" Existing files: {list(self.s3_files.files.keys())}") + f" Existing files: {self.s3_files.all_filenames_for_testing()}") def head_bucket(self, Bucket): # noQA - AWS argument naming style bucket_prefix = Bucket + "/" - for filename, content in self.s3_files.files.items(): + # for filename, content in self.s3_files.files.items(): + for filename, content in self.s3_files.all_filenames_with_content_for_testing(): if filename.startswith(bucket_prefix): # Returns other things probably, but this will do to start for our mocking. return {"ResponseMetadata": {"HTTPStatusCode": 200}} raise ClientError(operation_name='HeadBucket', error_response={ # noQA - PyCharm wrongly complains about this dictionary "Error": {"Code": "404", "Message": "Not Found"}, - "ResponseMetadata": {"HTTPStatusCode": 404}, + "ResponseMetadata": {"HTTPStatusCode": 404}, # noQA - some fields omitted }) def get_object_tagging(self, Bucket, Key): @@ -2410,7 +2828,7 @@ def _object_all_versions(self, filename): # print(f"NOT AUTOCREATING {filename} for {self} because {self.s3_files.files}") return attribute_blocks - def _object_attribute_block(self, filename) -> MockObjectBasicAttributeBlock: + def _object_attribute_block(self, filename, version_id=None) -> MockObjectBasicAttributeBlock: """ Returns the attribute_block for an S3 object. This contains information like storage class and tagsets. @@ -2424,31 +2842,44 @@ def _object_attribute_block(self, filename) -> MockObjectBasicAttributeBlock: if not all_versions: # This situation and usually we should not be calling this function at this point, # but try to help developer debug what's going on... - if filename in self.s3_files.files: + # if filename in self.s3_files.files: + if filename in self.s3_files.all_filenames_for_testing(): context = f"mock special non-file (bucket?) 
s3 item: {filename}" else: context = f"mock non-existent S3 file: {filename}" raise ValueError(f"Attempt to obtain object attribute block for {context}.") - return all_versions[-1] + if version_id: + for version in all_versions: + if version.version_id == version_id: + return version + raise ValueError(f"The file {filename} has no version {version_id!r}.") + else: + return all_versions[-1] _ARCHIVE_LOCK = threading.Lock() - def hurry_restoration_for_testing(self, s3_filename, attribute_block=None): + def hurry_restoration_for_testing(self, s3_filename, version_id=None, attribute_block=None): """ This can be used in testing to hurry up the wait for a temporary restore to become available. """ - attribute_block = attribute_block or self._object_attribute_block(s3_filename) + attribute_block = attribute_block or self._object_attribute_block(s3_filename, version_id=version_id) assert isinstance(attribute_block, MockObjectAttributeBlock) attribute_block.hurry_restoration() - def hurry_restoration_expiry_for_testing(self, s3_filename, attribute_block=None): + def hurry_restoration_expiry_for_testing(self, s3_filename, version_id=None, attribute_block=None): """ This can be used in testing to hurry up the wait for a temporary restore to expire. """ - attribute_block = attribute_block or self._object_attribute_block(s3_filename) + attribute_block = attribute_block or self._object_attribute_block(s3_filename, version_id=version_id) assert isinstance(attribute_block, MockObjectAttributeBlock) attribute_block.hurry_restoration_expiry() + def maybe_archive_current_version(self, bucket: str, key: str, + replacement_class: Type[MockObjectBasicAttributeBlock] = MockObjectAttributeBlock, + init: Optional[callable] = None) -> Optional[MockObjectBasicAttributeBlock]: + if self.s3_files.bucket_uses_versioning(bucket): + return self.archive_current_version(f"{bucket}/{key}", replacement_class=replacement_class, init=init) + def archive_current_version(self, filename, replacement_class: Type[MockObjectBasicAttributeBlock] = MockObjectAttributeBlock, init: Optional[callable] = None) -> Optional[MockObjectBasicAttributeBlock]: @@ -2467,6 +2898,13 @@ def archive_current_version(self, filename, init() # caller can supply an init function to be run while still inside lock return new_block + def create_big_file(self, Bucket, Key, size): + s3_filename = f"{Bucket}/{Key}" + self.s3_files.set_file_content_for_testing(s3_filename, MockBigContent(size=size)) + attribute_block = self._object_attribute_block(s3_filename) + assert isinstance(attribute_block, MockObjectAttributeBlock) + return attribute_block + def _check_versions_registered(self, filename, *versions: Optional[MockObjectBasicAttributeBlock]): """ Performs a useful consistency check to identify problems early, but has no functional effect on other code @@ -2499,7 +2937,8 @@ def _prepare_new_attribute_block(self, filename, # about to write new data anyway, after having archived previous data, which means it should # really only be called by archive_current_version after it has done any necessary saving away # of a prior version (depending on whether versioning is enabled). - self.s3_files.files[filename] = None + # self.s3_files.files[filename] = None + self.s3_files.initialize_file_entry_testing(filename) return new_block def _object_tagset(self, filename): @@ -2520,7 +2959,7 @@ def _set_object_tagset(self, filename, tagset): Presently the value is not error-checked. That might change. 
By special exception, passing value=None will revert the storage class to the default for the given mock, - for which the default default is 'STANDARD'. + for which the default default is STANDARD. Note that this is a property of the boto3 instance (through its .shared_reality) not of the s3 mock itself so that if another client is created by that same boto3 mock, it will see the same storage classes. @@ -2562,13 +3001,13 @@ def _set_object_storage_class_for_testing(self, s3_filename, value: S3StorageCla Presently the value is not error-checked. That might change. By special exception, passing value=None will revert the storage class to the default for the given mock, - for which the default default is 'STANDARD'. + for which the default default is STANDARD. Note that this is a property of the boto3 instance (through its .shared_reality) not of the s3 mock itself so that if another client is created by that same boto3 mock, it will see the same storage classes. """ attribute_block = self._object_attribute_block(s3_filename) - attribute_block.initialize_storage_class(value) + attribute_block.set_storage_class(value) def list_objects(self, Bucket, Prefix=None): # noQA - AWS argument naming style # Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.list_objects @@ -2576,14 +3015,15 @@ def list_objects(self, Bucket, Prefix=None): # noQA - AWS argument naming style bucket_prefix_length = len(bucket_prefix) search_prefix = bucket_prefix + (Prefix or '') found = [] - for filename, content in self.s3_files.files.items(): + # for filename, content in self.s3_files.files.items(): + for filename, content in self.s3_files.all_filenames_with_content_for_testing(): if filename.startswith(search_prefix): found.append({ 'Key': filename[bucket_prefix_length:], 'ETag': self._content_etag(content), 'LastModified': self._object_last_modified(filename=filename), # "Owner": {"DisplayName": ..., "ID"...}, - "Size": len(content), + "Size": self._content_len(content), "StorageClass": self._object_storage_class(filename=filename), }) return { @@ -2602,7 +3042,7 @@ def list_objects(self, Bucket, Prefix=None): # noQA - AWS argument naming style } def list_objects_v2(self, Bucket): # noQA - AWS argument naming style - # This is different but similar to list_objects. However we don't really care about that. + # This is different but similar to list_objects. However, we don't really care about that. 
return self.list_objects(Bucket=Bucket) def copy_object(self, CopySource, Bucket, Key, CopySourceVersionId=None, @@ -2622,6 +3062,9 @@ def _copy_object(self, CopySource, Bucket, Key, CopySourceVersionId, StorageClas target_version_id = CopySourceVersionId target_s3_filename = f"{target_bucket}/{target_key}" copy_in_place = False # might be overridden below + PRINT(f"Copying {source_bucket}/{source_key} ({source_version_id})" + f" to {target_bucket}/{target_key}" + f" ({'same' if target_version_id == source_version_id else target_version_id})") if CopySourceVersionId: if CopySourceVersionId != source_version_id or source_bucket != Bucket or source_key != Key: raise AssertionError(f"This mock expected that if CopySourceVersionId is given," @@ -2629,7 +3072,8 @@ def _copy_object(self, CopySource, Bucket, Key, CopySourceVersionId, StorageClas f" CopySource={CopySource!r} Bucket={Bucket!r} Key={Key!r}" f" CopySourceVersionId={CopySourceVersionId!r}") copy_in_place = True - source_data = self.s3_files.files.get(source_s3_filename) + # source_data = self.s3_files.files.get(source_s3_filename) + source_data = self.s3_files.get_file_content_for_testing(source_s3_filename) if source_version_id: source_version = self._get_versioned_object(source_s3_filename, source_version_id) source_data = source_data if source_version.content is None else source_version.content @@ -2650,22 +3094,25 @@ def _copy_object(self, CopySource, Bucket, Key, CopySourceVersionId, StorageClas self.archive_current_version(target_s3_filename) # In this case, we've made a new version and it will be current. # In that case, the files dictionary needs the content copied. - self.s3_files.files[target_s3_filename] = source_data + # self.s3_files.files[target_s3_filename] = source_data + self.s3_files.set_file_content_for_testing(target_s3_filename, source_data) target_attribute_block = self._get_versioned_object(target_s3_filename, target_version_id) new_storage_class = target_storage_class if (copy_in_place and GlacierUtils.transition_involves_glacier_restoration(source_storage_class, target_storage_class)): - new_storage_class = None # For a restoration, the don't update the glacier data. It's restored elsewhere. + new_storage_class = None # For a restoration, we don't update the glacier data. It's restored elsewhere. 
target_attribute_block.restore_temporarily(delay_seconds=self.RESTORATION_DELAY_SECONDS, - duration_days=1, storage_class=target_storage_class) - PRINT(f"Set up restoration {target_attribute_block.restoration}") + duration_days=self.RESTORATION_DEFAULT_DURATION_DAYS, + storage_class=target_storage_class) + PRINT(f"Copy made was a temporary restoration {target_attribute_block.restoration}") else: - PRINT(f"The copy was not a temporary restoration.") + PRINT("Copy made was not a temporary restoration.") if new_storage_class: - target_attribute_block.initialize_storage_class(new_storage_class) + target_attribute_block.set_storage_class(new_storage_class) return {'Success': True} RESTORATION_DELAY_SECONDS = 2 + RESTORATION_DEFAULT_DURATION_DAYS = 1 def delete_object(self, Bucket, Key, VersionId, **unimplemented_keyargs): # Doc: @@ -2738,8 +3185,8 @@ def _delete_versioned_object(self, s3_filename, version_id) -> Dict[str, Any]: "Error": { "Code": "InvalidArgument", "Message": "Invalid version id specified", - "ArgumentName": "versionId", - "ArgumentValue": version_id + "ArgumentName": "versionId", # noQA - PyCharm says not wanted, but not so sure + "ArgumentValue": version_id # noQA - ditto }}) # Delete the old version all_versions = self._object_all_versions(s3_filename) @@ -2755,21 +3202,30 @@ def _delete_versioned_object(self, s3_filename, version_id) -> Dict[str, Any]: new_current_version: MockObjectBasicAttributeBlock = all_versions[-1] if isinstance(new_current_version, MockObjectAttributeBlock): new_content = new_current_version.content - self.s3_files.files[s3_filename] = new_content + # This isn't really creating the file, it's restoring the data cache in the file dictionary. + # The file version, in a sense, already existed. + self.s3_files.restore_file_content_for_testing(s3_filename, new_content) + # self.s3_files.files[s3_filename] = new_content else: # If there are no versions remaining, we've completely deleted the thing. Just remove all record. - del self.s3_files.files[s3_filename] + # del self.s3_files.files[s3_filename] + self.s3_files.remove_file_entry_for_testing(s3_filename) return result def restore_object(self, Bucket, Key, RestoreRequest, VersionId: Optional[str] = None, StorageClass: Optional[S3StorageClass] = None): - duration_days: int = RestoreRequest.get('Days') + duration_days = RestoreRequest.get('Days') + # NOTE: Documentation says "Days element is required for regular restores, and must not be provided + # for select requests." but we don't quite implement that. + assert isinstance(duration_days, int), ( + "This mock doesn't know what to do if 'Days' is not specified in the RestoreRequest."
+ ) storage_class: S3StorageClass = StorageClass or self.storage_class s3_filename = f"{Bucket}/{Key}" if not self.s3_files.exists(s3_filename): raise Exception(f"S3 file at Bucket={Bucket!r} Key={Key!r} does not exist," f" so cannot be restored from glacier.") - attribute_block = self._object_attribute_block(s3_filename) + attribute_block = self._object_attribute_block(s3_filename, version_id=VersionId) assert isinstance(attribute_block, MockObjectAttributeBlock) # since the file exists, this should be good attribute_block.restore_temporarily(delay_seconds=self.RESTORATION_DELAY_SECONDS, duration_days=duration_days, @@ -2785,13 +3241,16 @@ def list_object_versions(self, Bucket, Prefix='', **unimplemented_keyargs): # n bucket_prefix_length = len(bucket_prefix) search_prefix = bucket_prefix + (Prefix or '') aws_file_system = self.s3_files - for filename, content in aws_file_system.files.items(): + # for filename, content in aws_file_system.files.items(): + for filename, content in aws_file_system.all_filenames_with_content_for_testing(): key = filename[bucket_prefix_length:] if filename.startswith(search_prefix): all_versions = self._object_all_versions(filename) most_recent_version = all_versions[-1] for version in all_versions: if isinstance(version, MockObjectAttributeBlock): + etag = self._content_etag(content if version.content is None else version.content) + content_length = self._content_len(content if version.content is None else version.content) version_descriptions.append({ # 'Owner': { # "DisplayName": "4dn-dcic-technical", @@ -2800,8 +3259,8 @@ def list_object_versions(self, Bucket, Prefix='', **unimplemented_keyargs): # n 'Key': key, 'VersionId': version.version_id, 'IsLatest': version == most_recent_version, - 'ETag': self._content_etag(content), - 'Size': len(content if version.content is None else version.content), + 'ETag': etag, + 'Size': content_length, 'StorageClass': version.storage_class, 'LastModified': version.last_modified, # type datetime.datetime }) @@ -2830,6 +3289,126 @@ def list_object_versions(self, Bucket, Prefix='', **unimplemented_keyargs): # n # 'EncodingType': "url", } + @classmethod + def lookup_upload_id(cls, upload_id) -> MockMultiPartUpload: + return MockMultiPartUpload.lookup(upload_id) + + def create_multipart_upload(self, *, Bucket, Key, StorageClass: S3StorageClass = STANDARD, + Tagging: Optional[str] = None, **unimplemented_keyargs): + assert not unimplemented_keyargs, (f"The mock for create_multipart_upload needs to be extended." + f" {there_are(unimplemented_keyargs, kind='unimplemented key')}") + # Weird that Tagging here is a string but in other situations it's a {TagSet: [{Key: ..., Value: ...}]} dict + tagging: KeyValuestringDictList = [] + for k, v in parse_qsl(Tagging or ""): + entry: KeyValuestringDict = {'Key': k, 'Value': v} + tagging.append(entry) + upload = MockMultiPartUpload(s3=self, bucket=Bucket, key=Key, storage_class=StorageClass, tagging=tagging) + return { + 'Bucket': Bucket, + 'Key': Key, + 'UploadId': upload.upload_id, + 'ResponseMetadata': self.compute_mock_response_metadata() + } + + def upload_part_copy(self, *, CopySource: S3ObjectNameSpec, PartNumber, CopySourceRange, UploadId, Bucket, Key, + CopySourceVersionId=None, **unimplemented_keyargs): + assert not unimplemented_keyargs, (f"The mock for upload_part_copy needs to be extended."
+ f" {there_are(unimplemented_keyargs, kind='unimplemented key')}") + # It is not at all obvious why PartNumber has to be supplied, since really all that matters + # is CopySourceRange, but it's constrained to be there and have a certain value, so we'll check that. + assert 1 <= PartNumber <= 10000 + upload = self.lookup_upload_id(UploadId) + + if isinstance(CopySource, str): # Tolerate bucket/key or bucket/key?versionId=xxx + CopySource = parse_s3_object_name(CopySource) + + source_bucket = CopySource['Bucket'] + source_key = CopySource['Key'] + source_version_id = CopySource.get('VersionId') + if CopySourceVersionId: + assert source_bucket == Bucket + assert source_key == Key + assert source_version_id == CopySourceVersionId + s3_filename = f"{source_bucket}/{source_key}" + version_id = CopySourceVersionId + attribute_block = self._object_attribute_block(filename=s3_filename, version_id=version_id) + assert isinstance(attribute_block, MockObjectAttributeBlock), f"Not an ordinary S3 file: {s3_filename}" + upload.initialize_source_attribute_block(attribute_block) + source: S3ObjectNameDict = {'Bucket': source_bucket, 'Key': source_key, 'VersionId': source_version_id} + target: S3ObjectNameDict = {'Bucket': Bucket, 'Key': Key, 'VersionId': CopySourceVersionId} + part_etag = upload.check_part_consistency(source=source, target=target, + part_number=PartNumber, range_spec=CopySourceRange) + return {'CopyPartResult': {'ETag': part_etag}} + + def complete_multipart_upload(self, *, Bucket, Key, MultipartUpload, UploadId, **unimplemented_keyargs): + assert not unimplemented_keyargs, (f"The mock for list_object_versions needs to be extended." + f" {there_are(unimplemented_keyargs, kind='unimplemented key')}") + version_id = None # TODO: Need a way to pass this + upload = MockMultiPartUpload.lookup(UploadId) + parts: List[dict] = MultipartUpload['Parts'] # each element a dictionary containing PartNumber and ETag + etags = [part['ETag'] for part in parts] + if FILE_SYSTEM_VERBOSE: # pragma: no cover - Debugging option. Doesn't need testing. + PRINT(f"Attempting to complete multipart upload with etags: {etags}") + upload.check_upload_complete(target={'Bucket': Bucket, 'Key': Key, 'VersionId': version_id}, etags=etags) + spec: S3ObjectNameDict = upload.move_content(s3=self) + return { + 'Bucket': spec['Bucket'], + 'Key': spec['Key'], + 'VersionId': spec['VersionId'] + } + + def list_multipart_uploads(self, Bucket, **unimplemented_keyargs): + assert not unimplemented_keyargs, (f"The mock for list_multipart_uploads needs to be extended." 
+ f" {there_are(unimplemented_keyargs, kind='unimplemented key')}") + upload_id: str + upload: MockMultiPartUpload + return { + 'Bucket': Bucket, + 'Uploads': [ + { + 'UploadId': upload_id, + 'Key': upload.target['Key'], + 'Initiated': upload.initiated, + 'StorageClass': upload.storage_class, + } + for upload_id, upload in MockMultiPartUpload.ALL_UPLOADS.items() + ], + 'ResponseMetadata': self.compute_mock_response_metadata() + } + + def show_object_versions_for_debugging(self, bucket, prefix, context=None, version_names=None): + ignorable(json) # json library is imported, so acknowledge it might get used here if lines were uncommented + versions = self.list_object_versions(Bucket=bucket, Prefix=prefix) + prefix_len = len(prefix) + hrule_width = 80 + if context: + margin = 3 + n = len(context) + 2 * margin + PRINT(f"+{n * '-'}+") + PRINT(f"|{margin * ' '}{context}{margin * ' '}|") + PRINT(f"|{n * ' '}+{(hrule_width - n - 1) * '-'}") + else: + PRINT(f"+{hrule_width * '-'}") + # print("versions = ", json.dumps(versions, indent=2, default=str)) + for version in versions.get('Versions', []): + version_id = version['VersionId'] + extra = [] + version_name = (version_names or {}).get(version_id) + if version_name: + extra.append(version_name) + if version['IsLatest']: + extra.append('LATEST') + PRINT(f"|" + f" {version['Key'].ljust(max(prefix_len, 12))}" + f" {version['StorageClass'].ljust(8)}" + f" {str(version['Size']).rjust(4)}" + f" VersionId={version_id}" + f" ETag={version['ETag']}" + f" {version['LastModified']}" + f" {','.join(extra)}" + ) + PRINT(f"+{hrule_width * '-'}") + class MockBotoS3Bucket: @@ -2841,23 +3420,28 @@ def __init__(self, name, s3=None): def _delete(self, delete_bucket_too=False): prefix = self.name + "/" - files = self.s3.s3_files.files + s3_files: MockAWSFileSystem = self.s3.s3_files + # files = self.s3.s3_files.files to_delete = set() - for pseudo_filename, _ in [files.items()]: + # for pseudo_filename, _ in [s3_files.files.items()]: + for pseudo_filename, _ in s3_files.all_filenames_with_content_for_testing(): if pseudo_filename.startswith(prefix): if pseudo_filename != prefix: to_delete.add(pseudo_filename) for pseudo_filename in to_delete: - del files[pseudo_filename] + # del s3_files.files[pseudo_filename] + s3_files.remove_file_entry_for_testing(pseudo_filename) if not delete_bucket_too: - files[prefix] = b'' + s3_files.set_file_content_for_testing(prefix, b'') + # s3_files.files[prefix] = b'' # TODO: Does anything need to be returned here? def _keys(self): found = False keys = set() # In real S3, this would be cached info, but for testing we just create it on demand prefix = self.name + "/" - for pseudo_filename, content in self.s3.s3_files.files.items(): + # for pseudo_filename, content in self.s3.s3_files.files.items(): + for pseudo_filename, content in self.s3.s3_files.all_filenames_with_content_for_testing(): if pseudo_filename.startswith(prefix): found = True key = remove_prefix(prefix, pseudo_filename) @@ -3004,7 +3588,7 @@ def known_bug_expected(jira_ticket=None, fixed=False, error_class=None): with known_bug_expected(jira_ticket="TST-00001", error_class=RuntimeError, fixed=True): ... stuff that fails ... - If the previously-expected error (now thought to be fixed) happens, an error will result so it's easy to tell + If the previously-expected error (now thought to be fixed) happens, an error will result, so it's easy to tell if there's been a regression. 
Parameters: @@ -3045,7 +3629,7 @@ def client_failer(operation_name, code=400): def fail(message, code=code): raise ClientError( { # noQA - PyCharm wrongly complains about this dictionary - "Error": {"Message": message, "Code": code} + "Error": {"Message": message, "Code": code} # noQA - PyCharm thinks code should be a string }, operation_name=operation_name) return fail diff --git a/docs/source/dcicutils.rst b/docs/source/dcicutils.rst index feddd70a5..9f696ac76 100644 --- a/docs/source/dcicutils.rst +++ b/docs/source/dcicutils.rst @@ -23,6 +23,13 @@ beanstalk_utils :members: +bucket_utils +^^^^^^^^^^^^^^^ + +.. automodule:: dcicutils.bucket_utils + :members: + + codebuild_utils ^^^^^^^^^^^^^^^ diff --git a/pyproject.toml b/pyproject.toml index 3d9e31449..8fafa1a16 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dcicutils" -version = "7.4.1" +version = "7.5.0" description = "Utility package for interacting with the 4DN Data Portal and other 4DN resources" authors = ["4DN-DCIC Team "] license = "MIT" diff --git a/test/test_bucket_utils.py b/test/test_bucket_utils.py new file mode 100644 index 000000000..021d2bc60 --- /dev/null +++ b/test/test_bucket_utils.py @@ -0,0 +1,37 @@ +import pytest + +from dcicutils.bucket_utils import parse_s3_object_name + + +def test_parse_s3_object_name(): + + GOOD = [ + ("foo/bar", {"Bucket": "foo", "Key": "bar"}), + ("foo/bar?versionId=abc", {"Bucket": "foo", "Key": "bar", "VersionId": "abc"}), + ("foo/bar/baz", {"Bucket": "foo", "Key": "bar/baz"}), + ("foo/bar/baz?versionId=", {"Bucket": "foo", "Key": "bar/baz"}), + ("foo/bar/baz?versionId=abc/def?ghi", {"Bucket": "foo", "Key": "bar/baz", "VersionId": "abc/def?ghi"}), + ] + + for input, expected in GOOD: + actual = parse_s3_object_name(input) + assert actual == expected + + BAD = [ + # We don't allow empty bucket or key + "", "foo", "/bar", "foo/", + # We don't accept junk after or instead of the query param because we don't know what that would mean + # If a query parameter is present, we want it to be the one we care about + "foo/bar?junk=1", + "foo/bar?junkbefore=1&versionId=xyz", + "foo/bar?junkbefore=1&versionId=xyz&junkafter=2", + "foo/bar?versionId=xyz&junkafter=2", + # We think this is supposed to be case-sensitive + "foo/bar?versionid=xyz", + "foo/bar?versionID=xyz" + ] + + for input in BAD: + assert parse_s3_object_name(input, ignore_errors=True) is None + with pytest.raises(ValueError): + assert parse_s3_object_name(input) diff --git a/test/test_data_utils.py b/test/test_data_utils.py index d622b2f00..ce7da9297 100644 --- a/test/test_data_utils.py +++ b/test/test_data_utils.py @@ -179,7 +179,8 @@ def mock_gzip_open_for_write(file, mode): input_filename = 'test1.fastq.gz' generated_filename = generate_sample_fastq_file(input_filename, num=20, length=25, compressed=True) assert generated_filename == input_filename # The bug was that it generated a different name - assert mfs.files.get(generated_filename) == expected_content_1 + assert mfs.get_file_content_for_testing(generated_filename) == expected_content_1 + # assert mfs.files.get(generated_filename) == expected_content_1 # The bug report specifies that this gives a wrong result, too, with pytest.raises(RuntimeError): diff --git a/test/test_ff_mocks.py b/test/test_ff_mocks.py index 16e31ee2e..78102436c 100644 --- a/test/test_ff_mocks.py +++ b/test/test_ff_mocks.py @@ -183,7 +183,8 @@ def simulate_actual_server(): else: expected = {f"{recordings_dir}/{test_name}":
f"{json.dumps(initial_data)}\n".encode('utf-8')} - assert mfs.files == expected + # assert mfs.files == expected + mfs.assert_file_system_state(expected) recording = "Recording" if recording_enabled else "NOT recording" assert printed.lines == [ @@ -234,7 +235,8 @@ def test_abstract_test_recorder_playback(): raise AssertionError("Should not get here.") assert str(exc.value) == datum4['error_message'] # 'yikes' - assert mfs.files == {} # no files created on playback + # assert mfs.files == {} # no files created on playback + mfs.assert_file_system_state({}) # no files created on playback assert printed.lines == [ f"Replaying GET {datum1['url']}", # http://foo diff --git a/test/test_ff_utils.py b/test/test_ff_utils.py index 3288a17e2..2accf04c4 100644 --- a/test/test_ff_utils.py +++ b/test/test_ff_utils.py @@ -276,7 +276,8 @@ def test_unified_authenticator_normalize_auth(): # s3 = mock_boto3.client('s3') # assert isinstance(s3, MockBotoS3Client) # for filename, string in (files or {}).items(): -# s3.s3_files.files[filename] = string.encode('utf-8') +# # s3.s3_files.files[filename] = string.encode('utf-8') +# s3.s3_files.set_file_content(filename, string.encode('utf-8')) # yield mock_boto3 diff --git a/test/test_glacier_utils.py b/test/test_glacier_utils.py index e69bbc3cb..4b4b4e3ee 100644 --- a/test/test_glacier_utils.py +++ b/test/test_glacier_utils.py @@ -3,9 +3,10 @@ from unittest import mock +from dcicutils.common import STANDARD, STANDARD_IA, DEEP_ARCHIVE, GLACIER, GLACIER_IR from dcicutils.ff_mocks import mocked_s3utils from dcicutils.glacier_utils import GlacierUtils, GlacierRestoreException -from dcicutils.qa_utils import MockFileSystem +from dcicutils.qa_utils import MockFileSystem, MockBotoS3Client def mock_keydict() -> dict: @@ -193,7 +194,7 @@ def test_glacier_utils_is_restore_finished(self, glacier_utils, response, expect 'IsLatest': True, 'ETag': '"abc123"', 'Size': 1024, - 'StorageClass': 'STANDARD', + 'StorageClass': STANDARD, 'LastModified': '2023' }, { @@ -202,7 +203,7 @@ def test_glacier_utils_is_restore_finished(self, glacier_utils, response, expect 'IsLatest': False, 'ETag': '"def456"', 'Size': 2048, - 'StorageClass': 'GLACIER', + 'StorageClass': GLACIER, 'LastModified': '2023' } ], @@ -217,7 +218,7 @@ def test_glacier_utils_is_restore_finished(self, glacier_utils, response, expect 'IsLatest': False, 'ETag': '"def456"', 'Size': 2048, - 'StorageClass': 'GLACIER', + 'StorageClass': GLACIER, 'LastModified': '2023' } ], @@ -232,7 +233,7 @@ def test_glacier_utils_is_restore_finished(self, glacier_utils, response, expect 'IsLatest': False, 'ETag': '"def456"', 'Size': 2048, - 'StorageClass': 'GLACIER_IR', + 'StorageClass': GLACIER_IR, 'LastModified': '2023' } ], @@ -247,7 +248,7 @@ def test_glacier_utils_is_restore_finished(self, glacier_utils, response, expect 'IsLatest': True, 'ETag': '"abc123"', 'Size': 1024, - 'StorageClass': 'STANDARD', + 'StorageClass': STANDARD, 'LastModified': '2023' }, { @@ -256,7 +257,7 @@ def test_glacier_utils_is_restore_finished(self, glacier_utils, response, expect 'IsLatest': False, 'ETag': '"def456"', 'Size': 2048, - 'StorageClass': 'DEEP_ARCHIVE', + 'StorageClass': DEEP_ARCHIVE, 'LastModified': '2023' } ], @@ -283,7 +284,7 @@ def test_glacier_utils_delete_glaciered_versions_exist(self, glacier_utils, resp 'IsLatest': True, 'ETag': '"abc123"', 'Size': 1024, - 'StorageClass': 'STANDARD', + 'StorageClass': STANDARD, 'LastModified': '2023' }, { @@ -292,7 +293,7 @@ def test_glacier_utils_delete_glaciered_versions_exist(self, glacier_utils, resp 
'IsLatest': False, 'ETag': '"def456"', 'Size': 2048, - 'StorageClass': 'GLACIER', + 'StorageClass': GLACIER, 'LastModified': '2023' } ], @@ -307,7 +308,7 @@ def test_glacier_utils_delete_glaciered_versions_exist(self, glacier_utils, resp 'IsLatest': True, 'ETag': '"abc123"', 'Size': 1024, - 'StorageClass': 'STANDARD', + 'StorageClass': STANDARD, 'LastModified': '2023' }, { @@ -316,7 +317,7 @@ def test_glacier_utils_delete_glaciered_versions_exist(self, glacier_utils, resp 'IsLatest': False, 'ETag': '"def456"', 'Size': 2048, - 'StorageClass': 'GLACIER_IR', + 'StorageClass': GLACIER_IR, 'LastModified': '2023' } ], @@ -331,7 +332,7 @@ def test_glacier_utils_delete_glaciered_versions_exist(self, glacier_utils, resp 'IsLatest': True, 'ETag': '"abc123"', 'Size': 1024, - 'StorageClass': 'STANDARD_IA', + 'StorageClass': STANDARD_IA, 'LastModified': '2023' } ], @@ -355,7 +356,7 @@ def test_glacier_utils_non_glacier_versions_exist(self, glacier_utils, response) 'IsLatest': False, 'ETag': '"def456"', 'Size': 2048, - 'StorageClass': 'GLACIER', + 'StorageClass': GLACIER, 'LastModified': '2023' } ], @@ -370,7 +371,7 @@ def test_glacier_utils_non_glacier_versions_exist(self, glacier_utils, response) 'IsLatest': False, 'ETag': '"def456"', 'Size': 2048, - 'StorageClass': 'DEEP_ARCHIVE', + 'StorageClass': DEEP_ARCHIVE, 'LastModified': '2023' } ], @@ -394,7 +395,7 @@ def test_glacier_utils_non_glacier_versions_dont_exist(self, glacier_utils, resp 'IsLatest': True, 'ETag': '"abc123"', 'Size': 1024, - 'StorageClass': 'STANDARD', + 'StorageClass': STANDARD, 'LastModified': '2023' }, { @@ -403,7 +404,7 @@ def test_glacier_utils_non_glacier_versions_dont_exist(self, glacier_utils, resp 'IsLatest': False, 'ETag': '"def456"', 'Size': 2048, - 'StorageClass': 'STANDARD', + 'StorageClass': STANDARD, 'LastModified': '2023' } ], @@ -418,7 +419,7 @@ def test_glacier_utils_non_glacier_versions_dont_exist(self, glacier_utils, resp 'IsLatest': True, 'ETag': '"abc123"', 'Size': 1024, - 'StorageClass': 'STANDARD', + 'StorageClass': STANDARD, 'LastModified': '2023' }, ], @@ -519,21 +520,32 @@ def test_glacier_utils_with_mock_s3(self, glacier_utils): with mocked_s3utils(environments=['fourfront-mastertest']) as mock_boto3: with mfs.mock_exists_open_remove(): s3 = mock_boto3.client('s3') + assert isinstance(s3, MockBotoS3Client) with mock.patch.object(gu, 's3', s3): bucket_name = 'foo' key_name = 'file.txt' key2_name = 'file2.txt' + s3_filename2 = f"{bucket_name}/{key2_name}" with io.open(key_name, 'w') as fp: fp.write("first contents") s3.upload_file(key_name, Bucket=bucket_name, Key=key_name) with io.open(key2_name, 'w') as fp: fp.write("second contents") - s3.upload_file(key2_name, Bucket=bucket_name, Key=key2_name) + s3.upload_file(key2_name, Bucket=bucket_name, Key=key2_name, ExtraArgs={'StorageClass': GLACIER}) with io.open(key2_name, 'w') as fp: # add a second version fp.write("second contents 2") s3.upload_file(key2_name, Bucket=bucket_name, Key=key2_name) versions = s3.list_object_versions(Bucket=bucket_name, Prefix=key2_name) - version_1 = versions['Versions'][0]['VersionId'] - assert gu.restore_s3_from_glacier(bucket_name, key2_name, version_id=version_1) - assert gu.copy_object_back_to_original_location(bucket_name, key2_name, version_id=version_1, + version_1 = versions['Versions'][0] + version_1_id = version_1['VersionId'] + version_names = {version_1_id: 'version_1'} + s3.show_object_versions_for_debugging(bucket=bucket_name, prefix=key2_name, + context="BEFORE RESTORE", version_names=version_names) + assert 
gu.restore_s3_from_glacier(bucket_name, key2_name, version_id=version_1_id) + s3.show_object_versions_for_debugging(bucket=bucket_name, prefix=key2_name, + context="AFTER RESTORE", version_names=version_names) + s3.hurry_restoration_for_testing(s3_filename=s3_filename2, version_id=version_1_id) + assert gu.copy_object_back_to_original_location(bucket_name, key2_name, version_id=version_1_id, preserve_lifecycle_tag=True) + s3.show_object_versions_for_debugging(bucket=bucket_name, prefix=key2_name, + context="FINAL STATE", version_names=version_names) diff --git a/test/test_qa_utils.py b/test/test_qa_utils.py index 3359bcc1f..9d0a84e4d 100644 --- a/test/test_qa_utils.py +++ b/test/test_qa_utils.py @@ -15,6 +15,7 @@ import uuid from dcicutils import qa_utils +from dcicutils.common import STANDARD, GLACIER from dcicutils.exceptions import ExpectedErrorNotSeen, WrongErrorSeen, UnexpectedErrorAfterFix from dcicutils.ff_mocks import mocked_s3utils from dcicutils.lang_utils import there_are @@ -26,6 +27,7 @@ raises_regexp, VersionChecker, check_duplicated_items_by_key, guess_local_timezone_for_testing, logged_messages, input_mocked, ChangeLogChecker, MockLog, MockId, Eventually, Timer, MockObjectBasicAttributeBlock, MockObjectAttributeBlock, MockObjectDeleteMarker, MockTemporaryRestoration, + MockBigContent, is_abstract_content, add_coverage, simplify_coverage, MockMultiPartUpload, ) # The following line needs to be separate from other imports. It is PART OF A TEST. from dcicutils.qa_utils import notice_pytest_fixtures # Use care if editing this line. It is PART OF A TEST. @@ -185,7 +187,7 @@ def __init__(self, summer_tz, winter_tz): self._winter_tz = winter_tz def tzname(self, dt: datetime.datetime): - # The exact time that daylight time runs varies from year to year. For testing we'll say that + # The exact time that daylight time runs varies from year to year. For testing, we'll say that # daylight time is April 1 to Oct 31. In practice, we recommend times close to Dec 31 for winter # and Jun 30 for summer, so the precise transition date doesn't matter. -kmp 9-Mar-2021 if 3 < dt.month < 11: @@ -321,7 +323,7 @@ def test_controlled_time_utcnow(): t1 = t.now() # initial time + 1 second t.set_datetime(t0) t2 = t.utcnow() # initial time UTC + 1 second - # This might be 5 hours in US/Eastern at HMS or it might be 0 hours in UTC on AWS or GitHub Actions. + # This might be 5 hours in US/Eastern at HMS, or it might be 0 hours in UTC on AWS or GitHub Actions. assert (t2 - t1).total_seconds() == abs(local_time.utcoffset(t0).total_seconds()) @@ -620,7 +622,7 @@ def reliably_add3(x): return rarely_add3(x) # We have to access a random place out of a tuple structure for mock data on time.sleep's arg. - # Documentation says we should be able to access the call with .call_args[n] but that doesn't work + # Documentation says we should be able to access the call with .call_args[n] but that doesn't work, # and it's also documented to work by tuple, so .mock_calls[n][1][m] substitutes for # .mock_calls[n].call_args[m], but using .mock_calls[n][ARGS][m] as the compromise. 
-kmp 20-May-2020 @@ -797,7 +799,7 @@ def test_mock_file_system_simple(): filename2 = "pre-existing-file.txt" assert os.path.exists(filename2) - assert len(mfs.files) == 1 + mfs.assert_file_count(1) with io.open(filename, 'w') as fp: fp.write("foo") @@ -808,17 +810,18 @@ def test_mock_file_system_simple(): with io.open(filename, 'r') as fp: assert fp.read() == 'foobar\nbaz\n' - assert len(mfs.files) == 2 + mfs.assert_file_count(2) with io.open(filename2, 'r') as fp: assert fp.read() == "stuff from yesterday" - assert sorted(mfs.files.keys()) == ['no.such.file', 'pre-existing-file.txt'] + # assert sorted(mfs.files.keys()) == ['no.such.file', 'pre-existing-file.txt'] + assert mfs.all_filenames_for_testing() == ['no.such.file', 'pre-existing-file.txt'] - assert mfs.files == { + mfs.assert_file_system_state({ 'no.such.file': b'foobar\nbaz\n', 'pre-existing-file.txt': b'stuff from yesterday' - } + }) def test_mock_file_system_auto(): @@ -836,13 +839,14 @@ def test_mock_file_system_auto(): with open(temp_filename, 'w') as outfile: outfile.write(temp_file_text) + mfs: MockFileSystem with MockFileSystem(auto_mirror_files_for_read=True).mock_exists_open_remove() as mfs: - assert len(mfs.files) == 0 + mfs.assert_file_count(0) assert os.path.exists(temp_filename) - assert len(mfs.files) == 1 + mfs.assert_file_count(1) # auto-mirroring has pulled in a file with open(temp_filename) as infile: content = infile.read() @@ -851,13 +855,13 @@ def test_mock_file_system_auto(): os.remove(temp_filename) - assert len(mfs.files) == 0 + mfs.assert_file_count(0) # Removing the file in the mock does not cause us to auto-mirror anew. assert not os.path.exists(temp_filename) # This is just confirmation - assert len(mfs.files) == 0 + mfs.assert_file_count(0) # But now we are outside the mock again, so the file should be visible. 
assert os.path.exists(temp_filename) @@ -1151,10 +1155,11 @@ def test_mock_boto3_client_use(): # No matter what clients you get, they all share the same MockFileSystem, which we can get from s3_files s3fs = s3.s3_files # We saved an s3 file to bucket "foo" and key "bar", so it will be in the s3fs as "foo/bar" - assert sorted(s3fs.files.keys()) == ['foo/bar', 'foo/baz'] + assert s3fs.all_filenames_for_testing() == ['foo/bar', 'foo/baz'] + # assert sorted(s3fs.files.keys()) == ['foo/bar', 'foo/baz'] # The content is stored in binary format - assert s3fs.files['foo/bar'] == b'some content' - assert s3fs.files['foo/baz'] == b'other content' + s3fs.assert_file_content('foo/bar', b'some content') + s3fs.assert_file_content('foo/baz', b'other content') assert isinstance(s3, MockBotoS3Client) @@ -1212,21 +1217,28 @@ def test_mock_boto_s3_client_upload_file_and_download_file_positional(): with io.open("file1.txt", 'w') as fp: fp.write('Hello!\n') - assert local_mfs.files == {"file1.txt": b"Hello!\n"} - assert mock_s3_client.s3_files.files == {} + # assert local_mfs.files == {"file1.txt": b"Hello!\n"} + local_mfs.assert_file_system_state({"file1.txt": b"Hello!\n"}) + # assert mock_s3_client.s3_files.files == {} + mock_s3_client.s3_files.assert_file_system_state({}) mock_s3_client.upload_file("file1.txt", "MyBucket", "MyFile") - assert local_mfs.files == {"file1.txt": b"Hello!\n"} - assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + local_mfs.assert_file_system_state({"file1.txt": b"Hello!\n"}) + # assert local_mfs.files == {"file1.txt": b"Hello!\n"} + + mock_s3_client.s3_files.assert_file_system_state({'MyBucket/MyFile': b"Hello!\n"}) + # assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} mock_s3_client.download_file("MyBucket", "MyFile", "file2.txt") - assert local_mfs.files == { + # assert local_mfs.files == {...} + local_mfs.assert_file_system_state({ "file1.txt": b"Hello!\n", "file2.txt": b"Hello!\n", - } - assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + }) + mock_s3_client.s3_files.assert_file_system_state({'MyBucket/MyFile': b"Hello!\n"}) + # assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} assert file_contents("file1.txt") == file_contents("file2.txt") @@ -1241,21 +1253,23 @@ def test_mock_boto_s3_client_upload_file_and_download_file_keyworded(): with io.open("file1.txt", 'w') as fp: fp.write('Hello!\n') - assert local_mfs.files == {"file1.txt": b"Hello!\n"} - assert mock_s3_client.s3_files.files == {} + local_mfs.assert_file_content("file1.txt", b"Hello!\n") + mock_s3_client.s3_files.assert_file_count(0) mock_s3_client.upload_file(Filename="file1.txt", Bucket="MyBucket", Key="MyFile") - assert local_mfs.files == {"file1.txt": b"Hello!\n"} - assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + local_mfs.assert_file_content("file1.txt", b"Hello!\n") + mock_s3_client.s3_files.assert_file_system_state({'MyBucket/MyFile': b"Hello!\n"}) mock_s3_client.download_file(Bucket="MyBucket", Key="MyFile", Filename="file2.txt") - assert local_mfs.files == { + local_mfs.assert_file_system_state({ "file1.txt": b"Hello!\n", "file2.txt": b"Hello!\n", - } - assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + }) + mock_s3_client.s3_files.assert_file_system_state({ + 'MyBucket/MyFile': b"Hello!\n" + }) assert file_contents("file1.txt") == file_contents("file2.txt") @@ -1272,23 +1286,29 @@ def test_mock_boto_s3_client_upload_fileobj_and_download_fileobj_positional(): with 
io.open("file1.txt", 'w') as fp: fp.write('Hello!\n') - assert local_mfs.files == {"file1.txt": b"Hello!\n"} - assert mock_s3_client.s3_files.files == {} + # assert local_mfs.files == {"file1.txt": b"Hello!\n"} + local_mfs.assert_file_system_state({"file1.txt": b"Hello!\n"}) + # assert mock_s3_client.s3_files.files == {} + mock_s3_client.s3_files.assert_file_system_state({}) with io.open("file1.txt", 'rb') as fp: mock_s3_client.upload_fileobj(fp, "MyBucket", "MyFile") - assert local_mfs.files == {"file1.txt": b"Hello!\n"} - assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + # assert local_mfs.files == {"file1.txt": b"Hello!\n"} + local_mfs.assert_file_system_state({"file1.txt": b"Hello!\n"}) + # assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + mock_s3_client.s3_files.assert_file_system_state({'MyBucket/MyFile': b"Hello!\n"}) with io.open("file2.txt", 'wb') as fp: mock_s3_client.download_fileobj("MyBucket", "MyFile", fp) - assert local_mfs.files == { + # assert local_mfs.files == { ... } + local_mfs.assert_file_system_state({ "file1.txt": b"Hello!\n", "file2.txt": b"Hello!\n", - } - assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + }) + # assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + mock_s3_client.s3_files.assert_file_system_state({'MyBucket/MyFile': b"Hello!\n"}) assert file_contents("file1.txt") == file_contents("file2.txt") @@ -1303,23 +1323,29 @@ def test_mock_boto_s3_client_upload_fileobj_and_download_fileobj_keyworded(): with io.open("file1.txt", 'w') as fp: fp.write('Hello!\n') - assert local_mfs.files == {"file1.txt": b"Hello!\n"} - assert mock_s3_client.s3_files.files == {} + # assert local_mfs.files == {"file1.txt": b"Hello!\n"} + local_mfs.assert_file_system_state({"file1.txt": b"Hello!\n"}) + # assert mock_s3_client.s3_files.files == {} + mock_s3_client.s3_files.assert_file_system_state({}) with io.open("file1.txt", 'rb') as fp: mock_s3_client.upload_fileobj(Fileobj=fp, Bucket="MyBucket", Key="MyFile") - assert local_mfs.files == {"file1.txt": b"Hello!\n"} - assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + # assert local_mfs.files == {"file1.txt": b"Hello!\n"} + local_mfs.assert_file_system_state({"file1.txt": b"Hello!\n"}) + # assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + mock_s3_client.s3_files.assert_file_system_state({'MyBucket/MyFile': b"Hello!\n"}) with io.open("file2.txt", 'wb') as fp: mock_s3_client.download_fileobj(Bucket="MyBucket", Key="MyFile", Fileobj=fp) - assert local_mfs.files == { + # assert local_mfs.files == { ... 
} + local_mfs.assert_file_system_state({ "file1.txt": b"Hello!\n", "file2.txt": b"Hello!\n", - } - assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + }) + # assert mock_s3_client.s3_files.files == {'MyBucket/MyFile': b"Hello!\n"} + mock_s3_client.s3_files.assert_file_system_state({'MyBucket/MyFile': b"Hello!\n"}) assert file_contents("file1.txt") == file_contents("file2.txt") @@ -1378,7 +1404,7 @@ def test_object_basic_attribute_block(): ignored(x) with pytest.raises(NotImplementedError): - b.initialize_storage_class('STANDARD') + b.set_storage_class('STANDARD') with pytest.raises(NotImplementedError): x = b.tagset @@ -1405,7 +1431,7 @@ def test_object_delete_marker(): ignored(x) with pytest.raises(Exception): - b.initialize_storage_class('STANDARD') + b.set_storage_class('STANDARD') with pytest.raises(Exception): x = b.tagset @@ -1432,7 +1458,7 @@ def test_object_attribute_block(): assert b.filename == sample_filename assert isinstance(b.version_id, str) assert b.storage_class == 'STANDARD' - b.initialize_storage_class('GLACIER') + b.set_storage_class('GLACIER') assert b.storage_class == 'GLACIER' assert b.tagset == [] b.set_tagset(sample_tagset) @@ -1492,7 +1518,7 @@ class MyRuntimeError(RuntimeError): with pytest.raises(Exception): # This will fail because the inner error is a KeyError, not a RuntimeError. # I WISH this would raise AssertionError, but pytest lets the KeyError through. - # I am not sure that's the same as what unittest does in this case but it will + # I am not sure that's the same as what unittest does in this case, but it will # suffice for now. -kmp 6-Oct-2020 with raises_regexp(RuntimeError, "This.*test!"): raise KeyError('This is a test!') @@ -1974,7 +2000,8 @@ def test_timer(): def show_s3_debugging_data(mfs, s3, bucket_name): print("file system:") - for file, data in mfs.files.items(): + # for file, data in mfs.files.items(): + for file, data in mfs.all_filenames_with_content_for_testing(): s3_filename = f'{bucket_name}/{file}' all_versions = s3._object_all_versions(s3_filename) # noQA - internal method needed for testing print(f" {file}[{s3._object_attribute_block(s3_filename).version_id}]:" # noQA - ditto @@ -2151,3 +2178,335 @@ def test_s3_list_object_versions(): assert version3['Key'] == key2_name assert version3['IsLatest'] is True assert all(version['StorageClass'] == 'STANDARD' for version in versions) + + +def test_is_abstract_content(): + + content = MockBigContent(size=5000) + assert is_abstract_content(content) + + +def test_simplify_coverage(): + + assert simplify_coverage([[0, 0]]) == [] + assert simplify_coverage([[100, 100]]) == [] + + assert simplify_coverage([[0, 100]]) == [[0, 100]] + assert simplify_coverage([[100, 500]]) == [[100, 500]] + + assert simplify_coverage([[0, 100], [100, 200]]) == [[0, 200]] + assert simplify_coverage([[0, 100], [101, 200]]) == [[0, 100], [101, 200]] + + assert simplify_coverage([[0, 100], [100, 101], [101, 200]]) == [[0, 200]] + assert simplify_coverage([[0, 100], [100, 101], [90, 200]]) == [[0, 200]] + assert simplify_coverage([[100, 200], [225, 250], [90, 300]]) == [[90, 300]] + assert simplify_coverage([[100, 200], [225, 250], [200, 227], [0, 0]]) == [[100, 250]] + + +def test_add_coverage(): + assert add_coverage([], 0, 0) == [] + assert add_coverage([], 100, 100) == [] + + assert add_coverage([], 0, 100) == [[0, 100]] + assert add_coverage([], 100, 500) == [[100, 500]] + + assert add_coverage([[0, 100]], 100, 200) == [[0, 200]] + assert add_coverage([[0, 100]], 101, 200) == [[0, 100], 
[101, 200]] + + assert add_coverage([[0, 100], [100, 101]], 101, 200) == [[0, 200]] + assert add_coverage([[0, 100], [100, 101]], 90, 200) == [[0, 200]] + assert add_coverage([[100, 200], [225, 250]], 90, 300) == [[90, 300]] + assert add_coverage([[100, 200], [225, 250], [200, 227]], 0, 0) == [[100, 250]] + + +def test_mock_big_content(): + + print() # start on a fresh line + + size = 5005 + increment = 1000 + + content = MockBigContent(size=size) + assert isinstance(content._content_id, str) + assert content.coverage == [[0, size]] + + content_copy = content.start_partial_copy() + assert content_copy != content + pos = 0 + print(f"content={content}") + print(f"content_copy={content_copy}") + while pos < size: + assert content_copy != content + new_pos = min(pos + increment, size) + print(f"pos={pos} new_pos={new_pos} content_copy={content_copy}") + content.copy_portion(start=pos, end=new_pos, target=content_copy) + pos = new_pos + assert content_copy == content + + +def test_validate_parts_complete(): + + content = MockBigContent(size=5000) + part1 = content.part_etag("bytes=1-1000") + part2 = content.part_etag("bytes=1001-4500") + part3 = content.part_etag("bytes=4501-5000") + MockBigContent.validate_parts_complete([part1, part3, part2]) + + content = MockBigContent(size=5000) + part1 = content.part_etag("bytes=0-1000") + part2 = content.part_etag("bytes=1001-4500") + part3 = content.part_etag("bytes=4501-4999") + with pytest.raises(Exception): + MockBigContent.validate_parts_complete([part1, part3, part2]) + + +def test_multipart_upload(): + + file1 = 'file1.txt' + file1_content = 'data1' + bucket1 = 'foo' + key1a = 's3_file1a.txt' + + file1x = 'file1x.txt' + key1x = 's3_file1x.txt' + file1x_content = 'this is alternate date' + key1_prefix = 's3_file' + + file2 = 'file2.txt' + file2_content = '' # Empty + bucket2 = 'bar' + key2 = 's3_file2.txt' + + mfs = MockFileSystem() + with mocked_s3utils(environments=['fourfront-mastertest']) as mock_boto3: + with mfs.mock_exists_open_remove(): + assert isinstance(mock_boto3, MockBoto3) + s3 = mock_boto3.client('s3') + assert isinstance(s3, MockBotoS3Client) + + def scenario(n): + print("=" * 50, "SCENARIO", n, "=" * 50) + + # ==================== + scenario(1) + + with io.open(file1, 'w') as fp: + fp.write(file1_content) + s3.upload_file(Filename=file1, Bucket=bucket1, Key=key1a) + attribute_block_1 = s3._object_attribute_block(f"{bucket1}/{key1a}") + source_version_id_1 = attribute_block_1.version_id + s3.show_object_versions_for_debugging(bucket=bucket1, prefix=key1a, + context="After upload to S3 (scenario 1)", + version_names={source_version_id_1: 'source_version_id_1'}) + result = s3.create_multipart_upload(Bucket=bucket1, Key=key1a) + upload1_id = result['UploadId'] + upload = MockMultiPartUpload.lookup(upload1_id) + assert upload.upload_id == upload1_id + assert upload.source is None + assert upload.target == {'Bucket': bucket1, 'Key': key1a, 'VersionId': None} + assert upload.storage_class == STANDARD + assert upload.tagging == [] + assert upload.parts == [] + assert upload.action is None + assert upload.is_complete is False + scenario1_part1_res = s3.upload_part_copy(CopySource={'Bucket': bucket1, 'Key': key1a}, + Bucket=bucket1, Key=key1a, + PartNumber=1, CopySourceRange="bytes=1-2", UploadId=upload1_id) + s3.show_object_versions_for_debugging(bucket=bucket1, prefix=key1a, + context="After scenario 1 upload part 1", + version_names={source_version_id_1: 'source_version_id_1'}) + scenario1_part1_etag = 
scenario1_part1_res['CopyPartResult']['ETag'] + n = len(file1_content) + scenario1_part1_res = s3.upload_part_copy(CopySource={'Bucket': bucket1, 'Key': key1a}, + Bucket=bucket1, Key=key1a, + PartNumber=2, CopySourceRange=f"bytes=3-{n}", UploadId=upload1_id) + + with pytest.raises(Exception): + # This copy is incomplete, so the attempt to complete it will fail. + # We need to do one more part copy before it can succeed. + upload_desc = { + 'Parts': [ + {'PartNumber': 1, 'ETag': scenario1_part1_etag} + ] + } + s3.complete_multipart_upload(Bucket=bucket1, Key=key1a, + MultipartUpload=upload_desc, UploadId=upload1_id) + + s3.show_object_versions_for_debugging(bucket=bucket1, prefix=key1a, + context="After scenario 1 upload part 2", + version_names={source_version_id_1: 'source_version_id_1'}) + scenario1_part2_etag = scenario1_part1_res['CopyPartResult']['ETag'] + upload_desc = { + 'Parts': [ + {'PartNumber': 1, 'ETag': scenario1_part1_etag}, + {'PartNumber': 2, 'ETag': scenario1_part2_etag} + ] + } + s3.complete_multipart_upload(Bucket=bucket1, Key=key1a, MultipartUpload=upload_desc, UploadId=upload1_id) + s3.show_object_versions_for_debugging(bucket=bucket1, prefix=key1a, context="After scenario 1 complete", + version_names={source_version_id_1: 'source_version_id_1'}) + + # ==================== + scenario(2) + + with io.open(file2, 'w') as fp: + fp.write(file2_content) + s3.upload_file(Filename=file2, Bucket=bucket2, Key=key2) + attribute_block_2 = s3._object_attribute_block(f"{bucket2}/{key2}") + source_version_id_2 = attribute_block_2.version_id + s3.show_object_versions_for_debugging(bucket=bucket2, prefix=key2, + context="After upload to S3 (scenario 2)", + version_names={source_version_id_2: 'source_version_id_2'}) + result = s3.create_multipart_upload(Bucket=bucket2, Key=key2, + StorageClass=GLACIER, + Tagging="abc=123&xyz=something") + upload2_id = result['UploadId'] + upload2 = MockMultiPartUpload.lookup(upload2_id) + assert upload2.upload_id == upload2_id + assert upload2.source is None + assert upload2.target == {'Bucket': bucket2, 'Key': key2, 'VersionId': None} + assert upload2.storage_class == GLACIER + assert upload2.tagging == [{'Key': 'abc', 'Value': '123'}, {'Key': 'xyz', 'Value': 'something'}] + assert upload2.parts == [] + assert upload2.action is None + assert upload2.is_complete is False + scenario2_part1_res = s3.upload_part_copy(CopySource={'Bucket': bucket2, 'Key': key2}, + Bucket=bucket2, Key=key2, + # Note here that a range of 1-0 means "no bytes", it's empty + PartNumber=1, CopySourceRange="bytes=1-0", UploadId=upload2_id) + s3.show_object_versions_for_debugging(bucket=bucket2, prefix=key2, context="After scenario 2 upload part", + version_names={source_version_id_2: 'source_version_id_2'}) + scenario2_part1_etag = scenario2_part1_res['CopyPartResult']['ETag'] + upload_desc = { + 'Parts': [ + {'PartNumber': 1, 'ETag': scenario2_part1_etag} + ] + } + s3.complete_multipart_upload(Bucket=bucket2, Key=key2, MultipartUpload=upload_desc, UploadId=upload2_id) + s3.show_object_versions_for_debugging(bucket=bucket2, prefix=key2, context="After scenario 2 complete", + version_names={source_version_id_2: 'source_version_id_2'}) + + # ==================== + scenario(3) + + with io.open(file1x, 'w') as fp: + fp.write(file1x_content) + s3.upload_file(Filename=file1x, Bucket=bucket1, Key=key1x) + attribute_block_1x = s3._object_attribute_block(f"{bucket1}/{key1x}") + source_version_id_1x = attribute_block_1x.version_id + s3.show_object_versions_for_debugging(bucket=bucket1, 
prefix=key1_prefix, + context="After upload to S3 (scenario 3)", + version_names={ + source_version_id_1: 'source_version_id_1', + source_version_id_1x: 'source_version_id_1x' + }) + result = s3.create_multipart_upload(Bucket=bucket1, Key=key1x) + upload1x_id = result['UploadId'] + upload = MockMultiPartUpload.lookup(upload1x_id) + assert upload.upload_id == upload1x_id + assert upload.source is None + assert upload.target == {'Bucket': bucket1, 'Key': key1x, 'VersionId': None} + assert upload.storage_class == STANDARD + assert upload.tagging == [] + assert upload.parts == [] + assert upload.action is None + assert upload.is_complete is False + scenario1x_part1_res = s3.upload_part_copy(CopySource={'Bucket': bucket1, 'Key': key1a, + 'VersionId': source_version_id_1}, + Bucket=bucket1, Key=key1x, + PartNumber=1, CopySourceRange="bytes=1-2", UploadId=upload1x_id) + s3.show_object_versions_for_debugging(bucket=bucket1, prefix=key1_prefix, + context="After scenario 3 upload part 1", + version_names={ + source_version_id_1: 'source_version_id_1', + source_version_id_1x: 'source_version_id_1x' + }) + scenario1x_part1_etag = scenario1x_part1_res['CopyPartResult']['ETag'] + n = len(file1_content) + scenario1x_part1_res = s3.upload_part_copy(CopySource={'Bucket': bucket1, 'Key': key1a, + 'VersionId': source_version_id_1}, + Bucket=bucket1, Key=key1x, + PartNumber=2, CopySourceRange=f"bytes=3-{n}", + UploadId=upload1x_id) + + with pytest.raises(Exception): + # This copy is incomplete, so the attempt to complete it will fail. + # We need to do one more part copy before it can succeed. + upload_desc = { + 'Parts': [ + {'PartNumber': 1, 'ETag': scenario1x_part1_etag} + ] + } + s3.complete_multipart_upload(Bucket=bucket1, Key=key1x, + MultipartUpload=upload_desc, UploadId=upload1x_id) + + s3.show_object_versions_for_debugging(bucket=bucket1, prefix=key1_prefix, + context="After scenario 3 upload part 2", + version_names={ + source_version_id_1: 'source_version_id_1', + source_version_id_1x: 'source_version_id_1x' + }) + scenario1x_part2_etag = scenario1x_part1_res['CopyPartResult']['ETag'] + upload_desc = { + 'Parts': [ + {'PartNumber': 1, 'ETag': scenario1x_part1_etag}, + {'PartNumber': 2, 'ETag': scenario1x_part2_etag} + ] + } + s3.complete_multipart_upload(Bucket=bucket1, Key=key1x, MultipartUpload=upload_desc, UploadId=upload1x_id) + s3.show_object_versions_for_debugging(bucket=bucket1, prefix=key1_prefix, + context="After scenario 3 complete", + version_names={ + source_version_id_1: 'source_version_id_1', + source_version_id_1x: 'source_version_id_1x' + }) + + # ==================== + scenario(4) + + KB = 1000 + MB = KB * KB + + source_4_key = 'key_in' + target_4_key = 'key_out' + prefix_4 = 'key_' + size_4 = 120 * MB + incr_4 = 25 * MB + + source_version_id_4 = s3.create_big_file(Bucket=bucket1, Key=source_4_key, size=120 * MB).version_id + + def show_progress(context): + s3.show_object_versions_for_debugging(bucket=bucket1, prefix=prefix_4, context=context, + version_names={source_version_id_4: 'source_version_id_4'}) + show_progress("After creating mock big file (scenario 4)") + result_4 = s3.create_multipart_upload(Bucket=bucket1, Key=target_4_key) + upload_id_4 = result_4['UploadId'] + upload_4 = MockMultiPartUpload.lookup(upload_id_4) + assert upload_4.upload_id == upload_id_4 + assert upload_4.source is None + assert upload_4.target == {'Bucket': bucket1, 'Key': target_4_key, 'VersionId': None} + assert upload_4.storage_class == STANDARD + assert upload_4.tagging == [] + assert 
upload_4.parts == [] + assert upload_4.action is None + assert upload_4.is_complete is False + i = 0 + n = 0 + parts = [] + while n < size_4: + i = i + 1 + part_size = min(incr_4, size_4 - n) + range_spec = f"bytes={n + 1}-{n + part_size}" + part_res = s3.upload_part_copy(CopySource={'Bucket': bucket1, 'Key': source_4_key}, + Bucket=bucket1, Key=target_4_key, + PartNumber=i, CopySourceRange=range_spec, + UploadId=upload_id_4) + show_progress(f"After scenario 4 upload part {i}") + part_etag = part_res['CopyPartResult']['ETag'] + parts.append({'PartNumber': i, 'ETag': part_etag}) + n = n + incr_4 + upload_desc = {'Parts': parts} + s3.complete_multipart_upload(Bucket=bucket1, Key=target_4_key, + MultipartUpload=upload_desc, UploadId=upload_id_4) + show_progress("After scenario 4 complete") + assert upload_4.is_complete
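For reviewers who want the multipart mock in one glance: the sketch below is distilled from scenario 1 of test_multipart_upload above, not an additional test in this patch. The function name sketch_multipart_copy and the bucket/key/file names are illustrative only; as in the scenarios above, the mock's CopySourceRange values are 1-based and inclusive.

    import io
    from dcicutils.ff_mocks import mocked_s3utils
    from dcicutils.qa_utils import MockBotoS3Client, MockFileSystem

    def sketch_multipart_copy():
        mfs = MockFileSystem()
        with mocked_s3utils(environments=['fourfront-mastertest']) as mock_boto3:
            with mfs.mock_exists_open_remove():
                s3 = mock_boto3.client('s3')
                assert isinstance(s3, MockBotoS3Client)
                # Seed a small source object ('data1' is 5 bytes).
                with io.open('source.txt', 'w') as fp:
                    fp.write('data1')
                s3.upload_file(Filename='source.txt', Bucket='foo', Key='source.txt')
                # Start a multipart upload targeting the same bucket/key used as the copy source.
                upload_id = s3.create_multipart_upload(Bucket='foo', Key='source.txt')['UploadId']
                # Copy the content in two pieces, mirroring scenario 1's bytes=1-2 and bytes=3-5 ranges.
                part1 = s3.upload_part_copy(CopySource={'Bucket': 'foo', 'Key': 'source.txt'},
                                            Bucket='foo', Key='source.txt',
                                            PartNumber=1, CopySourceRange='bytes=1-2', UploadId=upload_id)
                part2 = s3.upload_part_copy(CopySource={'Bucket': 'foo', 'Key': 'source.txt'},
                                            Bucket='foo', Key='source.txt',
                                            PartNumber=2, CopySourceRange='bytes=3-5', UploadId=upload_id)
                # Completion succeeds only once the supplied part ETags cover the whole source.
                s3.complete_multipart_upload(
                    Bucket='foo', Key='source.txt', UploadId=upload_id,
                    MultipartUpload={'Parts': [
                        {'PartNumber': 1, 'ETag': part1['CopyPartResult']['ETag']},
                        {'PartNumber': 2, 'ETag': part2['CopyPartResult']['ETag']},
                    ]})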
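Similarly, the hurry_restoration_for_testing and hurry_restoration_expiry_for_testing hooks touched above let a test skip the RESTORATION_DELAY_SECONDS wait after restore_object. This is a hedged sketch based on the restore_object mock and the test_glacier_utils changes in this patch; sketch_hurried_restore, the 'foo' bucket, and cold.txt are made up for illustration.

    import io
    from dcicutils.common import GLACIER
    from dcicutils.ff_mocks import mocked_s3utils
    from dcicutils.qa_utils import MockBotoS3Client, MockFileSystem

    def sketch_hurried_restore():
        mfs = MockFileSystem()
        with mocked_s3utils(environments=['fourfront-mastertest']) as mock_boto3:
            with mfs.mock_exists_open_remove():
                s3 = mock_boto3.client('s3')
                assert isinstance(s3, MockBotoS3Client)
                with io.open('cold.txt', 'w') as fp:
                    fp.write('archived contents')
                # Upload straight into a glaciered storage class, as test_glacier_utils_with_mock_s3 does.
                s3.upload_file('cold.txt', Bucket='foo', Key='cold.txt', ExtraArgs={'StorageClass': GLACIER})
                # Ask for a temporary restore; the mock insists on an integer 'Days' in the RestoreRequest.
                s3.restore_object(Bucket='foo', Key='cold.txt', RestoreRequest={'Days': 1})
                # Rather than sleeping through RESTORATION_DELAY_SECONDS, hurry the mock clock along.
                s3.hurry_restoration_for_testing(s3_filename='foo/cold.txt')
                # ... the restored copy is now available; hurry its expiry too if the test needs that.
                s3.hurry_restoration_expiry_for_testing(s3_filename='foo/cold.txt')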
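Finally, the behavior pinned down by test_parse_s3_object_name above boils down to calls like the following; the bucket and key names here are illustrative only.

    from dcicutils.bucket_utils import parse_s3_object_name

    assert parse_s3_object_name("my-bucket/path/to/key") == {"Bucket": "my-bucket", "Key": "path/to/key"}
    assert parse_s3_object_name("my-bucket/path/to/key?versionId=abc123") == {
        "Bucket": "my-bucket", "Key": "path/to/key", "VersionId": "abc123"}
    # Malformed names raise ValueError unless ignore_errors=True, in which case None comes back.
    assert parse_s3_object_name("just-a-bucket", ignore_errors=True) is None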