Skip to content

Commit d3c15f0

Browse files
committed
[PRMP-585] added support for multiple files in sqs message
1 parent 9a4a622 commit d3c15f0

File tree

5 files changed

+338
-273
lines changed

5 files changed

+338
-273
lines changed
Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,25 @@
11
import json
2+
3+
from pydantic import ValidationError
24
from lambdas.models.sqs.review_message_body import ReviewMessageBody
35
from lambdas.services.review_processor_service import ReviewProcessorService
46
from utils.audit_logging_setup import LoggingService
57
from utils.decorators.ensure_env_var import ensure_environment_variables
6-
from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions
78
from utils.decorators.override_error_check import override_error_check
89
from utils.decorators.set_audit_arg import set_request_context_for_logging
9-
from utils.decorators.validate_sqs_message_event import validate_sqs_event
10-
from utils.lambda_response import ApiGatewayResponse
1110

1211
logger = LoggingService(__name__)
1312

1413

1514
@set_request_context_for_logging
16-
@override_error_check
1715
@ensure_environment_variables(
1816
names=[
1917
"DOCUMENT_REVIEW_DYNAMODB_NAME",
2018
"STAGING_STORE_BUCKET_NAME",
2119
"PENDING_REVIEW_BUCKET_NAME",
2220
]
2321
)
24-
@handle_lambda_exceptions
25-
@validate_sqs_event
22+
@override_error_check
2623
def lambda_handler(event, context):
2724
"""
2825
This handler consumes SQS messages from the document review queue, creates DynamoDB
@@ -34,7 +31,7 @@ def lambda_handler(event, context):
3431
_context: Lambda context
3532
3633
Returns:
37-
ApiGatewayResponse with processing status
34+
None
3835
"""
3936
logger.info("Starting review processor Lambda")
4037

@@ -47,23 +44,22 @@ def lambda_handler(event, context):
4744
for sqs_message in sqs_messages:
4845
try:
4946
sqs_message_body = json.loads(sqs_message["body"])
50-
message = ReviewMessageBody.model_validate(sqs_message_body)
47+
message: ReviewMessageBody = ReviewMessageBody.model_validate(sqs_message_body)
5148

5249
review_service.process_review_message(message)
5350
processed_count += 1
51+
except ValidationError as error:
52+
logger.error("Malformed review message")
53+
logger.error(error)
54+
5455
except Exception as e:
5556
logger.error(
5657
f"Failed to process review message: {str(e)}",
5758
{"Result": "Review processing failed"},
5859
)
5960
failed_count += 1
61+
logger.info("Continuing to next message.")
6062

6163
logger.info(
6264
f"Review processor completed: {processed_count} processed, {failed_count} failed"
6365
)
64-
65-
return ApiGatewayResponse(
66-
status_code=200,
67-
body=f"Processed {processed_count} messages",
68-
methods="GET",
69-
).create_api_gateway_response()

lambdas/models/sqs/review_message_body.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
from pydantic import BaseModel
22

33

4-
class ReviewMessageBody(BaseModel):
5-
"""Model for SQS message body from the document review queue."""
4+
class ReviewMessageFile(BaseModel):
5+
"""Model for individual file in SQS message body from the document review queue."""
66

77
file_name: str
88
file_path: str
99
"""Location in the staging bucket"""
10+
11+
class ReviewMessageBody(BaseModel):
12+
"""Model for SQS message body from the document review queue."""
13+
14+
files: list[ReviewMessageFile]
1015
nhs_number: str
1116
failure_reason: str
1217
upload_date: str
Lines changed: 53 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
from datetime import datetime, timezone
3+
import uuid
34

45
from enums.review_status import ReviewStatus
56
from models.document_review import DocumentReviewFileDetails, DocumentsUploadReview
@@ -43,13 +44,17 @@ def process_review_message(self, review_message: ReviewMessageBody) -> None:
4344

4445
request_context.patient_nhs_no = review_message.nhs_number
4546

46-
logger.info(f"Processing review for NHS: {review_message.nhs_number}, File: {review_message.file_name}")
47+
logger.info(f"Processing review for NHS: {review_message.nhs_number} with {len(review_message.files)} files")
4748

48-
self._verify_file_exists_in_staging(review_message.file_path)
49-
document_upload_review = self._create_review_record(review_message)
49+
for file in review_message.files:
50+
logger.info(f"Processing review file: {file.file_name}")
51+
self._verify_file_exists_in_staging(file.file_path)
5052

51-
new_file_key = self._move_file_to_review_bucket(review_message, document_upload_review.id)
52-
self._update_review_record_with_file_location(document_upload_review.id, new_file_key)
53+
review_id = uuid.uuid4().hex
54+
files = self._move_files_to_review_bucket(review_message, review_id)
55+
document_upload_review = self._build_review_record(review_message, review_id, files)
56+
57+
self._create_review_record(document_upload_review)
5358

