Make the receive consignment process fault-tolerant so orders ALWAYS complete, even if:
- Individual SKUs are not found
- Individual products fail to mark as received
- Notification service fails
- Any other errors occur during processing
CRITICAL RULE: Always mark order as "Complete" no matter what errors occur
Location: Around line 150-250 (the main processing loop)
Current behavior: If ANY product fails, the entire order fails.
New behavior: Log errors for individual products but continue processing, and ALWAYS complete the order.
// REPLACE the entire receiveConsignment function with this fault-tolerant version:
/**
 * Fault-tolerant "receive consignment" worker.
 *
 * For every entry of `payload.receivedProducts` the ProductModel is looked up by SKU and the
 * matching StockOrderLineitemModel row of this report is flagged as received. A failure for a
 * single entry is recorded and logged but never aborts the run. Afterwards the ReportModel is
 * set to COMPLETE and a notification is attempted; a notification failure is only logged.
 *
 * @param {Object} payload - Worker payload. Reads `workerName`, `reportModelId`, `orgModelId`
 *     and `receivedProducts` (entries are read as `{ sku, receivedQuantity }`); the whole
 *     payload is also forwarded to `sendNotification`.
 * @param {*} messageId - Identifier copied into the log entries.
 * @returns {Promise<Object>} Resolves with
 *     `{ success: true, reportModelId, summary }` on the normal path,
 *     `{ success: true, reportModelId, hadCriticalError: true, error }` when the order was
 *     completed from the critical-error path, or
 *     `{ success: false, reportModelId, requiresManualCompletion: true, error }` when the
 *     order could not be marked complete at all.
 */
var receiveConsignment = function (payload, messageId) {
    var commandName = payload.workerName;
    var db = null;
    var reportModelId = payload.reportModelId;
    var orgModelId = payload.orgModelId;
    var receivedProducts = payload.receivedProducts || [];
    // Track errors but don't let them stop the process
    var errors = [];
    var successCount = 0;
    var failureCount = 0;

    // Null-safe `error.message || error`, so a rejection with null/undefined cannot make an
    // error handler throw itself.
    function describeError(error) {
        return (error && error.message) || error;
    }

    // Records one per-product failure in the shared summary.
    function recordFailure(sku, receivedQuantity, reason) {
        errors.push({
            sku: sku,
            error: reason,
            receivedQuantity: receivedQuantity
        });
        failureCount++;
    }

    // Builds the summary object used for both logging and the resolved value.
    function buildSummary() {
        return {
            total: receivedProducts.length,
            success: successCount,
            failed: failureCount,
            errors: errors
        };
    }

    // Sets the report state to COMPLETE.
    function markOrderComplete() {
        return db.collection('ReportModel').updateOne(
            { _id: utils.ObjectId(reportModelId) },
            {
                $set: {
                    state: utils.REPORT_STATES.COMPLETE,
                    updatedAt: new Date()
                }
            }
        );
    }

    // Processes one received product; the returned promise never rejects.
    function processProduct(product, index) {
        // Read the fields once and tolerate null/undefined entries, so the catch handler
        // below can never throw while reporting a malformed entry.
        var sku = product ? product.sku : undefined;
        var receivedQuantity = product ? product.receivedQuantity : undefined;

        return Promise.resolve()
            .then(function () {
                if (!product) {
                    throw new Error('Invalid product entry');
                }
                logger.debug({
                    commandName: commandName,
                    message: 'Processing product',
                    productIndex: index + 1,
                    totalProducts: receivedProducts.length,
                    sku: sku,
                    receivedQuantity: receivedQuantity
                });
                // Find product by SKU
                return db.collection('ProductModel').findOne({
                    orgModelId: utils.ObjectId(orgModelId),
                    sku: sku
                });
            })
            .then(function (productModel) {
                if (!productModel) {
                    // SKU not found - LOG but DON'T fail
                    recordFailure(sku, receivedQuantity, 'Product not found in database');
                    logger.warn({
                        commandName: commandName,
                        message: 'SKU not found - IGNORING and continuing',
                        messageId: messageId,
                        sku: sku,
                        receivedQuantity: receivedQuantity
                    });
                    return undefined; // Continue to next product
                }
                // Update line item as received
                return db.collection('StockOrderLineitemModel').updateOne(
                    {
                        reportModelId: utils.ObjectId(reportModelId),
                        productModelId: productModel._id
                    },
                    {
                        $set: {
                            received: true,
                            receivedQuantity: receivedQuantity || 0,
                            updatedAt: new Date()
                        }
                    }
                )
                    .then(function (updateResult) {
                        // "In the order" means a line item matched the filter. modifiedCount
                        // is 0 for a matched row whose values did not change, so prefer
                        // matchedCount and fall back only when it is not reported.
                        var matched = typeof updateResult.matchedCount === 'number'
                            ? updateResult.matchedCount
                            : updateResult.modifiedCount;
                        if (matched > 0) {
                            successCount++;
                            logger.debug({
                                commandName: commandName,
                                message: 'Product marked as received',
                                sku: sku,
                                receivedQuantity: receivedQuantity
                            });
                        } else {
                            // Product not in order - LOG but DON'T fail
                            recordFailure(sku, receivedQuantity, 'Product not found in order');
                            logger.warn({
                                commandName: commandName,
                                message: 'Product not in order - IGNORING and continuing',
                                messageId: messageId,
                                sku: sku
                            });
                        }
                    });
            })
            .catch(function (error) {
                // Individual product error - LOG but DON'T fail
                recordFailure(sku, receivedQuantity, describeError(error));
                logger.error({
                    commandName: commandName,
                    message: 'Error processing product - IGNORING and continuing',
                    messageId: messageId,
                    sku: sku,
                    error: describeError(error)
                });
            });
    }

    logger.debug({
        commandName: commandName,
        message: 'Starting receive consignment process',
        messageId: messageId,
        reportModelId: reportModelId,
        totalProducts: receivedProducts.length
    });

    return dbUtils.getDB()
        .then(function (dbInstance) {
            db = dbInstance;
            logger.debug({
                commandName: commandName,
                message: 'Connected to database',
                messageId: messageId
            });
            // Wait for all products to be processed (even if some failed)
            return Promise.all(receivedProducts.map(processProduct));
        })
        .then(function () {
            logger.debug({
                commandName: commandName,
                message: 'All products processed',
                messageId: messageId,
                successCount: successCount,
                failureCount: failureCount,
                totalErrors: errors.length
            });
            // ALWAYS mark order as Complete - THIS IS CRITICAL
            return markOrderComplete();
        })
        .then(function (updateResult) {
            logger.debug({
                commandName: commandName,
                message: 'Order marked as COMPLETE',
                messageId: messageId,
                reportModelId: reportModelId,
                successCount: successCount,
                failureCount: failureCount,
                modifiedCount: updateResult.modifiedCount
            });
            // Send notification - but DON'T let it fail the process. Starting from a resolved
            // promise routes a synchronous throw into the same catch as a rejection.
            return Promise.resolve()
                .then(function () {
                    return sendNotification(payload, messageId, successCount, failureCount, errors);
                })
                .catch(function (notificationError) {
                    // Notification failed - LOG but DON'T fail
                    logger.error({
                        commandName: commandName,
                        message: 'Notification failed - IGNORING',
                        messageId: messageId,
                        error: describeError(notificationError)
                    });
                });
        })
        .then(function () {
            logger.debug({
                commandName: commandName,
                message: 'Receive consignment completed successfully',
                messageId: messageId,
                reportModelId: reportModelId,
                summary: buildSummary()
            });
            return {
                success: true,
                reportModelId: reportModelId,
                summary: buildSummary()
            };
        })
        .catch(function (error) {
            // CRITICAL ERROR (database connection, etc.)
            // Even in this case, try to mark order as complete
            logger.error({
                commandName: commandName,
                message: 'CRITICAL ERROR - Attempting to mark order as complete anyway',
                messageId: messageId,
                error: describeError(error)
            });
            if (!db) {
                logger.error({
                    commandName: commandName,
                    message: 'No database connection - manual intervention required',
                    messageId: messageId
                });
                return {
                    success: false,
                    reportModelId: reportModelId,
                    requiresManualCompletion: true,
                    error: 'No database connection'
                };
            }
            // Last-ditch effort to complete the order. Wrapped in a promise chain so that a
            // synchronous throw (e.g. from utils.ObjectId) lands in the handler below instead
            // of rejecting the whole worker promise.
            return Promise.resolve()
                .then(markOrderComplete)
                .then(function () {
                    logger.debug({
                        commandName: commandName,
                        message: 'Order marked as COMPLETE despite critical error',
                        messageId: messageId
                    });
                    return {
                        success: true,
                        reportModelId: reportModelId,
                        hadCriticalError: true,
                        error: describeError(error)
                    };
                })
                .catch(function (finalError) {
                    logger.error({
                        commandName: commandName,
                        message: 'FAILED to mark order as complete - manual intervention required',
                        messageId: messageId,
                        error: describeError(finalError)
                    });
                    // Resolve anyway to prevent worker retry
                    return {
                        success: false,
                        reportModelId: reportModelId,
                        requiresManualCompletion: true,
                        error: describeError(error)
                    };
                });
        });
};
// Helper function to send notification (non-blocking)
function sendNotification(payload, messageId, successCount, failureCount, errors) {
var commandName = payload.workerName;
return Promise.resolve()
.then(function() {
var options = {
method: 'POST',
uri: utils.PUBLISH_URL,
json: true,
headers: {
'Authorization': payload.loopbackAccessToken.id
},
body: new utils.Notification(
utils.workerType.RECEIVE_CONSIGNMENT_VEND,
utils.messageFor.MESSAGE_FOR_CLIENT, // IMPORTANT: Use correct messageFor
utils.workerStatus.SUCCESS,
{
success: true,
reportModelId: payload.reportModelId,
summary: {
total: (payload.receivedProducts || []).length,
success: successCount,
failed: failureCount,
errors: errors.length > 0 ? errors : undefined
}
},
payload.loopbackAccessToken.userId // Use userId for MESSAGE_FOR_CLIENT
)
};
logger.debug({
commandName: commandName,
message: 'Sending notification',
messageId: messageId,
options: options
});
return rp(options);
})
.then(function(response) {
logger.debug({
commandName: commandName,
message: 'Notification sent successfully',
messageId: messageId,
response: response
});
return Promise.resolve();
})
.catch(function(error) {
logger.error({
commandName: commandName,
message: 'Failed to send notification',
messageId: messageId,
error: error.message || error
});
// Don't reject - just log and continue
return Promise.resolve();
});
}Location: Line 104-134 (Redis message handler)
Change: Add better error handling and logging for unknown payloads
// REPLACE the Redis message handler with this:
app.on('redis-subscriber-connected', () => {
app.redis.workerSubscriber.on('message', (channel, message) => {
try{
let payload = JSON.parse(message);
let { eventType, data, messageFor, status } = payload;
// Validate messageFor field
if (!messageFor) {
logger.warn({
message: 'Notification missing messageFor field - IGNORING',
payload: payload,
eventType: eventType
});
return; // Ignore but don't crash
}
switch(messageFor) {
case utils.constants.MESSAGE_FOR_CLIENT:
let { userId } = payload;
if (!userId) {
logger.warn({
message: 'MESSAGE_FOR_CLIENT missing userId - IGNORING',
payload: payload
});
return;
}
utils.sendSSEOutput(sseUsers, userId, eventType, status, data, utils.constants.MESSAGE_FOR_CLIENT);
break;
case utils.constants.MESSAGE_FOR_API:
let { callId } = payload;
if (!callId) {
logger.warn({
message: 'MESSAGE_FOR_API missing callId - IGNORING',
payload: payload
});
return;
}
utils.sendSSEOutput(sseAPI, callId, eventType, status, data, utils.constants.MESSAGE_FOR_API);
break;
default:
// Unknown messageFor - LOG but DON'T crash
logger.warn({
message: 'Unknown messageFor value - IGNORING',
messageFor: messageFor,
eventType: eventType,
payload: payload
});
// Don't throw error - just ignore
break;
}
}
catch(error) {
// JSON parse error or other error - LOG but DON'T crash
logger.error({
error: error,
message: 'Error while receiving redis subscription message - IGNORING',
rawMessage: message
});
// Don't throw - just log and continue
}
});
});Add these constants:
const exports = {}
module.exports = exports;
exports.workerEvents = {
// Add your worker events here
};
exports.workerStatus = {
STARTED: 'STARTED',
PROCESSING: 'PROCESSING',
SUCCESS: 'SUCCESS',
FAILED: 'FAILED',
};
// ADD THESE:
exports.MESSAGE_FOR_CLIENT = 'MESSAGE_FOR_CLIENT';
exports.MESSAGE_FOR_API = 'MESSAGE_FOR_API';Location: Line 732-742 (Notification constructor)
Ensure the Notification constructor validates messageFor:
exports.Notification = function (eventType, messageFor, status, data, id) {
// Validate messageFor
if (messageFor !== exports.messageFor.MESSAGE_FOR_CLIENT &&
messageFor !== exports.messageFor.MESSAGE_FOR_API) {
logger.warn({
message: 'Invalid messageFor value - defaulting to MESSAGE_FOR_CLIENT',
messageFor: messageFor,
eventType: eventType
});
messageFor = exports.messageFor.MESSAGE_FOR_CLIENT;
}
this.eventType = eventType;
this.messageFor = messageFor;
this.status = status;
this.data = data;
if (messageFor === exports.messageFor.MESSAGE_FOR_CLIENT) {
this.userId = id;
} else if (messageFor === exports.messageFor.MESSAGE_FOR_API) {
this.callId = id;
}
};# Send a receive consignment with an invalid SKU
# Expected: Order completes, error logged, invalid SKU ignored
# Stop notification service
docker-compose stop notification
# Receive consignment
# Expected: Order completes, notification error logged
# Restart notification service
docker-compose start notification
# Simulate database connection issue
# Expected: Order still attempts to complete
// Run this query after each test
use stockup
db.ReportModel.find({
orgModelId: ObjectId("5e1f0aa335f11d1ffed8478e"),
state: { $ne: "Complete" },
updatedAt: { $gte: new ISODate("2026-02-19") }
}).forEach(function(order) {
print("STUCK ORDER: " + order.name + " - " + order.state);
});
// Should return NO results - all orders should be Complete
- Individual SKU failures → Logged and ignored, processing continues
- Product not found in order → Logged and ignored, processing continues
- Notification failures → Logged and ignored, order still completes
- Database errors → Last-ditch effort to complete order
- Invalid notification payloads → Logged and ignored, service doesn't crash
- Order ALWAYS marked as "Complete" (unless database is completely down)
- Worker NEVER retries (returns success even with errors)
- All errors logged (for debugging and manual review)
- System keeps running (no crashes from individual failures)
- Check logs for `IGNORING` messages to see what errors occurred
- Check logs for `requiresManualCompletion: true` for critical failures
- All errors are logged with full context for debugging
- Backup current code
  git checkout -b backup-before-fault-tolerant-fixes
  git commit -am "Backup before fault-tolerant fixes"
- Apply changes (use this MD file)
- Test in development
  docker-compose up --build
- Deploy to production
  git checkout master
  git merge backup-before-fault-tolerant-fixes
  docker-compose up -d --build
- Monitor logs
  docker-compose logs -f worker notification
If issues occur:
git checkout master
git revert HEAD
docker-compose up -d --build
END OF FAULT-TOLERANT FIXES