Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ lombok {

group 'com.gotocompany'

version '0.12.27'
version '0.12.28'

def projName = "firehose"

Expand Down
10 changes: 9 additions & 1 deletion docs/docs/advance/dlq.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,17 @@ If the writer type is set to BLOB_STORAGE, we can choose any blob storage. Curre
* Type: `optional`
* Default value: `GCS`

## `DLQ_BLOB_FILE_PARTITION_KEY`

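Partition key for date-based DLQ blob storage directories when using BLOB_STORAGE writer type. Determines whether the date partition uses the message consume timestamp (`CONSUME_TIMESTAMP`) or the Kafka produce timestamp from the message metadata (`PRODUCE_TIMESTAMP`). The value is case-insensitive. If `PRODUCE_TIMESTAMP` is selected but a message does not carry a positive produce timestamp, the consume timestamp is used for that message instead.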

* Example value: `PRODUCE_TIMESTAMP`
* Type: `optional`
* Default value: `CONSUME_TIMESTAMP`

## `DLQ_BLOB_FILE_PARTITION_TIMEZONE`

Timezone to be used for date-based partitioning of DLQ files when using BLOB_STORAGE writer type. DLQ files are organized into directories based on the consume timestamp of the message converted to the specified timezone. The configuration accepts standard timezone identifiers and will fail application startup if an invalid timezone is provided.
Timezone to be used for date-based partitioning of DLQ files when using BLOB_STORAGE writer type. DLQ files are organized into directories based on the configured partition key timestamp converted to the specified timezone. The configuration accepts standard timezone identifiers and will fail application startup if an invalid timezone is provided.

* Example value: `Asia/Tokyo`
* Type: `optional`
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/gotocompany/firehose/config/DlqConfig.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.gotocompany.firehose.config;

import com.gotocompany.firehose.config.converter.BlobStorageTypeConverter;
import com.gotocompany.firehose.config.converter.DlqPartitionKeyTypeConverter;
import com.gotocompany.firehose.config.converter.DlqWriterTypeConverter;
import com.gotocompany.firehose.config.converter.TimeZoneConverter;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageType;
import com.gotocompany.firehose.sink.dlq.DLQWriterType;
import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType;

import java.time.ZoneId;

Expand Down Expand Up @@ -37,4 +39,9 @@ public interface DlqConfig extends AppConfig {
@ConverterClass(TimeZoneConverter.class)
ZoneId getDlqBlobFilePartitionTimezone();

/**
 * Returns the partition key type configured through {@code DLQ_BLOB_FILE_PARTITION_KEY}.
 * The raw value is parsed by {@link DlqPartitionKeyTypeConverter}; when the key is not set,
 * the default is {@code CONSUME_TIMESTAMP}.
 *
 * @return the timestamp kind used to derive the date partition of DLQ blob files
 */
@Key("DLQ_BLOB_FILE_PARTITION_KEY")
@DefaultValue("CONSUME_TIMESTAMP")
@ConverterClass(DlqPartitionKeyTypeConverter.class)
DlqPartitionKeyType getDlqBlobFilePartitionKey();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.gotocompany.firehose.config.converter;

import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;
import java.util.Locale;

/**
 * Converts the raw {@code DLQ_BLOB_FILE_PARTITION_KEY} configuration value into a
 * {@link DlqPartitionKeyType}. Matching is case-insensitive and ignores surrounding whitespace.
 */
public class DlqPartitionKeyTypeConverter implements Converter<DlqPartitionKeyType> {

    /**
     * Parses the configured partition key name.
     *
     * @param method the configuration method being resolved (not used)
     * @param input  the raw configuration value, e.g. {@code produce_timestamp}
     * @return the matching {@link DlqPartitionKeyType}
     * @throws IllegalArgumentException if the value does not name a {@link DlqPartitionKeyType} constant
     */
    @Override
    public DlqPartitionKeyType convert(Method method, String input) {
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale; with a
        // Turkish default locale "timestamp" would otherwise become "TİMESTAMP" and never match.
        String normalized = input.trim().toUpperCase(Locale.ROOT);
        return DlqPartitionKeyType.valueOf(normalized);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.gotocompany.firehose.sink.dlq;

/**
 * Names the message timestamp that a DLQ blob-storage date partition is derived from.
 */
public enum DlqPartitionKeyType {

    /** Derive the partition date from the message's produce timestamp. */
    PRODUCE_TIMESTAMP,

    /** Derive the partition date from the message's consume timestamp. */
    CONSUME_TIMESTAMP;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.gotocompany.firehose.metrics.Metrics;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException;
import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType;
import com.gotocompany.firehose.sink.dlq.DlqWriter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -47,6 +48,8 @@ public List<Message> write(List<Message> messages) throws IOException {
}

firehoseInstrumentation.logInfo("Starting DLQ blob storage write for {} messages", messages.size());
firehoseInstrumentation.logDebug("DLQ blob storage partition key type: {}, timezone: {}",
dlqConfig.getDlqBlobFilePartitionKey(), dlqConfig.getDlqBlobFilePartitionTimezone());

Map<Path, List<Message>> messagesByPartition = messages.stream()
.collect(Collectors.groupingBy(this::createPartition));
Expand Down Expand Up @@ -187,8 +190,14 @@ private String convertToString(Message message) {
}

/**
 * Builds the partition path for a message as {@code <topic>/<date>}.
 * The date is resolved by {@link DlqDateUtils} from the configured partition key type
 * and partition timezone.
 *
 * @param message the DLQ message to place into a partition
 * @return the relative partition path for the message
 */
private Path createPartition(Message message) {
    DlqPartitionKeyType partitionKeyType = dlqConfig.getDlqBlobFilePartitionKey();
    // Both timestamps are logged so a partition choice can be traced back per message.
    firehoseInstrumentation.logDebug("DLQ partitioning message - topic: {}, partition: {}, offset: {}, produceTimestamp: {}, consumeTimestamp: {}",
            message.getTopic(), message.getPartition(), message.getOffset(), message.getTimestamp(), message.getConsumeTimestamp());
    String partitionDate = DlqDateUtils.getDateFromMessage(
            message,
            dlqConfig.getDlqBlobFilePartitionTimezone(),
            partitionKeyType);
    return Paths.get(message.getTopic(), partitionDate);
}

private String extractDateFromPath(Path path) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@
package com.gotocompany.firehose.sink.dlq.blobstorage;

import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType;

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * Static helpers that turn a message timestamp into the ISO local date
 * ({@code yyyy-MM-dd}) used for DLQ blob-storage partitions.
 */
public class DlqDateUtils {

    /**
     * Returns the date of the message's consume timestamp in the given zone.
     * Delegates to the three-argument overload with {@link DlqPartitionKeyType#CONSUME_TIMESTAMP}.
     *
     * @param message the message whose consume timestamp is read
     * @param zoneId  the zone the timestamp is converted to before taking the date
     * @return the date formatted with {@link DateTimeFormatter#ISO_LOCAL_DATE}
     */
    public static String getDateFromMessage(Message message, ZoneId zoneId) {
        return getDateFromMessage(message, zoneId, DlqPartitionKeyType.CONSUME_TIMESTAMP);
    }

    /**
     * Returns the date of the selected message timestamp in the given zone.
     * Timestamps are interpreted as epoch milliseconds.
     *
     * @param message          the message whose timestamps are read
     * @param zoneId           the zone the timestamp is converted to before taking the date
     * @param partitionKeyType which timestamp to use; anything other than
     *                         {@link DlqPartitionKeyType#PRODUCE_TIMESTAMP} selects the consume timestamp
     * @return the date formatted with {@link DateTimeFormatter#ISO_LOCAL_DATE}
     */
    public static String getDateFromMessage(Message message, ZoneId zoneId, DlqPartitionKeyType partitionKeyType) {
        // A non-positive produce timestamp is treated as unusable, so the consume timestamp is used instead.
        boolean useProduceTimestamp = partitionKeyType == DlqPartitionKeyType.PRODUCE_TIMESTAMP
                && message.getTimestamp() > 0;
        long timestamp = useProduceTimestamp
                ? message.getTimestamp()
                : message.getConsumeTimestamp();

        return Instant.ofEpochMilli(timestamp)
                .atZone(zoneId)
                .toLocalDate()
                .format(DateTimeFormatter.ISO_LOCAL_DATE);
    }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.gotocompany.firehose.config.converter;

import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
 * Unit tests for {@link DlqPartitionKeyTypeConverter}: case-insensitive parsing of valid
 * names and rejection of unknown names.
 */
public class DlqPartitionKeyTypeConverterTest {

    private DlqPartitionKeyTypeConverter partitionKeyConverter;

    @Before
    public void setUp() {
        partitionKeyConverter = new DlqPartitionKeyTypeConverter();
    }

    @Test
    public void shouldReturnProduceTimestampForLowercaseInput() {
        assertEquals(
                DlqPartitionKeyType.PRODUCE_TIMESTAMP,
                partitionKeyConverter.convert(null, "produce_timestamp"));
    }

    @Test
    public void shouldReturnProduceTimestampForUppercaseInput() {
        assertEquals(
                DlqPartitionKeyType.PRODUCE_TIMESTAMP,
                partitionKeyConverter.convert(null, "PRODUCE_TIMESTAMP"));
    }

    @Test
    public void shouldReturnConsumeTimestampForMixedCaseInput() {
        assertEquals(
                DlqPartitionKeyType.CONSUME_TIMESTAMP,
                partitionKeyConverter.convert(null, "CoNsUmE_TiMeStAmP"));
    }

    // An unknown name must surface as IllegalArgumentException.
    @Test(expected = IllegalArgumentException.class)
    public void shouldThrowOnInvalidInput() {
        partitionKeyConverter.convert(null, "invalid");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.gotocompany.firehose.metrics.Metrics;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException;
import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -43,6 +44,7 @@ public class BlobStorageDlqWriterTest {
// Default fixture: consume-timestamp partitioning in UTC; individual tests override the key type.
@Before
public void setUp() throws Exception {
    when(dlqConfig.getDlqBlobFilePartitionKey())
            .thenReturn(DlqPartitionKeyType.CONSUME_TIMESTAMP);
    when(dlqConfig.getDlqBlobFilePartitionTimezone())
            .thenReturn(ZoneId.of("UTC"));

    blobStorageDLQWriter = new BlobStorageDlqWriter(blobStorage, dlqConfig, firehoseInstrumentation);
}

Expand Down Expand Up @@ -443,4 +445,49 @@ public void shouldHandleMultipleMessagesInSamePartition() throws IOException, Bl
eq("DLQ blob storage write complete - total: {}, successful partitions: {}, failed partitions: {}, successful messages: {}, failed messages: {}"),
eq(3), eq(1), eq(0), eq(3), eq(0));
}

// With PRODUCE_TIMESTAMP configured, the blob path must carry the produce date (2020-01-02),
// not the consume date (2020-01-01).
@Test
public void shouldUseProduceTimestampForPartitioningWhenConfigured() throws IOException, BlobStorageException {
    long producedAt = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli();
    long consumedAt = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
    ErrorInfo errorInfo = new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR);
    Message dlqMessage = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null,
            producedAt, consumedAt, errorInfo);
    when(dlqConfig.getDlqBlobFilePartitionKey()).thenReturn(DlqPartitionKeyType.PRODUCE_TIMESTAMP);

    blobStorageDLQWriter.write(Collections.singletonList(dlqMessage));

    verify(blobStorage).store(contains("booking/2020-01-02"), any(byte[].class));
}

// A produce timestamp of 0 is not usable, so the consume date (2020-01-03) must be the partition
// even though PRODUCE_TIMESTAMP is configured; the per-message debug log still reports both values.
@Test
public void shouldFallbackToConsumeTimestampWhenProduceTimestampInvalid() throws IOException, BlobStorageException {
    long producedAt = 0L;
    long consumedAt = Instant.parse("2020-01-03T00:00:00Z").toEpochMilli();
    ErrorInfo errorInfo = new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR);
    Message dlqMessage = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null,
            producedAt, consumedAt, errorInfo);
    when(dlqConfig.getDlqBlobFilePartitionKey()).thenReturn(DlqPartitionKeyType.PRODUCE_TIMESTAMP);

    blobStorageDLQWriter.write(Collections.singletonList(dlqMessage));

    verify(blobStorage).store(contains("booking/2020-01-03"), any(byte[].class));
    verify(firehoseInstrumentation).logDebug(
            eq("DLQ partitioning message - topic: {}, partition: {}, offset: {}, produceTimestamp: {}, consumeTimestamp: {}"),
            eq("booking"), eq(1), eq(1L), eq(0L), eq(consumedAt));
}

// The writer must emit one debug line with the configured key type and timezone
// (the setUp defaults: CONSUME_TIMESTAMP and UTC).
@Test
public void shouldLogPartitionKeyTypeAndTimezone() throws IOException, BlobStorageException {
    long epochMillis = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
    ErrorInfo errorInfo = new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR);
    Message dlqMessage = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null,
            epochMillis, epochMillis, errorInfo);

    blobStorageDLQWriter.write(Collections.singletonList(dlqMessage));

    verify(firehoseInstrumentation).logDebug(
            eq("DLQ blob storage partition key type: {}, timezone: {}"),
            eq(DlqPartitionKeyType.CONSUME_TIMESTAMP), eq(ZoneId.of("UTC")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.gotocompany.depot.error.ErrorInfo;
import com.gotocompany.depot.error.ErrorType;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.sink.dlq.DlqPartitionKeyType;
import org.junit.Test;

import java.io.IOException;
Expand Down Expand Up @@ -67,5 +68,49 @@ public void shouldReturnDifferentDatesForDifferentTimestamps() {
assertEquals("2020-01-01", date1);
assertEquals("2020-01-02", date2);
}

// PRODUCE_TIMESTAMP selects the produce date (2020-01-02) over the consume date (2020-01-01).
@Test
public void shouldUseProduceTimestampWhenConfigured() {
    ZoneId utc = ZoneId.of("UTC");
    long producedAt = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli();
    long consumedAt = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
    ErrorInfo errorInfo = new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR);
    Message dlqMessage = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null,
            producedAt, consumedAt, errorInfo);

    String partitionDate = DlqDateUtils.getDateFromMessage(dlqMessage, utc, DlqPartitionKeyType.PRODUCE_TIMESTAMP);

    assertEquals("2020-01-02", partitionDate);
}

// CONSUME_TIMESTAMP selects the consume date (2020-01-01) even when a produce timestamp is present.
@Test
public void shouldUseConsumeTimestampWhenConfigured() {
    ZoneId utc = ZoneId.of("UTC");
    long producedAt = Instant.parse("2020-01-02T00:00:00Z").toEpochMilli();
    long consumedAt = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
    ErrorInfo errorInfo = new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR);
    Message dlqMessage = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null,
            producedAt, consumedAt, errorInfo);

    String partitionDate = DlqDateUtils.getDateFromMessage(dlqMessage, utc, DlqPartitionKeyType.CONSUME_TIMESTAMP);

    assertEquals("2020-01-01", partitionDate);
}

// A produce timestamp of 0 is not positive, so the consume date (2020-01-03) is returned instead.
@Test
public void shouldFallbackToConsumeTimestampWhenProduceTimestampInvalid() {
    ZoneId utc = ZoneId.of("UTC");
    long producedAt = 0L;
    long consumedAt = Instant.parse("2020-01-03T00:00:00Z").toEpochMilli();
    ErrorInfo errorInfo = new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR);
    Message dlqMessage = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null,
            producedAt, consumedAt, errorInfo);

    String partitionDate = DlqDateUtils.getDateFromMessage(dlqMessage, utc, DlqPartitionKeyType.PRODUCE_TIMESTAMP);

    assertEquals("2020-01-03", partitionDate);
}

// 2020-01-01T15:00Z is already 2020-01-02 in Asia/Tokyo, so the zone must be applied
// to the produce timestamp before the date is taken.
@Test
public void shouldApplyTimezoneToProduceTimestamp() {
    ZoneId tokyo = ZoneId.of("Asia/Tokyo");
    long producedAt = Instant.parse("2020-01-01T15:00:00Z").toEpochMilli();
    long consumedAt = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
    ErrorInfo errorInfo = new ErrorInfo(new IOException("test"), ErrorType.DESERIALIZATION_ERROR);
    Message dlqMessage = new Message("123".getBytes(), "abc".getBytes(), "booking", 1, 1, null,
            producedAt, consumedAt, errorInfo);

    String partitionDate = DlqDateUtils.getDateFromMessage(dlqMessage, tokyo, DlqPartitionKeyType.PRODUCE_TIMESTAMP);

    assertEquals("2020-01-02", partitionDate);
}
}