5459
logger.info(
5560
f"Successfully processed review for {review_message.nhs_number}",
@@ -80,89 +85,64 @@ def _verify_file_exists_in_staging(self, file_path: str) -> None:
8085
logger.error(f"Error checking file in staging bucket: {str(e)}")
8186
raise
8287

83-
def _create_review_record(self, message_data: ReviewMessageBody) -> DocumentsUploadReview:
84-
"""
85-
Create a new review record in DynamoDB.
86-
87-
Args:
88-
message_data: Validated review queue message data
89-
90-
Returns:
91-
Created DocumentsUploadReview object
92-
93-
Raises:
94-
ClientError: If DynamoDB create operation fails
95-
"""
96-
try:
97-
files = [DocumentReviewFileDetails(
98-
file_name=message_data.file_name,
99-
file_location=message_data.file_path
100-
)]
101-
102-
document_review = DocumentsUploadReview(
103-
nhs_number=message_data.nhs_number,
104-
upload_date=int(datetime.fromisoformat(message_data.upload_date).replace(tzinfo=timezone.utc).timestamp()),
105-
review_status=ReviewStatus.PENDING_REVIEW,
106-
review_reason=message_data.failure_reason,
107-
author=message_data.uploader_ods,
108-
custodian=message_data.current_gp,
109-
files=files,
110-
)
111-
112-
self.dynamo_service.create_item(
113-
table_name=self.review_table_name,
114-
item=document_review.model_dump(by_alias=True, exclude_none=True),
115-
)
116-
117-
logger.info(
118-
f"Created review record in DynamoDB with ID: {document_review.id}",
119-
{"Result": "DynamoDB record created"},
120-
)
121-
122-
return document_review
123-
124-
except Exception as e:
125-
logger.error(f"Failed to create DynamoDB record: {str(e)}")
126-
raise
88+
def _build_review_record(
89+
self, message_data: ReviewMessageBody, review_id: str, files: list[DocumentReviewFileDetails]
90+
) -> DocumentsUploadReview:
91+
return DocumentsUploadReview(
92+
id=review_id,
93+
nhs_number=message_data.nhs_number,
94+
review_status=ReviewStatus.PENDING_REVIEW,
95+
review_reason=message_data.failure_reason,
96+
author=message_data.uploader_ods,
97+
custodian=message_data.current_gp,
98+
files=files,
99+
upload_date=int(datetime.now(tz=timezone.utc).timestamp())
100+
)
127101

128-
def _move_file_to_review_bucket(self, message_data: ReviewMessageBody, review_record_id: str) -> str:
102+
def _move_files_to_review_bucket(
103+
self, message_data: ReviewMessageBody, review_record_id: str
104+
) -> list[DocumentReviewFileDetails]:
129105
"""
130106
Move file from staging to review bucket.
131107
132108
Args:
133109
message_data: Review queue message data
134-
review_record_id: ID of the review record (used in destination path)
110+
review_record_id: ID of the review record being created
135111
136112
Returns:
137-
New file key in review bucket
138-
139-
Raises:
140-
ClientError: If S3 copy or delete operations fail
113+
List of DocumentReviewFileDetails objects for the moved files
141114
"""
115+
moved_files = []
142116
try:
143-
new_file_key = f"{message_data.nhs_number}/{review_record_id}/{message_data.file_name}"
117+
for file in message_data.files:
118+
new_file_key = f"{message_data.nhs_number}/{review_record_id}/{file.file_name}"
144119

145-
logger.info(f"Copying file from ({message_data.file_path}) in staging to review bucket: {new_file_key}")
120+
logger.info(f"Copying file from ({file.file_path}) in staging to review bucket: {new_file_key}")
146121

147-
self.s3_service.copy_across_bucket(
148-
source_bucket=self.staging_bucket_name,
149-
source_file_key=message_data.file_path,
150-
dest_bucket=self.review_bucket_name,
151-
dest_file_key=new_file_key,
152-
)
122+
self.s3_service.copy_across_bucket(
123+
source_bucket=self.staging_bucket_name,
124+
source_file_key=file.file_path,
125+
dest_bucket=self.review_bucket_name,
126+
dest_file_key=new_file_key,
127+
)
153128

154-
logger.info("File successfully copied to review bucket")
155-
logger.info(f"Deleting file from staging bucket: {message_data.file_path}")
129+
logger.info("File successfully copied to review bucket")
130+
logger.info(f"Deleting file from staging bucket: {file.file_path}")
156131

157-
self._delete_from_staging(message_data.file_path)
158-
logger.info(f"Successfully moved file to: {new_file_key}")
132+
self._delete_from_staging(file.file_path)
133+
logger.info(f"Successfully moved file to: {new_file_key}")
159134

160-
return new_file_key
135+
moved_files.append(DocumentReviewFileDetails(
136+
file_name=file.file_name,
137+
file_location=new_file_key
138+
))
161139

162140
except Exception as e:
163141
logger.error(f"Failed to move file: {str(e)}")
164142
raise
165143

144+
return moved_files
145+
166146
def _delete_from_staging(self, file_key: str) -> None:
167147
try:
168148
self.s3_service.delete_object(s3_bucket_name=self.staging_bucket_name, file_key=file_key)
@@ -173,17 +153,15 @@ def _delete_from_staging(self, file_key: str) -> None:
173153
logger.error(f"Error deleting file from staging: {str(e)}")
174154
raise
175155

176-
def _update_review_record_with_file_location(self, review_record_id: str, review_bucket_path: str) -> None:
156+
def _create_review_record(self, review_record: DocumentsUploadReview) -> None:
177157
try:
178-
self.dynamo_service.update_item(
158+
self.dynamo_service.create_item(
179159
table_name=self.review_table_name,
180-
key_pair={"ID": review_record_id},
181-
updated_fields={"ReviewBucketPath": review_bucket_path},
160+
item=review_record
182161
)
183162

184-
logger.info(f"Updated review record {review_record_id} with file location: {review_bucket_path}")
163+
logger.info(f"Created review record {review_record.id}")
185164

186165
except Exception as e:
187-
logger.error(f"Failed to update review record with file location: {str(e)}")
188-
logger.warning("Review record created but file location not updated in DynamoDB")
166+
logger.error(f"Failed to create review record with id: {review_record.id} -- {str(e)}")
189167
raise

0 commit comments

Comments
 (0)