-
Notifications
You must be signed in to change notification settings - Fork 1
[PRMP-585] Create ReviewProcessor lambda logic #846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
lillie-dae
wants to merge
52
commits into
main
Choose a base branch
from
PRMP-585
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
52 commits
Select commit
Hold shift + click to select a range
1d620e8
[PRMP-585] Create ReviewProcessor lambda logic
lillie-dae 0f85182
[PRMP-585] minor fix
lillie-dae 225bd3f
[PRMP-585] minor fixes
lillie-dae 335bc2d
[PRMP-585] added support for multiple files in sqs message
lillie-dae e118f3b
[PRMP-585] code review comment improvements
lillie-dae 6733274
[PRMP-585] Code review comment changes
lillie-dae c32d4a8
[PRMP-585] required changes to match terraform
lillie-dae 6a60022
[PRPM-585] minor fixes
lillie-dae c505e2f
[PRMP-585] exclude none
lillie-dae cc6cba2
[PRMP-585] enhance message checks
lillie-dae 0a60c71
fix test
lillie-dae 58111a8
[PRMP-585] remove unsed import
lillie-dae eb9c4d4
Code comment changes
lillie-dae dc81e95
code comment changes
lillie-dae 7e1585e
missed tests
lillie-dae 763aca5
[PRMP-585] bump boto3 version
steph-torres-nhs a934f81
[PRMP-585] add types
steph-torres-nhs b425824
[PRMP-585] review processor handles if object already exists in revie…
steph-torres-nhs be312f8
[PRMP-585] Create ReviewProcessor lambda logic
lillie-dae 027c96c
[PRMP-585] minor fix
lillie-dae 6971e63
[PRMP-585] minor fixes
lillie-dae dfb0b43
[PRMP-585] added support for multiple files in sqs message
lillie-dae f3b3cdc
[PRMP-585] code review comment improvements
lillie-dae 714ebfa
[PRMP-585] Code review comment changes
lillie-dae 97ab571
[PRMP-585] required changes to match terraform
lillie-dae 058aa47
[PRPM-585] minor fixes
lillie-dae 1557d62
[PRMP-585] exclude none
lillie-dae 4510521
[PRMP-585] enhance message checks
lillie-dae 300ab46
fix test
lillie-dae 15daf3f
[PRMP-585] remove unsed import
lillie-dae ab75868
Code comment changes
lillie-dae 2dc220c
code comment changes
lillie-dae a94a6f3
missed tests
lillie-dae 6f58e84
[PRMP-585] bump boto3 version
steph-torres-nhs 0247b4f
[PRMP-585] add types
steph-torres-nhs 64d5425
[PRMP-585] review processor handles if object already exists in revie…
steph-torres-nhs e237600
[PRMP-585] merge origin and pull in main
steph-torres-nhs 6326457
[PRMP-585] add log message for file having already been copied to rev…
steph-torres-nhs 2ba9645
[PRMP-585] add dynamo conditional failure handling
steph-torres-nhs 8785aa0
Merge branch 'main' into PRMP-585
steph-torres-nhs fd39504
[PRMP-585] remove unnecessary pass statements
steph-torres-nhs 11739b3
[PRMP-585] remove requirement
steph-torres-nhs 54cbeae
Merge branch 'main' into PRMP-585
NogaNHS dd125ab
[PRMP-585] fix merge conflict
NogaNHS 7d56554
[PRMP-585] Update mock environment variables
NogaNHS 1841958
[PRMP-585] change to test_document_upload_review_service
NogaNHS 2e8f673
Update environment variable for pending review bucket in document upl…
NogaNHS 13aae2b
Merge branch 'main' into PRMP-585
NogaNHS 354a3c0
format
NogaNHS dc746bf
Rename MOCK_EDGE_TABLE to MOCK_EDGE_REFERENCE_TABLE in test_get_docum…
NogaNHS 9551215
Merge branch 'main' into PRMP-585
NogaNHS d74d3af
format
NogaNHS File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| from models.sqs.review_message_body import ReviewMessageBody | ||
| from pydantic import ValidationError | ||
| from services.document_review_processor_service import ReviewProcessorService | ||
| from utils.audit_logging_setup import LoggingService | ||
| from utils.decorators.ensure_env_var import ensure_environment_variables | ||
| from utils.decorators.override_error_check import override_error_check | ||
| from utils.decorators.set_audit_arg import set_request_context_for_logging | ||
| from utils.request_context import request_context | ||
|
|
||
| logger = LoggingService(__name__) | ||
|
|
||
|
|
||
| @set_request_context_for_logging | ||
| @ensure_environment_variables( | ||
| names=[ | ||
| "DOCUMENT_REVIEW_DYNAMODB_NAME", | ||
| "STAGING_STORE_BUCKET_NAME", | ||
| "PENDING_REVIEW_BUCKET_NAME", | ||
| ] | ||
| ) | ||
| @override_error_check | ||
| def lambda_handler(event, context): | ||
| """ | ||
| This handler consumes SQS messages from the document review queue, creates DynamoDB | ||
| records in the DocumentReview table, and moves files from the staging bucket | ||
| to the pending review bucket. | ||
|
|
||
| Args: | ||
| event: Lambda event containing SQS Event | ||
| context: Lambda context | ||
|
|
||
| Returns: | ||
| None | ||
| """ | ||
| logger.info("Starting review processor Lambda") | ||
|
|
||
| sqs_messages = event.get("Records", []) | ||
| review_service = ReviewProcessorService() | ||
|
|
||
| for sqs_message in sqs_messages: | ||
| try: | ||
| message = ReviewMessageBody.model_validate_json(sqs_message["body"]) | ||
|
|
||
| review_service.process_review_message(message) | ||
|
|
||
lillie-dae marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| except ValidationError as error: | ||
| logger.error("Malformed review message") | ||
| logger.error(error) | ||
| raise error | ||
|
|
||
| except Exception as error: | ||
| logger.error( | ||
| f"Failed to process review message: {str(error)}", | ||
| {"Result": "Review processing failed"}, | ||
| ) | ||
| raise error | ||
steph-torres-nhs marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| request_context.patient_nhs_no = "" | ||
| logger.info("Continuing to next message.") | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| from pydantic import BaseModel, Field | ||
|
|
||
|
|
||
| class ReviewMessageFile(BaseModel): | ||
| """Model for individual file in SQS message body from the document review queue.""" | ||
|
|
||
| file_name: str | ||
| file_path: str = Field(description="Location in the staging bucket") | ||
| """Location in the staging bucket""" | ||
lillie-dae marked this conversation as resolved.
Show resolved
Hide resolved
NogaNHS marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| class ReviewMessageBody(BaseModel): | ||
| """Model for SQS message body from the document review queue.""" | ||
|
|
||
| upload_id: str | ||
| files: list[ReviewMessageFile] | ||
| nhs_number: str | ||
| failure_reason: str | ||
| upload_date: str | ||
| uploader_ods: str | ||
| current_gp: str | ||
4 changes: 2 additions & 2 deletions
4
lambdas/requirements/layers/requirements_core_lambda_layer.txt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,150 @@ | ||
| import os | ||
| from datetime import datetime, timezone | ||
|
|
||
| from botocore.exceptions import ClientError | ||
| from enums.document_review_status import DocumentReviewStatus | ||
| from models.document_reference import DocumentReferenceMetadataFields | ||
| from models.document_review import ( | ||
| DocumentReviewFileDetails, | ||
| DocumentUploadReviewReference, | ||
| ) | ||
| from models.sqs.review_message_body import ReviewMessageBody | ||
| from services.base.dynamo_service import DynamoDBService | ||
| from services.base.s3_service import S3Service | ||
| from utils.audit_logging_setup import LoggingService | ||
| from utils.request_context import request_context | ||
|
|
||
| logger = LoggingService(__name__) | ||
|
|
||
|
|
||
| class ReviewProcessorService: | ||
| """ | ||
| Service for processing single SQS messages from the document review queue. | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| """Initialize the review processor service with required AWS services.""" | ||
| self.dynamo_service = DynamoDBService() | ||
| self.s3_service = S3Service() | ||
|
|
||
| self.review_table_name = os.environ["DOCUMENT_REVIEW_DYNAMODB_NAME"] | ||
| self.staging_bucket_name = os.environ["STAGING_STORE_BUCKET_NAME"] | ||
| self.review_bucket_name = os.environ["PENDING_REVIEW_BUCKET_NAME"] | ||
|
|
||
| def process_review_message(self, review_message: ReviewMessageBody) -> None: | ||
| """ | ||
| Process a single SQS message from the review queue. | ||
|
|
||
| Args: | ||
| sqs_message: SQS message record containing file and failure information | ||
|
|
||
| Raises: | ||
| InvalidMessageException: If message format is invalid or required fields missing | ||
| S3FileNotFoundException: If file doesn't exist in staging bucket | ||
| ClientError: For AWS service errors (DynamoDB, S3) | ||
| """ | ||
|
|
||
| logger.info("Processing review queue message") | ||
|
|
||
| request_context.patient_nhs_no = review_message.nhs_number | ||
|
|
||
| review_id = review_message.upload_id | ||
| review_files = self._move_files_to_review_bucket(review_message, review_id) | ||
| document_upload_review = self._build_review_record( | ||
| review_message, review_id, review_files | ||
| ) | ||
| try: | ||
| self.dynamo_service.create_item( | ||
| table_name=self.review_table_name, | ||
| item=document_upload_review.model_dump( | ||
| by_alias=True, exclude_none=True | ||
| ), | ||
| key_name=DocumentReferenceMetadataFields.ID.value, | ||
| ) | ||
|
|
||
| logger.info(f"Created review record {document_upload_review.id}") | ||
| except ClientError as e: | ||
| if e.response["Error"]["Code"] == "ConditionalCheckFailedException": | ||
| logger.info("Entry already exists on Document Review table") | ||
| else: | ||
| raise e | ||
|
|
||
| self._delete_files_from_staging(review_message) | ||
|
|
||
| def _build_review_record( | ||
| self, | ||
| message_data: ReviewMessageBody, | ||
| review_id: str, | ||
| review_files: list[DocumentReviewFileDetails], | ||
| ) -> DocumentUploadReviewReference: | ||
| return DocumentUploadReviewReference( | ||
| id=review_id, | ||
| nhs_number=message_data.nhs_number, | ||
| review_status=DocumentReviewStatus.PENDING_REVIEW, | ||
| review_reason=message_data.failure_reason, | ||
| author=message_data.uploader_ods, | ||
| custodian=message_data.current_gp, | ||
| files=review_files, | ||
| upload_date=int(datetime.now(tz=timezone.utc).timestamp()), | ||
| ) | ||
|
|
||
| def _move_files_to_review_bucket( | ||
| self, message_data: ReviewMessageBody, review_record_id: str | ||
| ) -> list[DocumentReviewFileDetails]: | ||
| """ | ||
| Move file from staging to review bucket. | ||
|
|
||
| Args: | ||
| message_data: Review queue message data | ||
| review_record_id: ID of the review record being created | ||
|
|
||
| Returns: | ||
| List of DocumentReviewFileDetails with new file locations in review bucket | ||
| """ | ||
| new_file_keys: list[DocumentReviewFileDetails] = [] | ||
|
|
||
| for file in message_data.files: | ||
| new_file_key = ( | ||
| f"{message_data.nhs_number}/{review_record_id}/{file.file_name}" | ||
| ) | ||
|
|
||
| logger.info( | ||
| f"Copying file from ({file.file_path}) in staging to review bucket: {new_file_key}" | ||
| ) | ||
| try: | ||
|
|
||
| self.s3_service.copy_across_bucket( | ||
| source_bucket=self.staging_bucket_name, | ||
| source_file_key=file.file_path, | ||
| dest_bucket=self.review_bucket_name, | ||
| dest_file_key=new_file_key, | ||
| if_none_match="*", | ||
| ) | ||
| logger.info("File successfully copied to review bucket") | ||
| logger.info(f"Successfully moved file to: {new_file_key}") | ||
|
|
||
| except ClientError as e: | ||
| if e.response["Error"]["Code"] == "PreconditionFailed": | ||
| logger.info("File already exists in the Review Bucket") | ||
| else: | ||
| raise e | ||
|
|
||
| new_file_keys.append( | ||
| DocumentReviewFileDetails( | ||
| file_name=file.file_name, | ||
| file_location=new_file_key, | ||
| ) | ||
| ) | ||
|
|
||
| return new_file_keys | ||
|
|
||
| def _delete_files_from_staging(self, message_data: ReviewMessageBody) -> None: | ||
| for file in message_data.files: | ||
| try: | ||
| logger.info(f"Deleting file from staging bucket: {file.file_path}") | ||
| self.s3_service.delete_object( | ||
| s3_bucket_name=self.staging_bucket_name, file_key=file.file_path | ||
| ) | ||
| except Exception as e: | ||
| logger.error(f"Error deleting files from staging: {str(e)}") | ||
| # Continue processing as files |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.