diff --git a/build.gradle b/build.gradle index a046ae61..8c57b23a 100644 --- a/build.gradle +++ b/build.gradle @@ -34,7 +34,7 @@ lombok { group 'com.gotocompany' -version '0.12.27' +version '0.12.28' def projName = "firehose" diff --git a/docs/docs/advance/dlq.md b/docs/docs/advance/dlq.md index 4cc55206..028717cb 100644 --- a/docs/docs/advance/dlq.md +++ b/docs/docs/advance/dlq.md @@ -38,9 +38,17 @@ If the writer type is set to BLOB_STORAGE, we can choose any blob storage. Curre * Type: `optional` * Default value: `GCS` +## `DLQ_BLOB_FILE_PARTITION_KEY` + +Partition key for date-based DLQ blob storage directories when using BLOB_STORAGE writer type. Determines whether the date partition uses the message consume timestamp or the Kafka produce timestamp from metadata. + +* Example value: `PRODUCE_TIMESTAMP` +* Type: `optional` +* Default value: `CONSUME_TIMESTAMP` + ## `DLQ_BLOB_FILE_PARTITION_TIMEZONE` -Timezone to be used for date-based partitioning of DLQ files when using BLOB_STORAGE writer type. DLQ files are organized into directories based on the consume timestamp of the message converted to the specified timezone. The configuration accepts standard timezone identifiers and will fail application startup if an invalid timezone is provided. +Timezone to be used for date-based partitioning of DLQ files when using BLOB_STORAGE writer type. DLQ files are organized into directories based on the configured partition key timestamp converted to the specified timezone. The configuration accepts standard timezone identifiers and will fail application startup if an invalid timezone is provided. 
* Example value: `Asia/Tokyo` * Type: `optional` diff --git a/src/main/java/com/gotocompany/firehose/config/DlqConfig.java b/src/main/java/com/gotocompany/firehose/config/DlqConfig.java index dda508c1..1e19dc5c 100644 --- a/src/main/java/com/gotocompany/firehose/config/DlqConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/DlqConfig.java @@ -1,10 +1,12 @@ package com.gotocompany.firehose.config; import com.gotocompany.firehose.config.converter.BlobStorageTypeConverter; +import com.gotocompany.firehose.config.converter.DlqPartitionKeyTypeConverter; import com.gotocompany.firehose.config.converter.DlqWriterTypeConverter; import com.gotocompany.firehose.config.converter.TimeZoneConverter; import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageType; import com.gotocompany.firehose.sink.dlq.DLQWriterType; +import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType; import java.time.ZoneId; @@ -37,4 +39,9 @@ public interface DlqConfig extends AppConfig { @ConverterClass(TimeZoneConverter.class) ZoneId getDlqBlobFilePartitionTimezone(); + @Key("DLQ_BLOB_FILE_PARTITION_KEY") + @DefaultValue("CONSUME_TIMESTAMP") + @ConverterClass(DlqPartitionKeyTypeConverter.class) + DlqPartitionKeyType getDlqBlobFilePartitionKey(); + } diff --git a/src/main/java/com/gotocompany/firehose/config/converter/DlqPartitionKeyTypeConverter.java b/src/main/java/com/gotocompany/firehose/config/converter/DlqPartitionKeyTypeConverter.java new file mode 100644 index 00000000..56a7b5de --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/config/converter/DlqPartitionKeyTypeConverter.java @@ -0,0 +1,13 @@ +package com.gotocompany.firehose.config.converter; + +import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType; +import org.aeonbits.owner.Converter; + +import java.lang.reflect.Method; + +public class DlqPartitionKeyTypeConverter implements Converter<DlqPartitionKeyType> { + @Override + public DlqPartitionKeyType convert(Method method, String input) { + return 
DlqPartitionKeyType.valueOf(input.toUpperCase()); + } +} diff --git a/src/main/java/com/gotocompany/firehose/sink/dlq/DlqPartitionKeyType.java b/src/main/java/com/gotocompany/firehose/sink/dlq/DlqPartitionKeyType.java new file mode 100644 index 00000000..acc3f456 --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/sink/dlq/DlqPartitionKeyType.java @@ -0,0 +1,6 @@ +package com.gotocompany.firehose.sink.dlq; + +public enum DlqPartitionKeyType { + PRODUCE_TIMESTAMP, + CONSUME_TIMESTAMP +} diff --git a/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriter.java b/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriter.java index 4ae544e8..6f0e2ec2 100644 --- a/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriter.java +++ b/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriter.java @@ -8,6 +8,7 @@ import com.gotocompany.firehose.metrics.Metrics; import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage; import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException; +import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType; import com.gotocompany.firehose.sink.dlq.DlqWriter; import lombok.extern.slf4j.Slf4j; @@ -47,6 +48,8 @@ public List<Message> write(List<Message> messages) throws IOException { } firehoseInstrumentation.logInfo("Starting DLQ blob storage write for {} messages", messages.size()); + firehoseInstrumentation.logDebug("DLQ blob storage partition key type: {}, timezone: {}", + dlqConfig.getDlqBlobFilePartitionKey(), dlqConfig.getDlqBlobFilePartitionTimezone()); Map<Path, List<Message>> messagesByPartition = messages.stream() .collect(Collectors.groupingBy(this::createPartition)); @@ -187,8 +190,14 @@ private String convertToString(Message message) { } private Path createPartition(Message message) { - String consumeDate = DlqDateUtils.getDateFromMessage(message, dlqConfig.getDlqBlobFilePartitionTimezone()); - return Paths.get(message.getTopic(), 
consumeDate); + DlqPartitionKeyType partitionKeyType = dlqConfig.getDlqBlobFilePartitionKey(); + firehoseInstrumentation.logDebug("DLQ partitioning message - topic: {}, partition: {}, offset: {}, produceTimestamp: {}, consumeTimestamp: {}", + message.getTopic(), message.getPartition(), message.getOffset(), message.getTimestamp(), message.getConsumeTimestamp()); + String partitionDate = DlqDateUtils.getDateFromMessage( + message, + dlqConfig.getDlqBlobFilePartitionTimezone(), + partitionKeyType); + return Paths.get(message.getTopic(), partitionDate); } private String extractDateFromPath(Path path) { diff --git a/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqDateUtils.java b/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqDateUtils.java index 6cb5e303..e7503694 100644 --- a/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqDateUtils.java +++ b/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqDateUtils.java @@ -1,17 +1,33 @@ package com.gotocompany.firehose.sink.dlq.blobstorage; import com.gotocompany.firehose.message.Message; +import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType; import java.time.Instant; -import java.time.LocalDate; import java.time.ZoneId; import java.time.format.DateTimeFormatter; public class DlqDateUtils { public static String getDateFromMessage(Message message, ZoneId zoneId) { - LocalDate consumeLocalDate = LocalDate.from(Instant.ofEpochMilli(message.getConsumeTimestamp()).atZone(zoneId)); - return DateTimeFormatter.ISO_LOCAL_DATE.format(consumeLocalDate); + return getDateFromMessage( + message, + zoneId, + DlqPartitionKeyType.CONSUME_TIMESTAMP + ); + } + + public static String getDateFromMessage(Message message, ZoneId zoneId, DlqPartitionKeyType partitionKeyType) { + long timestamp = + (partitionKeyType == DlqPartitionKeyType.PRODUCE_TIMESTAMP + && message.getTimestamp() > 0) + ? 
message.getTimestamp() + : message.getConsumeTimestamp(); + + return Instant.ofEpochMilli(timestamp) + .atZone(zoneId) + .toLocalDate() + .format(DateTimeFormatter.ISO_LOCAL_DATE); } } diff --git a/src/test/java/com/gotocompany/firehose/config/converter/DlqPartitionKeyTypeConverterTest.java b/src/test/java/com/gotocompany/firehose/config/converter/DlqPartitionKeyTypeConverterTest.java new file mode 100644 index 00000000..c54f6df0 --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/config/converter/DlqPartitionKeyTypeConverterTest.java @@ -0,0 +1,40 @@ +package com.gotocompany.firehose.config.converter; + +import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class DlqPartitionKeyTypeConverterTest { + + private DlqPartitionKeyTypeConverter converter; + + @Before + public void setUp() { + converter = new DlqPartitionKeyTypeConverter(); + } + + @Test + public void shouldReturnProduceTimestampForLowercaseInput() { + DlqPartitionKeyType result = converter.convert(null, "produce_timestamp"); + assertEquals(DlqPartitionKeyType.PRODUCE_TIMESTAMP, result); + } + + @Test + public void shouldReturnProduceTimestampForUppercaseInput() { + DlqPartitionKeyType result = converter.convert(null, "PRODUCE_TIMESTAMP"); + assertEquals(DlqPartitionKeyType.PRODUCE_TIMESTAMP, result); + } + + @Test + public void shouldReturnConsumeTimestampForMixedCaseInput() { + DlqPartitionKeyType result = converter.convert(null, "CoNsUmE_TiMeStAmP"); + assertEquals(DlqPartitionKeyType.CONSUME_TIMESTAMP, result); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnInvalidInput() { + converter.convert(null, "invalid"); + } +} diff --git a/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java b/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java index cf2fc7aa..2a3ac16d 100644 --- 
a/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java @@ -9,6 +9,7 @@ import com.gotocompany.firehose.metrics.Metrics; import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage; import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException; +import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -43,6 +44,7 @@ public class BlobStorageDlqWriterTest { @Before public void setUp() throws Exception { when(dlqConfig.getDlqBlobFilePartitionTimezone()).thenReturn(ZoneId.of("UTC")); + when(dlqConfig.getDlqBlobFilePartitionKey()).thenReturn(DlqPartitionKeyType.CONSUME_TIMESTAMP); blobStorageDLQWriter = new BlobStorageDlqWriter(blobStorage, dlqConfig, firehoseInstrumentation); } @@ -443,4 +445,49 @@ public void shouldHandleMultipleMessagesInSamePartition() throws IOException, Bl eq("DLQ blob storage write complete - total: {}, successful partitions: {}, failed partitions: {}, successful messages: {}, failed messages: {}"), eq(3), eq(1), eq(0), eq(3), eq(0)); } + + @Test + public void shouldUseProduceTimestampForPartitioningWhenConfigured() throws IOException, BlobStorageException { + when(dlqConfig.getDlqBlobFilePartitionKey()).thenReturn(DlqPartitionKeyType.PRODUCE_TIMESTAMP); + long produceTimestamp = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli(); + long consumeTimestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, produceTimestamp, + consumeTimestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + List<Message> messages = Collections.singletonList(message); + blobStorageDLQWriter.write(messages); + + verify(blobStorage).store(contains("booking/2020-01-02"), any(byte[].class)); + } + + @Test + public void 
shouldFallbackToConsumeTimestampWhenProduceTimestampInvalid() throws IOException, BlobStorageException { + when(dlqConfig.getDlqBlobFilePartitionKey()).thenReturn(DlqPartitionKeyType.PRODUCE_TIMESTAMP); + long produceTimestamp = 0L; + long consumeTimestamp = Instant.parse("2020-01-03T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, produceTimestamp, + consumeTimestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + List<Message> messages = Collections.singletonList(message); + blobStorageDLQWriter.write(messages); + + verify(blobStorage).store(contains("booking/2020-01-03"), any(byte[].class)); + verify(firehoseInstrumentation).logDebug( + eq("DLQ partitioning message - topic: {}, partition: {}, offset: {}, produceTimestamp: {}, consumeTimestamp: {}"), + eq("booking"), eq(1), eq(1L), eq(0L), eq(consumeTimestamp)); + } + + @Test + public void shouldLogPartitionKeyTypeAndTimezone() throws IOException, BlobStorageException { + long timestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, timestamp, + timestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + List<Message> messages = Collections.singletonList(message); + blobStorageDLQWriter.write(messages); + + verify(firehoseInstrumentation).logDebug( + eq("DLQ blob storage partition key type: {}, timezone: {}"), + eq(DlqPartitionKeyType.CONSUME_TIMESTAMP), eq(ZoneId.of("UTC"))); + } } diff --git a/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqDateUtilsTest.java b/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqDateUtilsTest.java index 6aec4329..ecff4cf5 100644 --- a/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqDateUtilsTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqDateUtilsTest.java @@ -3,6 +3,7 @@ import 
com.gotocompany.depot.error.ErrorInfo; import com.gotocompany.depot.error.ErrorType; import com.gotocompany.firehose.message.Message; +import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType; import org.junit.Test; import java.io.IOException; @@ -67,5 +68,49 @@ public void shouldReturnDifferentDatesForDifferentTimestamps() { assertEquals("2020-01-01", date1); assertEquals("2020-01-02", date2); } + + @Test + public void shouldUseProduceTimestampWhenConfigured() { + long produceTimestamp = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli(); + long consumeTimestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, produceTimestamp, consumeTimestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + String date = DlqDateUtils.getDateFromMessage(message, ZoneId.of("UTC"), DlqPartitionKeyType.PRODUCE_TIMESTAMP); + + assertEquals("2020-01-02", date); + } + + @Test + public void shouldUseConsumeTimestampWhenConfigured() { + long produceTimestamp = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli(); + long consumeTimestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, produceTimestamp, consumeTimestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + String date = DlqDateUtils.getDateFromMessage(message, ZoneId.of("UTC"), DlqPartitionKeyType.CONSUME_TIMESTAMP); + + assertEquals("2020-01-01", date); + } + + @Test + public void shouldFallbackToConsumeTimestampWhenProduceTimestampInvalid() { + long produceTimestamp = 0L; + long consumeTimestamp = Instant.parse("2020-01-03T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, produceTimestamp, consumeTimestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + String date = 
DlqDateUtils.getDateFromMessage(message, ZoneId.of("UTC"), DlqPartitionKeyType.PRODUCE_TIMESTAMP); + + assertEquals("2020-01-03", date); + } + + @Test + public void shouldApplyTimezoneToProduceTimestamp() { + long produceTimestamp = Instant.parse("2020-01-01T15:00:00Z").toEpochMilli(); + long consumeTimestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(); + Message message = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null, produceTimestamp, consumeTimestamp, new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR)); + + String date = DlqDateUtils.getDateFromMessage(message, ZoneId.of("Asia/Tokyo"), DlqPartitionKeyType.PRODUCE_TIMESTAMP); + + assertEquals("2020-01-02", date); + } }