Skip to content

Comments

feat: add configurable partition key for blob type dlq based on message timestamp#101

Merged
sumitaich1998 merged 4 commits intomainfrom
dlq-timestamp-based-partition
Jan 23, 2026
Merged

feat: add configurable partition key for blob type dlq based on message timestamp#101
sumitaich1998 merged 4 commits intomainfrom
dlq-timestamp-based-partition

Conversation

@sumitaich1998
Copy link

@sumitaich1998 sumitaich1998 commented Jan 22, 2026

We have added a new feature to partition DLQ blob storage files based on the Kafka message produce timestamp. Earlier, it was only possible to partition based on the consume timestamp. Now, we introduced a new config DLQ_BLOB_FILE_PARTITION_KEY which allows you to choose between PRODUCE_TIMESTAMP and CONSUME_TIMESTAMP. The default is still CONSUME_TIMESTAMP. If you select PRODUCE_TIMESTAMP and the message timestamp is missing or zero, it will automatically fallback to use the consume timestamp. We also added logging to show which partition key is being used and when a fallback happens.

@sumitaich1998 sumitaich1998 self-assigned this Jan 22, 2026
Copy link
Collaborator

@deepanshgoyal33 deepanshgoyal33 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small feedback

return Paths.get(message.getTopic(), consumeDate);
DlqPartitionKeyType partitionKeyType = dlqConfig.getDlqBlobFilePartitionKey();
if (partitionKeyType == DlqPartitionKeyType.PRODUCE_TIMESTAMP && message.getTimestamp() <= 0) {
firehoseInstrumentation.logInfo("DLQ partitioning fallback to consume timestamp for message topic: {}, partition: {}, offset: {}, timestamp: {}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should either add a metric that the produce timestamp is not good incase of the above mentioned condition. Or just log the timestamps for all the records, this is an extra if statement, which is not needed in my opinion.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, fixed in commit 3a2d81a

if (timestamp <= 0) {
timestamp = message.getConsumeTimestamp();
}
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else {
public static String getDateFromMessage(Message message, ZoneId zoneId) {
return getDateFromMessage(
message,
zoneId,
DlqPartitionKeyType.CONSUME_TIMESTAMP
);
}
public static String getDateFromMessage(
Message message,
ZoneId zoneId,
DlqPartitionKeyType partitionKeyType) {
long timestamp =
(partitionKeyType == DlqPartitionKeyType.PRODUCE_TIMESTAMP
&& message.getTimestamp() > 0)
? message.getTimestamp()
: message.getConsumeTimestamp();
return Instant.ofEpochMilli(timestamp)
.atZone(zoneId)
.toLocalDate()
.format(DateTimeFormatter.ISO_LOCAL_DATE);
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, fixed in commit 17faa6c

Copy link
Collaborator

@deepanshgoyal33 deepanshgoyal33 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sumitaich1998 sumitaich1998 merged commit 38ae89b into main Jan 23, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants