diff --git a/.github/workflows/base-lambdas-reusable-deploy-all.yml b/.github/workflows/base-lambdas-reusable-deploy-all.yml
index 0dfc47b84..7f8fad3ed 100644
--- a/.github/workflows/base-lambdas-reusable-deploy-all.yml
+++ b/.github/workflows/base-lambdas-reusable-deploy-all.yml
@@ -696,7 +696,7 @@ jobs:
       lambda_layer_names: "core_lambda_layer"
     secrets:
       AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
-
+
   deploy_get_document_reference_by_id_lambda:
     name: Deploy get_document_reference_lambda
     uses: ./.github/workflows/base-lambdas-reusable-deploy.yml
@@ -710,3 +710,17 @@ jobs:
       lambda_layer_names: "core_lambda_layer"
     secrets:
       AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
+
+  deploy_review_processor_lambda:
+    name: Deploy Review Processor Lambda
+    uses: ./.github/workflows/base-lambdas-reusable-deploy.yml
+    with:
+      environment: ${{ inputs.environment }}
+      python_version: ${{ inputs.python_version }}
+      build_branch: ${{ inputs.build_branch }}
+      sandbox: ${{ inputs.sandbox }}
+      lambda_handler_name: document_review_processor_handler
+      lambda_aws_name: DocumentReviewProcessor
+      lambda_layer_names: "core_lambda_layer"
+    secrets:
+      AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
diff --git a/lambdas/handlers/document_review_processor_handler.py b/lambdas/handlers/document_review_processor_handler.py
new file mode 100644
index 000000000..a363ccf7d
--- /dev/null
+++ b/lambdas/handlers/document_review_processor_handler.py
@@ -0,0 +1,59 @@
+from models.sqs.review_message_body import ReviewMessageBody
+from pydantic import ValidationError
+from services.document_review_processor_service import ReviewProcessorService
+from utils.audit_logging_setup import LoggingService
+from utils.decorators.ensure_env_var import ensure_environment_variables
+from utils.decorators.override_error_check import override_error_check
+from utils.decorators.set_audit_arg import set_request_context_for_logging
+from utils.request_context import request_context
+
+logger = LoggingService(__name__)
+
+
+@set_request_context_for_logging
+@ensure_environment_variables(
+    names=[
+        "DOCUMENT_REVIEW_DYNAMODB_NAME",
+        "STAGING_STORE_BUCKET_NAME",
+        "PENDING_REVIEW_BUCKET_NAME",
+    ]
+)
+@override_error_check
+def lambda_handler(event, context):
+    """
+    This handler consumes SQS messages from the document review queue, creates DynamoDB
+    records in the DocumentReview table, and moves files from the staging bucket
+    to the pending review bucket.
+
+    Args:
+        event: Lambda event containing the SQS records
+        context: Lambda context
+
+    Returns:
+        None
+    """
+    logger.info("Starting review processor Lambda")
+
+    sqs_messages = event.get("Records", [])
+    review_service = ReviewProcessorService()
+
+    for sqs_message in sqs_messages:
+        try:
+            message = ReviewMessageBody.model_validate_json(sqs_message["body"])
+
+            review_service.process_review_message(message)
+
+        except ValidationError as error:
+            logger.error("Malformed review message")
+            logger.error(error)
+            raise error
+
+        except Exception as error:
+            logger.error(
+                f"Failed to process review message: {str(error)}",
+                {"Result": "Review processing failed"},
+            )
+            raise error
+
+        request_context.patient_nhs_no = ""
+        logger.info("Continuing to next message.")
diff --git a/lambdas/models/document_review.py b/lambdas/models/document_review.py
index 90cec0cfd..92ac10f85 100644
--- a/lambdas/models/document_review.py
+++ b/lambdas/models/document_review.py
@@ -1,5 +1,4 @@
 import uuid
-from typing import Optional
 
 from enums.document_review_status import DocumentReviewStatus
 from enums.metadata_field_names import DocumentReferenceMetadataFields
@@ -45,5 +44,5 @@ class DocumentUploadReviewReference(BaseModel):
     ttl: int | None = Field(
         alias=str(DocumentReferenceMetadataFields.TTL.value), default=None
     )
-    document_reference_id: str = Field(default=None)
+    document_reference_id: str | None = Field(default=None)
     document_snomed_code_type: str = Field(default=SnomedCodes.LLOYD_GEORGE.value.code)
diff --git a/lambdas/models/sqs/review_message_body.py b/lambdas/models/sqs/review_message_body.py
new file mode 100644
index 000000000..ef24c791f
--- /dev/null
+++ b/lambdas/models/sqs/review_message_body.py
@@ -0,0 +1,21 @@
+from pydantic import BaseModel, Field
+
+
+class ReviewMessageFile(BaseModel):
+    """Model for an individual file in the SQS message body from the document review queue."""
+
+    file_name: str
+    file_path: str = Field(description="Location in the staging bucket")
+    """Location in the staging bucket"""
+
+
+class ReviewMessageBody(BaseModel):
+    """Model for the SQS message body from the document review queue."""
+
+    upload_id: str
+    files: list[ReviewMessageFile]
+    nhs_number: str
+    failure_reason: str
+    upload_date: str
+    uploader_ods: str
+    current_gp: str
diff --git a/lambdas/requirements/layers/requirements_core_lambda_layer.txt b/lambdas/requirements/layers/requirements_core_lambda_layer.txt
index 60c35cd82..457396e67 100644
--- a/lambdas/requirements/layers/requirements_core_lambda_layer.txt
+++ b/lambdas/requirements/layers/requirements_core_lambda_layer.txt
@@ -1,7 +1,7 @@
 PyJWT==2.8.0
 PyYAML==6.0.1
-boto3==1.34.128
-botocore==1.34.128
+boto3==1.40.71
+botocore==1.40.71
 charset-normalizer==3.2.0
 cryptography==44.0.1
 idna==3.7
diff --git a/lambdas/services/base/dynamo_service.py b/lambdas/services/base/dynamo_service.py
index c96eded30..38f71f3fc 100644
--- a/lambdas/services/base/dynamo_service.py
+++ b/lambdas/services/base/dynamo_service.py
@@ -164,11 +164,27 @@ def query_table(
 
         return items
 
-    def create_item(self, table_name, item):
+    def create_item(self, table_name, item, key_name: str | None = None):
+        """
+        Put an item into the specified DynamoDB table, with an optional condition on the existence of the key.
+        Args:
+            table_name: Name of the DynamoDB table
+            item: The item to be inserted (as a dictionary)
+            key_name: The name of the key field whose existence is checked for a conditional put
+        Returns:
+            Response from the DynamoDB put_item operation
+        Raises:
+            ClientError: For AWS service errors (DynamoDB)
+        """
         try:
             table = self.get_table(table_name)
             logger.info(f"Writing item to table: {table_name}")
-            table.put_item(Item=item)
+            if key_name:
+                return table.put_item(
+                    Item=item, ConditionExpression=f"attribute_not_exists({key_name})"
+                )
+            else:
+                return table.put_item(Item=item)
         except ClientError as e:
             logger.error(
                 str(e), {"Result": f"Unable to write item to table: {table_name}"}
diff --git a/lambdas/services/base/s3_service.py b/lambdas/services/base/s3_service.py
index 5c6a690cb..f98fdea22 100644
--- a/lambdas/services/base/s3_service.py
+++ b/lambdas/services/base/s3_service.py
@@ -116,7 +116,16 @@ def copy_across_bucket(
         source_file_key: str,
         dest_bucket: str,
         dest_file_key: str,
+        if_none_match: str | None = None,
     ):
+        if if_none_match is not None:
+            return self.client.copy_object(
+                Bucket=dest_bucket,
+                Key=dest_file_key,
+                CopySource={"Bucket": source_bucket, "Key": source_file_key},
+                IfNoneMatch=if_none_match,
+                StorageClass="INTELLIGENT_TIERING",
+            )
         return self.client.copy_object(
             Bucket=dest_bucket,
             Key=dest_file_key,
@@ -124,11 +133,15 @@ def copy_across_bucket(
             StorageClass="INTELLIGENT_TIERING",
         )
 
-    def delete_object(self, s3_bucket_name: str, file_key: str, version_id: str | None = None):
+    def delete_object(
+        self, s3_bucket_name: str, file_key: str, version_id: str | None = None
+    ):
         if version_id is None:
             return self.client.delete_object(Bucket=s3_bucket_name, Key=file_key)
-
-        return self.client.delete_object(Bucket=s3_bucket_name, Key=file_key, VersionId=version_id)
+
+        return self.client.delete_object(
+            Bucket=s3_bucket_name, Key=file_key, VersionId=version_id
+        )
 
     def create_object_tag(
         self, s3_bucket_name: str, file_key: str, tag_key: str, tag_value: str
diff --git a/lambdas/services/document_review_processor_service.py b/lambdas/services/document_review_processor_service.py
new file mode 100644
index 000000000..8d825b3c3
--- /dev/null
+++ b/lambdas/services/document_review_processor_service.py
@@ -0,0 +1,147 @@
+import os
+from datetime import datetime, timezone
+
+from botocore.exceptions import ClientError
+from enums.document_review_status import DocumentReviewStatus
+from models.document_reference import DocumentReferenceMetadataFields
+from models.document_review import (
+    DocumentReviewFileDetails,
+    DocumentUploadReviewReference,
+)
+from models.sqs.review_message_body import ReviewMessageBody
+from services.base.dynamo_service import DynamoDBService
+from services.base.s3_service import S3Service
+from utils.audit_logging_setup import LoggingService
+from utils.request_context import request_context
+
+logger = LoggingService(__name__)
+
+
+class ReviewProcessorService:
+    """
+    Service for processing single SQS messages from the document review queue.
+ """ + + def __init__(self): + """Initialize the review processor service with required AWS services.""" + self.dynamo_service = DynamoDBService() + self.s3_service = S3Service() + + self.review_table_name = os.environ["DOCUMENT_REVIEW_DYNAMODB_NAME"] + self.staging_bucket_name = os.environ["STAGING_STORE_BUCKET_NAME"] + self.review_bucket_name = os.environ["PENDING_REVIEW_BUCKET_NAME"] + + def process_review_message(self, review_message: ReviewMessageBody) -> None: + """ + Process a single SQS message from the review queue. + + Args: + sqs_message: SQS message record containing file and failure information + + Raises: + InvalidMessageException: If message format is invalid or required fields missing + S3FileNotFoundException: If file doesn't exist in staging bucket + ClientError: For AWS service errors (DynamoDB, S3) + """ + + logger.info("Processing review queue message") + + request_context.patient_nhs_no = review_message.nhs_number + + review_id = review_message.upload_id + review_files = self._move_files_to_review_bucket(review_message, review_id) + document_upload_review = self._build_review_record( + review_message, review_id, review_files + ) + try: + self.dynamo_service.create_item( + table_name=self.review_table_name, + item=document_upload_review.model_dump( + by_alias=True, exclude_none=True + ), + key_name=DocumentReferenceMetadataFields.ID.value, + ) + + logger.info(f"Created review record {document_upload_review.id}") + except ClientError as e: + if e.response["Error"]["Code"] == "ConditionalCheckFailedException": + logger.info("Entry already exists on Document Review table") + else: + raise e + + self._delete_files_from_staging(review_message) + + def _build_review_record( + self, + message_data: ReviewMessageBody, + review_id: str, + review_files: list[DocumentReviewFileDetails], + ) -> DocumentUploadReviewReference: + return DocumentUploadReviewReference( + id=review_id, + nhs_number=message_data.nhs_number, + review_status=DocumentReviewStatus.PENDING_REVIEW, + review_reason=message_data.failure_reason, + author=message_data.uploader_ods, + custodian=message_data.current_gp, + files=review_files, + upload_date=int(datetime.now(tz=timezone.utc).timestamp()), + ) + + def _move_files_to_review_bucket( + self, message_data: ReviewMessageBody, review_record_id: str + ) -> list[DocumentReviewFileDetails]: + """ + Move file from staging to review bucket. 
+
+        Args:
+            message_data: Review queue message data
+            review_record_id: ID of the review record being created
+
+        Returns:
+            List of DocumentReviewFileDetails with new file locations in the review bucket
+        """
+        new_file_keys: list[DocumentReviewFileDetails] = []
+
+        for file in message_data.files:
+            new_file_key = (
+                f"{message_data.nhs_number}/{review_record_id}/{file.file_name}"
+            )
+
+            logger.info(
+                f"Copying file from ({file.file_path}) in staging to review bucket: {new_file_key}"
+            )
+            try:
+                self.s3_service.copy_across_bucket(
+                    source_bucket=self.staging_bucket_name,
+                    source_file_key=file.file_path,
+                    dest_bucket=self.review_bucket_name,
+                    dest_file_key=new_file_key,
+                    if_none_match="*",
+                )
+                logger.info("File successfully copied to review bucket")
+                logger.info(f"Successfully moved file to: {new_file_key}")
+
+            except ClientError as e:
+                if e.response["Error"]["Code"] == "PreconditionFailed":
+                    logger.info("File already exists in the Review Bucket")
+                else:
+                    raise e
+
+            new_file_keys.append(
+                DocumentReviewFileDetails(
+                    file_name=file.file_name,
+                    file_location=new_file_key,
+                )
+            )
+
+        return new_file_keys
+
+    def _delete_files_from_staging(self, message_data: ReviewMessageBody) -> None:
+        for file in message_data.files:
+            try:
+                logger.info(f"Deleting file from staging bucket: {file.file_path}")
+                self.s3_service.delete_object(
+                    s3_bucket_name=self.staging_bucket_name, file_key=file.file_path
+                )
+            except Exception as e:
+                logger.error(f"Error deleting files from staging: {str(e)}")
+                # Continue processing, as a failed staging delete should not block the review
diff --git a/lambdas/services/document_upload_review_service.py b/lambdas/services/document_upload_review_service.py
index 68375f67f..719adf9b0 100644
--- a/lambdas/services/document_upload_review_service.py
+++ b/lambdas/services/document_upload_review_service.py
@@ -17,11 +17,13 @@ class DocumentUploadReviewService(DocumentService):
 
     """Service for handling DocumentUploadReviewReference operations."""
 
+    DEFAULT_QUERY_LIMIT = 50
+
     def __init__(self):
         super().__init__()
 
         self._table_name = os.environ.get("DOCUMENT_REVIEW_DYNAMODB_NAME")
-        self._s3_bucket = os.environ.get("DOCUMENT_REVIEW_S3_BUCKET_NAME")
+        self._s3_bucket = os.environ.get("PENDING_REVIEW_BUCKET_NAME")
 
     @property
     def table_name(self) -> str:
diff --git a/lambdas/tests/unit/conftest.py b/lambdas/tests/unit/conftest.py
index 505c503d3..50a48a676 100644
--- a/lambdas/tests/unit/conftest.py
+++ b/lambdas/tests/unit/conftest.py
@@ -138,10 +138,9 @@
 MOCK_ALERTING_SLACK_CHANNEL_ID = "slack_channel_id"
 MOCK_DOCUMENT_REVIEW_TABLE = "test_document_review"
 MOCK_DOCUMENT_REVIEW_BUCKET = "test_document_review_bucket"
-MOCK_EDGE_TABLE = "test_edge_reference_table"
-
 MOCK_EDGE_REFERENCE_TABLE = "test_edge_reference_table"
 
+
 @pytest.fixture
 def set_env(monkeypatch):
     monkeypatch.setenv("AWS_DEFAULT_REGION", REGION_NAME)
@@ -230,12 +229,12 @@ def set_env(monkeypatch):
     monkeypatch.setenv("SLACK_CHANNEL_ID", MOCK_ALERTING_SLACK_CHANNEL_ID)
     monkeypatch.setenv("ITOC_TESTING_ODS_CODES", MOCK_ITOC_ODS_CODES)
     monkeypatch.setenv("DOCUMENT_REVIEW_DYNAMODB_NAME", MOCK_DOCUMENT_REVIEW_TABLE)
-    monkeypatch.setenv("DOCUMENT_REVIEW_S3_BUCKET_NAME", MOCK_DOCUMENT_REVIEW_BUCKET)
-    monkeypatch.setenv("EDGE_REFERENCE_TABLE", MOCK_EDGE_TABLE)
+    monkeypatch.setenv("PENDING_REVIEW_BUCKET_NAME", MOCK_DOCUMENT_REVIEW_BUCKET)
     monkeypatch.setenv("STAGING_STORE_BUCKET_NAME", MOCK_STAGING_STORE_BUCKET)
     monkeypatch.setenv("METADATA_SQS_QUEUE_URL", MOCK_LG_METADATA_SQS_QUEUE)
     monkeypatch.setenv("EDGE_REFERENCE_TABLE", MOCK_EDGE_REFERENCE_TABLE)
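[Reviewer note] The retry safety of the new processor rests on two conditional writes: the S3 copy is guarded by `IfNoneMatch="*"` (the `copy_across_bucket` change above, relying on the conditional-copy support in the pinned boto3 1.40.x) and the DynamoDB put is guarded by `attribute_not_exists` on the record ID. A minimal standalone sketch of that pattern, with placeholder bucket and table names rather than the deployed resources:

```python
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")
table = boto3.resource("dynamodb").Table("document-review-table")  # placeholder


def copy_and_record_once(item: dict, src_key: str, dest_key: str) -> None:
    """Copy a staged file and create its review record, tolerating replays."""
    try:
        # IfNoneMatch="*" makes the copy fail if dest_key already exists,
        # so a redelivered SQS message cannot overwrite the first copy.
        s3.copy_object(
            Bucket="pending-review-bucket",  # placeholder
            Key=dest_key,
            CopySource={"Bucket": "staging-bucket", "Key": src_key},  # placeholder
            IfNoneMatch="*",
        )
    except ClientError as e:
        if e.response["Error"]["Code"] != "PreconditionFailed":
            raise
    try:
        # attribute_not_exists(ID) rejects a second put with the same key,
        # so at most one record is created per upload_id.
        table.put_item(Item=item, ConditionExpression="attribute_not_exists(ID)")
    except ClientError as e:
        if e.response["Error"]["Code"] != "ConditionalCheckFailedException":
            raise
```

Both error codes are swallowed deliberately, so a redelivered message replays as a no-op instead of duplicating data.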
+
 
 EXPECTED_PARSED_PATIENT_BASE_CASE = PatientDetails(
     givenName=["Jane"],
     familyName="Smith",
diff --git a/lambdas/tests/unit/handlers/test_document_review_processor_handler.py b/lambdas/tests/unit/handlers/test_document_review_processor_handler.py
new file mode 100644
index 000000000..11c6bbdea
--- /dev/null
+++ b/lambdas/tests/unit/handlers/test_document_review_processor_handler.py
@@ -0,0 +1,233 @@
+import json
+
+import pytest
+from handlers.document_review_processor_handler import lambda_handler
+from models.sqs.review_message_body import ReviewMessageBody, ReviewMessageFile
+
+
+@pytest.fixture
+def mock_review_service(mocker):
+    """Mock the ReviewProcessorService."""
+    mocked_class = mocker.patch(
+        "handlers.document_review_processor_handler.ReviewProcessorService"
+    )
+    mocked_instance = mocked_class.return_value
+    return mocked_instance
+
+
+@pytest.fixture
+def sample_review_message_body():
+    """Create a sample review message body."""
+    return ReviewMessageBody(
+        upload_id="test-upload-id-123",
+        files=[
+            ReviewMessageFile(
+                file_name="test_document.pdf",
+                file_path="staging/9000000009/test_document.pdf",
+            )
+        ],
+        nhs_number="9000000009",
+        failure_reason="Failed virus scan",
+        upload_date="2024-01-15T10:30:00Z",
+        uploader_ods="Y12345",
+        current_gp="Y12345",
+    )
+
+
+@pytest.fixture
+def sample_sqs_message(sample_review_message_body):
+    """Create a sample SQS message."""
+    return {
+        "body": sample_review_message_body.model_dump_json(),
+        "eventSource": "aws:sqs",
+        "messageId": "test-message-id-1",
+    }
+
+
+@pytest.fixture
+def sample_sqs_event(sample_sqs_message):
+    """Create a sample SQS event with one message."""
+    return {"Records": [sample_sqs_message]}
+
+
+@pytest.fixture
+def sample_sqs_event_multiple_messages(sample_review_message_body):
+    """Create a sample SQS event with multiple messages."""
+    message_1 = ReviewMessageBody(
+        upload_id="test-upload-id-123",
+        files=[
+            ReviewMessageFile(
+                file_name="document_1.pdf",
+                file_path="staging/9000000009/document_1.pdf",
+            )
+        ],
+        nhs_number="9000000009",
+        failure_reason="Failed virus scan",
+        upload_date="2024-01-15T10:30:00Z",
+        uploader_ods="Y12345",
+        current_gp="Y12345",
+    )
+
+    message_2 = ReviewMessageBody(
+        upload_id="test-upload-id-456",
+        files=[
+            ReviewMessageFile(
+                file_name="document_2.pdf",
+                file_path="staging/9000000010/document_2.pdf",
+            )
+        ],
+        nhs_number="9000000010",
+        failure_reason="Invalid file format",
+        upload_date="2024-01-15T10:35:00Z",
+        uploader_ods="Y12345",
+        current_gp="Y12345",
+    )
+
+    message_3 = ReviewMessageBody(
+        upload_id="test-upload-id-789",
+        files=[
+            ReviewMessageFile(
+                file_name="document_3.pdf",
+                file_path="staging/9000000011/document_3.pdf",
+            )
+        ],
+        nhs_number="9000000011",
+        failure_reason="Missing metadata",
+        upload_date="2024-01-15T10:40:00Z",
+        uploader_ods="Y67890",
+        current_gp="Y67890",
+    )
+
+    return {
+        "Records": [
+            {
+                "body": message_1.model_dump_json(),
+                "eventSource": "aws:sqs",
+                "messageId": "test-message-id-1",
+            },
+            {
+                "body": message_2.model_dump_json(),
+                "eventSource": "aws:sqs",
+                "messageId": "test-message-id-2",
+            },
+            {
+                "body": message_3.model_dump_json(),
+                "eventSource": "aws:sqs",
+                "messageId": "test-message-id-3",
+            },
+        ]
+    }
+
+
+@pytest.fixture
+def empty_sqs_event():
+    """Create an empty SQS event."""
+    return {"Records": []}
+
+
+def test_lambda_handler_processes_single_message_successfully(
+    set_env,
+    context,
+    sample_sqs_event,
+    mock_review_service,
+):
+    """Test handler successfully processes a single SQS message."""
message.""" + lambda_handler(sample_sqs_event, context) + + mock_review_service.process_review_message.assert_called_once() + + +def test_lambda_handler_processes_multiple_messages_successfully( + set_env, + context, + sample_sqs_event_multiple_messages, + mock_review_service, +): + """Test handler successfully processes multiple SQS messages.""" + lambda_handler(sample_sqs_event_multiple_messages, context) + + assert mock_review_service.process_review_message.call_count == 3 + + +def test_lambda_handler_calls_service_with_correct_message( + set_env, + context, + sample_sqs_event, + mock_review_service, +): + """Test handler calls service with the correctly parsed message.""" + lambda_handler(sample_sqs_event, context) + + mock_review_service.process_review_message.assert_called_once() + + call_args = mock_review_service.process_review_message.call_args[0][0] + + assert type(call_args).__name__ == "ReviewMessageBody" + assert len(call_args.files) == 1 + assert call_args.files[0].file_name == "test_document.pdf" + assert call_args.nhs_number == "9000000009" + assert call_args.files[0].file_path == "staging/9000000009/test_document.pdf" + + +def test_lambda_handler_handles_empty_records_list( + set_env, context, empty_sqs_event, mock_review_service +): + """Test handler handles empty records list gracefully.""" + lambda_handler(empty_sqs_event, context) + + mock_review_service.process_review_message.assert_not_called() + + +def test_lambda_handler_parses_json_body_correctly( + set_env, + context, + mock_review_service, +): + """Test handler correctly parses JSON from message body.""" + event = { + "Records": [ + { + "body": json.dumps( + { + "upload_id": "test-upload-id-123", + "files": [ + {"file_name": "test.pdf", "file_path": "staging/test.pdf"} + ], + "nhs_number": "9000000009", + "failure_reason": "Test failure", + "upload_date": "2024-01-15T10:30:00Z", + "uploader_ods": "Y12345", + "current_gp": "Y12345", + } + ), + "eventSource": "aws:sqs", + } + ] + } + + lambda_handler(event, context) + + mock_review_service.process_review_message.assert_called_once() + call_args = mock_review_service.process_review_message.call_args[0][0] + assert type(call_args).__name__ == "ReviewMessageBody" + assert len(call_args.files) == 1 + assert call_args.files[0].file_name == "test.pdf" + + +def test_lambda_handler_calls_process_review_message_on_service( + set_env, + context, + sample_sqs_event, + sample_review_message_body, + mock_review_service, +): + """Test that handler calls process_review_message method on ReviewProcessorService.""" + lambda_handler(sample_sqs_event, context) + + mock_review_service.process_review_message.assert_called_once() + called_message = mock_review_service.process_review_message.call_args[0][0] + + assert isinstance(called_message, ReviewMessageBody) + assert called_message.upload_id == sample_review_message_body.upload_id + assert called_message.nhs_number == sample_review_message_body.nhs_number + assert called_message.failure_reason == sample_review_message_body.failure_reason diff --git a/lambdas/tests/unit/services/base/test_dynamo_service.py b/lambdas/tests/unit/services/base/test_dynamo_service.py index 1cb9f28d5..80fa1ea91 100755 --- a/lambdas/tests/unit/services/base/test_dynamo_service.py +++ b/lambdas/tests/unit/services/base/test_dynamo_service.py @@ -508,6 +508,29 @@ def test_create_item_raise_client_error(mock_service, mock_table): assert MOCK_CLIENT_ERROR == actual_response.value +def test_create_item_with_key_name(mock_service, mock_table): + item = 
{"NhsNumber": TEST_NHS_NUMBER, "Name": "Test Patient"} + key_name = "NhsNumber" + + mock_service.create_item(MOCK_TABLE_NAME, item, key_name) + mock_table.assert_called_with(MOCK_TABLE_NAME) + mock_table.return_value.put_item.assert_called_once_with( + Item=item, ConditionExpression=f"attribute_not_exists({key_name})" + ) + + +def test_create_item_raises_client_error(mock_service, mock_table): + item = {"NhsNumber": TEST_NHS_NUMBER, "Name": "Test Patient"} + key_name = "NhsNumber" + + mock_table.return_value.put_item.side_effect = MOCK_CLIENT_ERROR + + with pytest.raises(ClientError) as actual_response: + mock_service.create_item(MOCK_TABLE_NAME, item, key_name) + + assert MOCK_CLIENT_ERROR == actual_response.value + + def test_delete_item_is_called_with_correct_parameters(mock_service, mock_table): mock_service.delete_item(MOCK_TABLE_NAME, {"NhsNumber": TEST_NHS_NUMBER}) diff --git a/lambdas/tests/unit/services/base/test_s3_service.py b/lambdas/tests/unit/services/base/test_s3_service.py index 3d3d8b4ed..ead525f00 100755 --- a/lambdas/tests/unit/services/base/test_s3_service.py +++ b/lambdas/tests/unit/services/base/test_s3_service.py @@ -139,6 +139,26 @@ def test_copy_across_bucket(mock_service, mock_client): ) +def test_copy_across_bucket_if_none_match(mock_service, mock_client): + test_etag = '"abc123def456"' + + mock_service.copy_across_bucket( + source_bucket="bucket_to_copy_from", + source_file_key=TEST_FILE_KEY, + dest_bucket="bucket_to_copy_to", + dest_file_key=f"{TEST_NHS_NUMBER}/{TEST_UUID}", + if_none_match=test_etag, + ) + + mock_client.copy_object.assert_called_once_with( + Bucket="bucket_to_copy_to", + Key=f"{TEST_NHS_NUMBER}/{TEST_UUID}", + CopySource={"Bucket": "bucket_to_copy_from", "Key": TEST_FILE_KEY}, + IfNoneMatch=test_etag, + StorageClass="INTELLIGENT_TIERING", + ) + + def test_delete_object(mock_service, mock_client): mock_service.delete_object(s3_bucket_name=MOCK_BUCKET, file_key=TEST_FILE_NAME) @@ -511,30 +531,38 @@ def test_get_head_object_returns_metadata(mock_service, mock_client): result = mock_service.get_head_object(bucket=MOCK_BUCKET, key=TEST_FILE_KEY) assert result == mock_response - mock_client.head_object.assert_called_once_with(Bucket=MOCK_BUCKET, Key=TEST_FILE_KEY) + mock_client.head_object.assert_called_once_with( + Bucket=MOCK_BUCKET, Key=TEST_FILE_KEY + ) -def test_get_head_object_raises_client_error_when_object_not_found(mock_service, mock_client): +def test_get_head_object_raises_client_error_when_object_not_found( + mock_service, mock_client +): mock_error = ClientError( - {"Error": {"Code": "404", "Message": "Not Found"}}, - "HeadObject" + {"Error": {"Code": "404", "Message": "Not Found"}}, "HeadObject" ) mock_client.head_object.side_effect = mock_error with pytest.raises(ClientError): mock_service.get_head_object(bucket=MOCK_BUCKET, key=TEST_FILE_KEY) - mock_client.head_object.assert_called_once_with(Bucket=MOCK_BUCKET, Key=TEST_FILE_KEY) + mock_client.head_object.assert_called_once_with( + Bucket=MOCK_BUCKET, Key=TEST_FILE_KEY + ) -def test_get_head_object_raises_client_error_on_access_denied(mock_service, mock_client): +def test_get_head_object_raises_client_error_on_access_denied( + mock_service, mock_client +): mock_error = ClientError( - {"Error": {"Code": "403", "Message": "Forbidden"}}, - "HeadObject" + {"Error": {"Code": "403", "Message": "Forbidden"}}, "HeadObject" ) mock_client.head_object.side_effect = mock_error with pytest.raises(ClientError): mock_service.get_head_object(bucket=MOCK_BUCKET, key=TEST_FILE_KEY) - 
+    mock_client.head_object.assert_called_once_with(
+        Bucket=MOCK_BUCKET, Key=TEST_FILE_KEY
+    )
diff --git a/lambdas/tests/unit/services/test_document_review_processor_service.py b/lambdas/tests/unit/services/test_document_review_processor_service.py
new file mode 100644
index 000000000..51fb3dac3
--- /dev/null
+++ b/lambdas/tests/unit/services/test_document_review_processor_service.py
@@ -0,0 +1,421 @@
+import pytest
+from botocore.exceptions import ClientError
+from enums.document_review_status import DocumentReviewStatus
+from models.document_review import (
+    DocumentReviewFileDetails,
+    DocumentUploadReviewReference,
+)
+from models.sqs.review_message_body import ReviewMessageBody, ReviewMessageFile
+from services.document_review_processor_service import ReviewProcessorService
+
+
+@pytest.fixture
+def mock_dynamo_service(mocker):
+    """Mock the DynamoDBService."""
+    return mocker.patch("services.document_review_processor_service.DynamoDBService")
+
+
+@pytest.fixture
+def mock_s3_service(mocker):
+    """Mock the S3Service."""
+    return mocker.patch("services.document_review_processor_service.S3Service")
+
+
+@pytest.fixture
+def service_under_test(set_env, mock_dynamo_service, mock_s3_service):
+    """Create a ReviewProcessorService instance with mocked dependencies."""
+    service = ReviewProcessorService()
+    return service
+
+
+@pytest.fixture
+def sample_review_message():
+    """Create a sample review message."""
+    return ReviewMessageBody(
+        upload_id="test-upload-id-123",
+        files=[
+            ReviewMessageFile(
+                file_name="test_document.pdf",
+                file_path="staging/9000000009/test_document.pdf",
+            )
+        ],
+        nhs_number="9000000009",
+        failure_reason="Failed virus scan",
+        upload_date="2024-01-15T10:30:00Z",
+        uploader_ods="Y12345",
+        current_gp="Y12345",
+    )
+
+
+def test_service_initializes_with_correct_environment_variables(
+    set_env, mock_dynamo_service, mock_s3_service
+):
+    service = ReviewProcessorService()
+
+    assert service.review_table_name == "test_document_review"
+    assert service.staging_bucket_name == "test_staging_bulk_store"
+    assert service.review_bucket_name == "test_document_review_bucket"
+    mock_dynamo_service.assert_called_once()
+    mock_s3_service.assert_called_once()
+
+
+def test_process_review_message_success(
+    service_under_test, sample_review_message, mocker
+):
+    """Test successful processing of a review message."""
+    mock_move = mocker.patch.object(service_under_test, "_move_files_to_review_bucket")
+    mock_delete = mocker.patch.object(service_under_test, "_delete_files_from_staging")
+
+    mock_move.return_value = [
+        DocumentReviewFileDetails(
+            file_name="test_document.pdf",
+            file_location="9000000009/test-upload-id-123/test_document.pdf",
+        )
+    ]
+
+    service_under_test.process_review_message(sample_review_message)
+
+    mock_move.assert_called_once()
+    service_under_test.dynamo_service.create_item.assert_called_once()
+    mock_delete.assert_called_once_with(sample_review_message)
+
+
+def test_process_review_message_multiple_files(service_under_test, mocker):
+    """Test successful processing of a review message with multiple files."""
+    message = ReviewMessageBody(
+        upload_id="test-upload-id-456",
+        files=[
+            ReviewMessageFile(
+                file_name="document_1.pdf",
+                file_path="staging/9000000009/document_1.pdf",
+            ),
+            ReviewMessageFile(
+                file_name="document_2.pdf",
+                file_path="staging/9000000009/document_2.pdf",
+            ),
+        ],
+        nhs_number="9000000009",
+        failure_reason="Failed virus scan",
upload_date="2024-01-15T10:30:00Z", + uploader_ods="Y12345", + current_gp="Y12345", + ) + + mock_move = mocker.patch.object(service_under_test, "_move_files_to_review_bucket") + mock_delete = mocker.patch.object(service_under_test, "_delete_files_from_staging") + + mock_move.return_value = [ + DocumentReviewFileDetails( + file_name="document_1.pdf", + file_location="9000000009/test-upload-id-456/document_1.pdf", + ), + DocumentReviewFileDetails( + file_name="document_2.pdf", + file_location="9000000009/test-upload-id-456/document_2.pdf", + ), + ] + + service_under_test.process_review_message(message) + + mock_move.assert_called_once() + service_under_test.dynamo_service.create_item.assert_called_once() + mock_delete.assert_called_once_with(message) + + +def test_process_review_message_s3_copy_error( + service_under_test, sample_review_message, mocker +): + """Test processing fails when S3 copy operation fails.""" + mocker.patch.object( + service_under_test, + "_move_files_to_review_bucket", + side_effect=ClientError( + {"Error": {"Code": "NoSuchKey", "Message": "Source file not found"}}, + "CopyObject", + ), + ) + + with pytest.raises(ClientError): + service_under_test.process_review_message(sample_review_message) + + +def test_process_review_message_dynamo_error_not_precondition( + service_under_test, sample_review_message, mocker +): + """Test processing fails when DynamoDB put fails.""" + mocker.patch.object( + service_under_test, + "_move_files_to_review_bucket", + return_value=[ + DocumentReviewFileDetails( + file_name="document_1.pdf", + file_location="9000000009/test-upload-id-456/document_1.pdf", + ) + ], + ) + service_under_test.dynamo_service.create_item.side_effect = ClientError( + {"Error": {"Code": "InternalServerError", "Message": "DynamoDB error"}}, + "PutItem", + ) + + with pytest.raises(ClientError): + service_under_test.process_review_message(sample_review_message) + + +def test_process_review_message_continues_dynamo_conditional_check_failure( + service_under_test, sample_review_message, mocker +): + + mocker.patch.object( + service_under_test, + "_move_files_to_review_bucket", + return_value=[ + DocumentReviewFileDetails( + file_name="document_1.pdf", + file_location="9000000009/test-upload-id-456/document_1.pdf", + ) + ], + ) + mocker.patch.object(service_under_test, "_delete_files_from_staging") + service_under_test.dynamo_service.create_item.side_effect = ClientError( + { + "Error": { + "Code": "ConditionalCheckFailedException", + "Message": "DynamoDB error", + } + }, + "PutItem", + ) + + service_under_test.process_review_message(sample_review_message) + + service_under_test._delete_files_from_staging.assert_called() + + +# Tests for _build_review_record and _create_review_record methods + + +def test_build_review_record_success(service_under_test, sample_review_message): + """Test successful building of review record.""" + files = [ + DocumentReviewFileDetails( + file_name="test_document.pdf", + file_location="9000000009/test-review-id/test_document.pdf", + ) + ] + + result = service_under_test._build_review_record( + sample_review_message, "test-review-id", files + ) + + assert isinstance(result, DocumentUploadReviewReference) + assert result.id == "test-review-id" + assert result.nhs_number == "9000000009" + assert result.review_status == DocumentReviewStatus.PENDING_REVIEW + assert result.review_reason == "Failed virus scan" + assert result.author == "Y12345" + assert result.custodian == "Y12345" + assert len(result.files) == 1 + assert result.files[0].file_name 
== "test_document.pdf" + assert ( + result.files[0].file_location == "9000000009/test-review-id/test_document.pdf" + ) + + +def test_build_review_record_with_multiple_files(service_under_test): + """Test building review record with multiple files.""" + message = ReviewMessageBody( + upload_id="test-upload-id-789", + files=[ + ReviewMessageFile( + file_name="document_1.pdf", + file_path="staging/9000000009/document_1.pdf", + ), + ReviewMessageFile( + file_name="document_2.pdf", + file_path="staging/9000000009/document_2.pdf", + ), + ], + nhs_number="9000000009", + failure_reason="Failed virus scan", + upload_date="2024-01-15T10:30:00Z", + uploader_ods="Y12345", + current_gp="Y12345", + ) + + files = [ + DocumentReviewFileDetails( + file_name="document_1.pdf", + file_location="9000000009/test-review-id/document_1.pdf", + ), + DocumentReviewFileDetails( + file_name="document_2.pdf", + file_location="9000000009/test-review-id/document_2.pdf", + ), + ] + + result = service_under_test._build_review_record(message, "test-review-id", files) + + assert len(result.files) == 2 + assert result.files[0].file_name == "document_1.pdf" + assert result.files[1].file_name == "document_2.pdf" + + +# Tests for _move_files_to_review_bucket method + + +def test_move_files_success(service_under_test, sample_review_message): + """Test successful file move from staging to review bucket.""" + files = service_under_test._move_files_to_review_bucket( + sample_review_message, "test-review-id-123" + ) + + expected_key = "9000000009/test-review-id-123/test_document.pdf" + + assert len(files) == 1 + assert files[0].file_name == "test_document.pdf" + assert files[0].file_location == expected_key + + service_under_test.s3_service.copy_across_bucket.assert_called_once_with( + source_bucket="test_staging_bulk_store", + source_file_key="staging/9000000009/test_document.pdf", + dest_bucket="test_document_review_bucket", + dest_file_key=expected_key, + if_none_match="*", + ) + + +def test_move_multiple_files_success(service_under_test): + """Test successful move of multiple files.""" + message = ReviewMessageBody( + upload_id="test-upload-id-999", + files=[ + ReviewMessageFile( + file_name="document_1.pdf", + file_path="staging/9000000009/document_1.pdf", + ), + ReviewMessageFile( + file_name="document_2.pdf", + file_path="staging/9000000009/document_2.pdf", + ), + ], + nhs_number="9000000009", + failure_reason="Failed virus scan", + upload_date="2024-01-15T10:30:00Z", + uploader_ods="Y12345", + current_gp="Y12345", + ) + + files = service_under_test._move_files_to_review_bucket(message, "test-review-id") + + assert len(files) == 2 + assert files[0].file_name == "document_1.pdf" + assert files[0].file_location == "9000000009/test-review-id/document_1.pdf" + assert files[1].file_name == "document_2.pdf" + assert files[1].file_location == "9000000009/test-review-id/document_2.pdf" + + assert service_under_test.s3_service.copy_across_bucket.call_count == 2 + + +def test_move_files_copy_error(service_under_test, sample_review_message): + """Test file move handles S3 copy errors.""" + service_under_test.s3_service.copy_across_bucket.side_effect = ClientError( + {"Error": {"Code": "NoSuchKey", "Message": "Source not found"}}, + "CopyObject", + ) + + with pytest.raises(ClientError): + service_under_test._move_files_to_review_bucket( + sample_review_message, "test-review-id" + ) + + +def test_move_files_to_review_bucket_continues_file_already_exists_in_review_bucket( + service_under_test, sample_review_message +): + + 
+    service_under_test.s3_service.copy_across_bucket.side_effect = ClientError(
+        {
+            "Error": {
+                "Code": "PreconditionFailed",
+                "Message": "At least one of the pre-conditions you specified did not hold",
+            }
+        },
+        "CopyObject",
+    )
+
+    service_under_test.process_review_message(sample_review_message)
+    service_under_test.dynamo_service.create_item.assert_called()
+
+
+# Tests for _delete_files_from_staging method
+
+
+def test_delete_from_staging_success(service_under_test, sample_review_message):
+    """Test successful deletion from staging bucket."""
+    service_under_test._delete_files_from_staging(sample_review_message)
+
+    service_under_test.s3_service.delete_object.assert_called_once_with(
+        s3_bucket_name="test_staging_bulk_store",
+        file_key="staging/9000000009/test_document.pdf",
+    )
+
+
+def test_delete_from_staging_handles_errors(service_under_test, sample_review_message):
+    """Test deletion from staging handles errors gracefully."""
+    service_under_test.s3_service.delete_object.side_effect = ClientError(
+        {"Error": {"Code": "AccessDenied", "Message": "Access Denied"}},
+        "DeleteObject",
+    )
+
+    # Should not raise exception - errors are caught and logged
+    service_under_test._delete_files_from_staging(sample_review_message)
+
+    service_under_test.s3_service.delete_object.assert_called_once()
+
+
+# Integration scenario tests
+
+
+def test_full_workflow_with_valid_message(service_under_test, sample_review_message):
+    """Test complete workflow from message to final record creation."""
+    service_under_test.dynamo_service.create_item.return_value = None
+    service_under_test.s3_service.copy_across_bucket.return_value = None
+    service_under_test.s3_service.delete_object.return_value = None
+
+    service_under_test.process_review_message(sample_review_message)
+
+    service_under_test.dynamo_service.create_item.assert_called_once()
+    service_under_test.s3_service.copy_across_bucket.assert_called_once()
+    service_under_test.s3_service.delete_object.assert_called_once()
+
+
+def test_workflow_handles_multiple_different_patients(service_under_test):
+    """Test processing messages for different patients."""
+    service_under_test.dynamo_service.create_item.return_value = None
+    service_under_test.s3_service.copy_across_bucket.return_value = None
+    service_under_test.s3_service.delete_object.return_value = None
+
+    messages = [
+        ReviewMessageBody(
+            upload_id=f"test-upload-id-{i}",
+            files=[
+                ReviewMessageFile(
+                    file_name=f"doc_{i}.pdf",
+                    file_path=f"staging/900000000{i}/doc_{i}.pdf",
+                )
+            ],
+            nhs_number=f"900000000{i}",
+            failure_reason="Test failure",
+            upload_date="2024-01-15T10:30:00Z",
+            uploader_ods="Y12345",
+            current_gp="Y12345",
+        )
+        for i in range(1, 4)
+    ]
+
+    for message in messages:
+        service_under_test.process_review_message(message)
+
+    assert service_under_test.dynamo_service.create_item.call_count == 3
+    assert service_under_test.s3_service.copy_across_bucket.call_count == 3
diff --git a/lambdas/tests/unit/services/test_get_document_review_service.py b/lambdas/tests/unit/services/test_get_document_review_service.py
index 53396790f..7d2bb22e3 100644
--- a/lambdas/tests/unit/services/test_get_document_review_service.py
+++ b/lambdas/tests/unit/services/test_get_document_review_service.py
@@ -12,7 +12,7 @@
 from services.get_document_review_service import GetDocumentReviewService
 from tests.unit.conftest import (
     MOCK_DOCUMENT_REVIEW_BUCKET,
-    MOCK_EDGE_TABLE,
+    MOCK_EDGE_REFERENCE_TABLE,
     TEST_NHS_NUMBER,
 )
 from utils.exceptions import DynamoServiceException
@@ -210,7 +210,7 @@ def test_create_cloudfront_presigned_url(mock_service, mock_uuid, mocker):
     call_args = (
         mock_service.document_review_service.dynamo_service.create_item.call_args
     )
-    assert call_args[0][0] == MOCK_EDGE_TABLE
+    assert call_args[0][0] == MOCK_EDGE_REFERENCE_TABLE
     assert call_args[0][1]["ID"] == f"review/{mock_uuid}"
     assert call_args[0][1]["presignedUrl"] == TEST_PRESIGNED_URL_1
     assert "TTL" in call_args[0][1]
diff --git a/lambdas/utils/exceptions.py b/lambdas/utils/exceptions.py
index 03a6faf66..a67ae9b2b 100644
--- a/lambdas/utils/exceptions.py
+++ b/lambdas/utils/exceptions.py
@@ -167,6 +167,7 @@ class FhirDocumentReferenceException(Exception):
 class TransactionConflictException(Exception):
     pass
 
+
 class MigrationUnrecoverableException(Exception):
     def __init__(self, message: str, item_id: str):
         super().__init__(message)
@@ -174,7 +175,8 @@ def __init__(self, message: str, item_id: str):
         self.item_id = item_id
 
     def to_dict(self):
-        return {"itemId": self.item_id, "message": self.message}
+        return {"itemId": self.item_id, "message": self.message}
+
 
 class MigrationRetryableException(Exception):
     def __init__(self, message: str, segment_id: str):
@@ -184,3 +186,19 @@ def __init__(self, message: str, segment_id: str):
 
     def to_dict(self):
         return {"segmentId": self.segment_id, "message": self.message}
+
+
+class ReviewProcessVerifyingException(Exception):
+    pass
+
+
+class ReviewProcessMovingException(Exception):
+    pass
+
+
+class ReviewProcessDeleteException(Exception):
+    pass
+
+
+class ReviewProcessCreateRecordException(Exception):
+    pass
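[Reviewer note] One design consequence of the handler re-raising on any failure: the whole SQS batch fails and already-processed messages are redelivered, which the conditional writes absorb. If per-message retries are ever preferred, Lambda's partial batch response is the usual alternative. A hedged sketch of that variant, not what this PR implements (it also requires `ReportBatchItemFailures` to be enabled on the event source mapping):

```python
from models.sqs.review_message_body import ReviewMessageBody
from services.document_review_processor_service import ReviewProcessorService


def lambda_handler(event, context):
    # Report only the failed messages back to SQS instead of failing the batch.
    review_service = ReviewProcessorService()
    failures = []
    for record in event.get("Records", []):
        try:
            message = ReviewMessageBody.model_validate_json(record["body"])
            review_service.process_review_message(message)
        except Exception:
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```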
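[Reviewer note] For anyone wanting to exercise the new handler locally, a sketch of a minimal harness: the environment values mirror the conftest.py fixtures and are placeholders; with a bare `None` context the decorators may need a richer stub, and the AWS calls will hit real services unless `DynamoDBService` and `S3Service` are mocked as in the unit tests.

```python
import os

# Placeholder values mirroring the conftest.py fixtures above.
os.environ["DOCUMENT_REVIEW_DYNAMODB_NAME"] = "test_document_review"
os.environ["STAGING_STORE_BUCKET_NAME"] = "test_staging_bulk_store"
os.environ["PENDING_REVIEW_BUCKET_NAME"] = "test_document_review_bucket"

from handlers.document_review_processor_handler import lambda_handler
from models.sqs.review_message_body import ReviewMessageBody, ReviewMessageFile

message = ReviewMessageBody(
    upload_id="upload-1",
    files=[
        ReviewMessageFile(
            file_name="doc.pdf", file_path="staging/9000000009/doc.pdf"
        )
    ],
    nhs_number="9000000009",
    failure_reason="Failed virus scan",
    upload_date="2024-01-15T10:30:00Z",
    uploader_ods="Y12345",
    current_gp="Y12345",
)

# Shape matches the SQS records the handler iterates over.
event = {"Records": [{"body": message.model_dump_json(), "eventSource": "aws:sqs"}]}
lambda_handler(event, context=None)
```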