Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up JDK 1.8
uses: actions/setup-java@v1
- name: Set up JDK 21
uses: actions/setup-java@v4
with:
java-version: 1.8
distribution: temurin
java-version: 21
- name: Grant execute permission for gradlew
run: chmod +x gradlew
- name: Build
Expand Down
12 changes: 7 additions & 5 deletions .github/workflows/package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up JDK 1.8
uses: actions/setup-java@v1
- name: Set up JDK 21
uses: actions/setup-java@v4
with:
java-version: 1.8
distribution: temurin
java-version: 21
- name: Grant execute permission for gradlew
run: chmod +x gradlew
- name: Build
Expand All @@ -27,9 +28,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-java@v1
- uses: actions/setup-java@v4
with:
java-version: 1.8
distribution: temurin
java-version: 21
- name: Get release tag
id: get_version
uses: battila7/get-version-action@v2
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ generated
.settings
bin
src/test/resources
.DS_Store
.DS_Store
LocalDockerfile
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM adoptopenjdk:8-jdk-openj9 AS GRADLE_BUILD
FROM eclipse-temurin:21-jdk AS GRADLE_BUILD
RUN mkdir -p ./build/libs/
RUN curl -L http://search.maven.org/remotecontent?filepath=org/jolokia/jolokia-jvm/1.6.2/jolokia-jvm-1.6.2-agent.jar -o ./jolokia-jvm-agent.jar
COPY ./ ./
Expand All @@ -9,7 +9,7 @@ ENV GITHUB_ACTOR=$GITHUB_ACTOR
ENV GITHUB_TOKEN=$GITHUB_TOKEN
RUN ./gradlew build

FROM eclipse-temurin:8u412-b08-jre
FROM eclipse-temurin:21-jre
RUN apt-get update && apt-get upgrade -y curl
COPY --from=GRADLE_BUILD ./build/libs/ /opt/firehose/bin
COPY --from=GRADLE_BUILD ./jolokia-jvm-agent.jar /opt/firehose
Expand Down
37 changes: 21 additions & 16 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ buildscript {
maven { url 'https://plugins.gradle.org/m2/' }
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.17'
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.4'
classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4.33.1"
classpath "org.ajoberstar:gradle-git:1.6.0"
}
Expand All @@ -14,8 +14,8 @@ plugins {
id 'idea'
id 'checkstyle'
id 'jacoco'
id "com.google.protobuf" version "0.8.17"
id 'nebula.ospackage' version '8.6.3'
id "com.google.protobuf" version "0.9.4"
id 'com.netflix.nebula.ospackage' version '11.11.0'
id 'io.franzbecker.gradle-lombok' version '1.14'
id 'maven-publish'
}
Expand All @@ -28,7 +28,7 @@ configurations {
}

lombok {
version = '1.18.4'
version = '1.18.30'
sha256 = ""
}

Expand All @@ -38,8 +38,8 @@ version '0.12.28'

def projName = "firehose"

sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
sourceCompatibility = JavaVersion.VERSION_21
targetCompatibility = JavaVersion.VERSION_21

