Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion batch_processor_filter/src/batch_processor_filter_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ def apply_filter(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
supplier = batch_file_created_event["supplier"]
vaccine_type = batch_file_created_event["vaccine_type"]

logger.info("Received batch file event for filename: %s with message id: %s", filename, message_id)

if self._is_duplicate_file(filename):
# Mark as processed and return without error so next event will be picked up from queue
logger.error("A duplicate file has already been processed. Filename: %s", filename)
Expand All @@ -52,12 +54,12 @@ def apply_filter(self, batch_file_created_event: BatchFileCreatedEvent) -> None:
raise EventAlreadyProcessingForSupplierAndVaccTypeError(f"Batch event already processing for supplier: "
f"{supplier} and vacc type: {vaccine_type}")

self._batch_audit_repository.update_status(message_id, FileStatus.PROCESSING)
self._queue_client.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps(batch_file_created_event),
MessageGroupId=f"{supplier}_{vaccine_type}"
)
self._batch_audit_repository.update_status(message_id, FileStatus.PROCESSING)

successful_log_message = f"File forwarded for processing by ECS. Filename: {filename}"
logger.info(successful_log_message)
Expand Down
49 changes: 37 additions & 12 deletions batch_processor_filter/tests/test_lambda_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import boto3
import copy
from unittest import TestCase
from unittest.mock import patch, Mock, ANY
from unittest.mock import patch, Mock, ANY, call

import botocore
from moto import mock_aws
Expand Down Expand Up @@ -206,10 +206,17 @@ def test_lambda_handler_raises_error_when_event_already_processing_for_supplier_
sqs_messages = sqs_client.receive_message(QueueUrl=self.mock_queue_url)
self.assertEqual(sqs_messages.get("Messages", []), [])

self.mock_logger.info.assert_called_once_with(
"Batch event already processing for supplier and vacc type. Filename: %s",
"Menacwy_Vaccinations_v5_TEST_20250826T15003000.csv"
)
self.mock_logger.info.assert_has_calls([
call(
"Received batch file event for filename: %s with message id: %s",
"Menacwy_Vaccinations_v5_TEST_20250826T15003000.csv",
"3b60c4f7-ef67-43c7-8f0d-4faee04d7d0e"
),
call(
"Batch event already processing for supplier and vacc type. Filename: %s",
"Menacwy_Vaccinations_v5_TEST_20250826T15003000.csv"
)
])

def test_lambda_handler_processes_event_successfully(self):
"""Should update the audit entry status to Processing and forward to SQS"""
Expand All @@ -225,11 +232,20 @@ def test_lambda_handler_processes_event_successfully(self):
self.assertEqual(len(sqs_messages.get("Messages", [])), 1)
self.assertDictEqual(json.loads(sqs_messages["Messages"][0]["Body"]), dict(self.default_batch_file_event))

expected_log_message = (f"File forwarded for processing by ECS. Filename: "
f"{self.default_batch_file_event['filename']}")
self.mock_logger.info.assert_called_once_with(expected_log_message)
expected_success_log_message = (f"File forwarded for processing by ECS. Filename: "
f"{self.default_batch_file_event['filename']}")
self.mock_logger.info.assert_has_calls([
call(
"Received batch file event for filename: %s with message id: %s",
"Menacwy_Vaccinations_v5_TEST_20250820T10210000.csv",
"df0b745c-b8cb-492c-ba84-8ea28d9f51d5"
),
call(
expected_success_log_message
)
])
self.mock_firehose_send_log.assert_called_once_with(
{**self.default_batch_file_event, "message": expected_log_message}
{**self.default_batch_file_event, "message": expected_success_log_message}
)

def test_lambda_handler_processes_event_successfully_when_event_for_same_supplier_and_vacc_already_processed(self):
Expand All @@ -256,8 +272,17 @@ def test_lambda_handler_processes_event_successfully_when_event_for_same_supplie
self.assertEqual(len(sqs_messages.get("Messages", [])), 1)
self.assertDictEqual(json.loads(sqs_messages["Messages"][0]["Body"]), dict(test_event))

expected_log_message = f"File forwarded for processing by ECS. Filename: {test_event['filename']}"
self.mock_logger.info.assert_called_once_with(expected_log_message)
expected_success_log_message = f"File forwarded for processing by ECS. Filename: {test_event['filename']}"
self.mock_logger.info.assert_has_calls([
call(
"Received batch file event for filename: %s with message id: %s",
"Menacwy_Vaccinations_v5_TEST_20250826T15003000.csv",
"3b60c4f7-ef67-43c7-8f0d-4faee04d7d0e"
),
call(
expected_success_log_message
)
])
self.mock_firehose_send_log.assert_called_once_with(
{**test_event, "message": expected_log_message}
{**test_event, "message": expected_success_log_message}
)
6 changes: 3 additions & 3 deletions filenameprocessor/src/file_name_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ def handle_record(record) -> dict:
permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier)

queue_name = f"{supplier}_{vaccine_type}"
make_and_send_sqs_message(
file_key, message_id, permissions, vaccine_type, supplier, created_at_formatted_string
)
upsert_audit_table(
message_id, file_key, created_at_formatted_string, expiry_timestamp, queue_name, FileStatus.QUEUED
)
make_and_send_sqs_message(
file_key, message_id, permissions, vaccine_type, supplier, created_at_formatted_string
)

logger.info("Lambda invocation successful for file '%s'", file_key)

Expand Down
9 changes: 4 additions & 5 deletions filenameprocessor/tests/test_lambda_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from boto3 import client as boto3_client
from moto import mock_s3, mock_sqs, mock_firehose, mock_dynamodb

from errors import UnhandledSqsError
from tests.utils_for_tests.generic_setup_and_teardown import GenericSetUp, GenericTearDown
from tests.utils_for_tests.utils_for_filenameprocessor_tests import (
assert_audit_table_entry,
Expand Down Expand Up @@ -288,7 +287,7 @@ def test_lambda_invalid_permissions(self):

def test_lambda_adds_event_to_audit_table_as_failed_when_unexpected_exception_is_caught(self):
"""
Tests that when an unexpected error occurs e.g. sending to SQS (maybe in case of bad deployment):
Tests that when an unexpected error occurs e.g. an unexpected exception when validating permissions:
* The file is added to the audit table with a status of 'Failed' and the reason
* The message is not sent to SQS
* The failure inf_ack file is created
Expand All @@ -298,8 +297,8 @@ def test_lambda_adds_event_to_audit_table_as_failed_when_unexpected_exception_is

with ( # noqa: E999
patch("file_name_processor.uuid4", return_value=test_file_details.message_id), # noqa: E999
patch("file_name_processor.make_and_send_sqs_message", side_effect=UnhandledSqsError(
"Some client error with SQS"
patch("file_name_processor.validate_vaccine_type_permissions", side_effect=Exception(
"Some unexpected exception"
))
): # noqa: E999
lambda_handler(self.make_event([self.make_record(test_file_details.file_key)]), None)
Expand All @@ -310,7 +309,7 @@ def test_lambda_adds_event_to_audit_table_as_failed_when_unexpected_exception_is
"filename": {"S": test_file_details.file_key},
"queue_name": {"S": "EMIS_FLU"},
"status": {"S": "Failed"},
"error_details": {"S": "Some client error with SQS"},
"error_details": {"S": "Some unexpected exception"},
"timestamp": {"S": test_file_details.created_at_formatted_string},
"expires_at": {"N": str(test_file_details.expires_at)},
}
Expand Down
Loading