repositories {
mavenLocal()
Expand Down Expand Up @@ -67,8 +67,8 @@ private Properties loadEnv() {
def mainClassName = "com.gotocompany.firehose.launch.Main"

dependencies {
implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.1.0'
implementation group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.1.0'
implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.21.12'
implementation group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.21.12'
implementation group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.13.0'
implementation group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.2'
implementation group: 'org.aeonbits.owner', name: 'owner', version: '1.0.9'
Expand Down Expand Up @@ -118,8 +118,8 @@ dependencies {

testImplementation group: 'junit', name: 'junit', version: '4.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.mockito:mockito-core:4.5.1'
testImplementation 'org.mockito:mockito-inline:4.5.1'
testImplementation 'org.mockito:mockito-core:5.14.2'
testImplementation 'org.mockito:mockito-inline:5.2.0'
testImplementation "com.github.tomakehurst:wiremock:2.3.1"
testImplementation group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0'
testImplementation group: 'org.mock-server', name: 'mockserver-netty', version: '3.10.5'
Expand All @@ -130,11 +130,11 @@ dependencies {
protobuf {
generatedFilesBaseDir = "$projectDir/src/generated"
protoc {
artifact = "com.google.protobuf:protoc:3.1.0"
artifact = "com.google.protobuf:protoc:3.21.12"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:1.0.3"
artifact = "io.grpc:protoc-gen-grpc-java:1.53.0"
}
}
generateProtoTasks {
Expand Down Expand Up @@ -166,6 +166,11 @@ test {
}

}

processTestResources {
dependsOn generateProto, generateTestProto
}

clean {
delete "$projectDir/src/test/resources/__files"
}
Expand Down Expand Up @@ -258,10 +263,10 @@ checkstyleTest {

jacocoTestReport {
reports {
xml.enabled true
csv.enabled false
html.enabled = true
html.destination file("${reportsDir}/jacoco/")
xml.required = true
csv.required = false
html.required = true
html.outputLocation = file("${reportsDir}/jacoco/")
}
afterEvaluate {
getClassDirectories().setFrom(classDirectories.files.collect {
Expand Down
6 changes: 3 additions & 3 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Wed Aug 25 23:42:32 IST 2021
distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-all.zip
#Tue Feb 17 02:26:20 WIB 2026
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
27 changes: 27 additions & 0 deletions src/main/java/com/gotocompany/firehose/launch/BootupTimer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.gotocompany.firehose.launch;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public final class BootupTimer {
private static final AtomicBoolean FIRST_CONSUMED = new AtomicBoolean(false);
private static volatile long startNanos;

private BootupTimer() {
}

public static void markProcessStart() {
startNanos = System.nanoTime();
}

public static void markFirstConsumed() {
if (startNanos == 0L) {
return;
}
if (FIRST_CONSUMED.compareAndSet(false, true)) {
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
System.out.println("Bootup time to first consumed batch: " + elapsedMs + " ms");
}
}
}

3 changes: 3 additions & 0 deletions src/main/java/com/gotocompany/firehose/launch/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class Main {
* @throws InterruptedException the interrupted exception
*/
public static void main(String[] args) throws InterruptedException {
BootupTimer.markProcessStart();
KafkaConsumerConfig kafkaConsumerConfig = ConfigFactory.create(KafkaConsumerConfig.class, System.getenv());
multiThreadedConsumers(kafkaConsumerConfig);
}
Expand Down Expand Up @@ -51,6 +52,8 @@ private static void multiThreadedConsumers(KafkaConsumerConfig kafkaConsumerConf
firehoseInstrumentation.logWarn("Consumer Thread interrupted, leaving the loop!");
break;
}
// remove this after we have a better way to measure bootup time for async consumer
BootupTimer.markFirstConsumed();
firehoseConsumer.process();
}
} catch (Exception | Error e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.gotocompany.firehose.serializer;


import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.exception.DeserializerException;
import com.google.gson.ExclusionStrategy;
Expand All @@ -17,13 +16,17 @@
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.ArrayList;
import java.util.Date;

/**
* EsbMessageToJson Serialize protobuff message content into JSON.
Expand All @@ -35,6 +38,9 @@ public class MessageToJson implements MessageSerializer {
private boolean wrapInsideArray;
private boolean enableSimpleDateFormat;

private DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("MMM dd, yyyy hh:mm:ss a", Locale.US);
private DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss", Locale.US);

public MessageToJson(Parser protoParser, boolean preserveFieldNames, boolean enableSimpleDateFormat) {
this(protoParser, preserveFieldNames, false, enableSimpleDateFormat);
}
Expand Down Expand Up @@ -112,13 +118,14 @@ private JSONObject convertProtoBuffTimeStampToDateTime(JSONObject jsonObject, St
JSONObject parentObject = (JSONObject) new JSONParser().parse(jsonObject.get(parentField).toString());
String timestampObject = parentObject.get(timeStampField).toString();

Date date;
OffsetDateTime dateTime;
try {
date = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss").parse(timestampObject);
} catch (java.text.ParseException e) {
throw new RuntimeException(String.format("Not able to parse date, %s", timestampObject));
dateTime = OffsetDateTime.parse(timestampObject);
} catch (DateTimeParseException ignored) {
LocalDateTime localDateTime = LocalDateTime.parse(timestampObject, inputFormatter);
dateTime = localDateTime.atOffset(ZoneOffset.UTC);
}
parentObject.put(timeStampField, date);
parentObject.put(timeStampField, outputFormatter.format(dateTime));
jsonObject.put(parentField, gson.toJson(parentObject));

return jsonObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public void setUp() {
key = TestKey.newBuilder().setOrderNumber("123").setOrderUrl("abc").build();
firehoseKafkaConsumer = new FirehoseKafkaConsumer(kafkaConsumer, consumerConfig, firehoseInstrumentation);
when(consumerConfig.getSourceKafkaPollTimeoutMs()).thenReturn(500L);
when(consumerConfig.getSourceKafkaConsumerGroupId()).thenReturn(consumerGroupId);
when(kafkaConsumer.poll(Duration.ofMillis(500L))).thenReturn(consumerRecords);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public void testConstructorSuccess() {
timestampFilter = new TimestampFilter(stencilClient, filterConfig, instrumentation);
assertNotNull(timestampFilter);

verify(instrumentation, times(7)).logInfo(anyString(), any());
// 7 invocations were failing because 6 were called with one parameter and rest 6 were called with two parameters.
verify(instrumentation, times(1)).logInfo(anyString());
verify(instrumentation, times(6)).logInfo(anyString(), any());
}

@Test(expected = IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class MessageDeSerializerTest {

@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import static org.junit.Assert.assertEquals;

@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class WriterOrchestratorTest {

@Rule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static org.junit.Assert.assertEquals;

@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class TimePartitionedPathUtilsTest {

private final String zone = "UTC";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class BlobStorageCheckerTest {

private final BlockingQueue<LocalFileMetadata> toBeFlushedToRemotePaths = new LinkedBlockingQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class BlobStorageFactoryTest {

private Map<String, String> validOssConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class CloudObjectStorageTest {

@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class TencentSecurityTokenServiceTest {
private static final String SECRET_ID = "test-secret-id";
private static final String SECRET_KEY = "test-secret-key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class TencentObjectOperationsTest {

@Mock
Expand Down
Loading