Skip to content

Commit afda774

Browse files
committed
chore(demo-kafka): fix org.apache.kafka.common.errors.TimeoutException
1 parent f0574e7 commit afda774

File tree

6 files changed

+203
-29
lines changed

6 files changed

+203
-29
lines changed

demo-kafka/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838
<groupId>org.slf4j</groupId>
3939
<artifactId>slf4j-api</artifactId>
4040
</dependency>
41+
<dependency>
42+
<groupId>org.projectlombok</groupId>
43+
<artifactId>lombok</artifactId>
44+
<scope>provided</scope>
45+
</dependency>
4146

4247
<dependency>
4348
<groupId>org.springframework.boot</groupId>

demo-kafka/src/main/java/com/helltractor/demo/messaging/MessagingFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,19 @@ public class MessagingFactory {
3434

3535
private final Logger logger = LoggerFactory.getLogger(getClass());
3636

37-
private final MessageTypes messageTypes;
37+
private final KafkaAdmin kafkaAdmin;
3838

3939
private final KafkaTemplate<String, String> kafkaTemplate;
4040

4141
private final ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory;
4242

43-
private final KafkaAdmin kafkaAdmin;
43+
private final MessageTypes messageTypes;
4444

4545
public MessagingFactory(MessageTypes messageTypes, KafkaTemplate<String, String> kafkaTemplate, ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory, KafkaAdmin kafkaAdmin) {
46-
this.messageTypes = messageTypes;
46+
this.kafkaAdmin = kafkaAdmin;
4747
this.kafkaTemplate = kafkaTemplate;
4848
this.listenerContainerFactory = listenerContainerFactory;
49-
this.kafkaAdmin = kafkaAdmin;
49+
this.messageTypes = messageTypes;
5050
}
5151

5252
@PostConstruct

demo-kafka/src/test/java/com/helltractor/demo/ConfluentKafkaContainerCluster.java renamed to demo-kafka/src/test/java/com/helltractor/demo/container/ConfluentKafkaContainerCluster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.helltractor.demo;
1+
package com.helltractor.demo.container;
22

33
import org.apache.kafka.common.Uuid;
44
import org.awaitility.Awaitility;
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package com.helltractor.demo.container;
2+
3+
import lombok.SneakyThrows;
4+
import org.awaitility.Awaitility;
5+
import org.testcontainers.containers.Container;
6+
import org.testcontainers.containers.GenericContainer;
7+
import org.testcontainers.containers.KafkaContainer;
8+
import org.testcontainers.containers.Network;
9+
import org.testcontainers.lifecycle.Startable;
10+
import org.testcontainers.utility.DockerImageName;
11+
12+
import java.time.Duration;
13+
import java.util.Collection;
14+
import java.util.stream.Collectors;
15+
import java.util.stream.IntStream;
16+
import java.util.stream.Stream;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
/**
21+
* Provides an easy way to launch a Kafka cluster with multiple brokers.
22+
*/
23+
public class KafkaContainerCluster implements Startable {
24+
25+
private final int brokersNum;
26+
27+
private final Network network;
28+
29+
private final GenericContainer<?> zookeeper;
30+
31+
private final Collection<KafkaContainer> brokers;
32+
33+
public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
34+
if (brokersNum < 0) {
35+
throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
36+
}
37+
if (internalTopicsRf < 0 || internalTopicsRf > brokersNum) {
38+
throw new IllegalArgumentException(
39+
"internalTopicsRf '" + internalTopicsRf + "' must be less than brokersNum and greater than 0"
40+
);
41+
}
42+
43+
this.brokersNum = brokersNum;
44+
this.network = Network.newNetwork();
45+
46+
this.zookeeper =
47+
new GenericContainer<>(DockerImageName.parse("confluentinc/cp-zookeeper").withTag(confluentPlatformVersion))
48+
.withNetwork(network)
49+
.withNetworkAliases("zookeeper")
50+
.withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(KafkaContainer.ZOOKEEPER_PORT));
51+
52+
this.brokers =
53+
IntStream
54+
.range(0, this.brokersNum)
55+
.mapToObj(brokerNum -> {
56+
return new KafkaContainer(
57+
DockerImageName.parse("confluentinc/cp-kafka").withTag(confluentPlatformVersion)
58+
)
59+
.withNetwork(this.network)
60+
.withNetworkAliases("broker-" + brokerNum)
61+
.dependsOn(this.zookeeper)
62+
.withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT)
63+
.withEnv("KAFKA_BROKER_ID", brokerNum + "")
64+
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
65+
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
66+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
67+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "")
68+
.withStartupTimeout(Duration.ofMinutes(1));
69+
})
70+
.collect(Collectors.toList());
71+
}
72+
73+
public Collection<KafkaContainer> getBrokers() {
74+
return this.brokers;
75+
}
76+
77+
public String getBootstrapServers() {
78+
return brokers.stream().map(KafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
79+
}
80+
81+
private Stream<GenericContainer<?>> allContainers() {
82+
return Stream.concat(this.brokers.stream(), Stream.of(this.zookeeper));
83+
}
84+
85+
@Override
86+
@SneakyThrows
87+
public void start() {
88+
// sequential start to avoid resource contention on CI systems with weaker hardware
89+
brokers.forEach(GenericContainer::start);
90+
91+
Awaitility
92+
.await()
93+
.atMost(Duration.ofSeconds(30))
94+
.untilAsserted(() -> {
95+
Container.ExecResult result =
96+
this.zookeeper.execInContainer(
97+
"sh",
98+
"-c",
99+
"zookeeper-shell zookeeper:" +
100+
KafkaContainer.ZOOKEEPER_PORT +
101+
" ls /brokers/ids | tail -n 1"
102+
);
103+
String brokers = result.getStdout();
104+
105+
assertThat(brokers.split(",")).hasSize(this.brokersNum);
106+
});
107+
}
108+
109+
@Override
110+
public void stop() {
111+
allContainers().parallel().forEach(GenericContainer::stop);
112+
}
113+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.helltractor.demo.container;
2+
3+
import com.github.dockerjava.api.command.InspectContainerResponse;
4+
import org.testcontainers.containers.GenericContainer;
5+
import org.testcontainers.containers.wait.strategy.Wait;
6+
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
7+
import org.testcontainers.images.builder.Transferable;
8+
import org.testcontainers.utility.DockerImageName;
9+
10+
public class KafkaLocalContainer extends GenericContainer<KafkaLocalContainer> {
11+
12+
private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
13+
14+
public KafkaLocalContainer(String image) {
15+
super(DockerImageName.parse(image));
16+
withExposedPorts(9092, 8082);
17+
var waitStrategy = new WaitAllStrategy().withStrategy(Wait.forLogMessage(".*started.*\\n", 1))
18+
.withStrategy(Wait.forHttp("/").forPort(8082).forStatusCode(200));
19+
waitingFor(waitStrategy);
20+
withCreateContainerCmdModifier(cmd -> {
21+
cmd.withEntrypoint("sh");
22+
});
23+
withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
24+
}
25+
26+
@Override
27+
protected void containerIsStarting(InspectContainerResponse containerInfo) {
28+
var defaultListeners = "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092";
29+
var defaultAdvertisedListeners = "PLAINTEXT://localhost:29092,PLAINTEXT_HOST://%s:%d".formatted(getHost(),
30+
getMappedPort(9092));
31+
var defaultSecurityProtocolMap = "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT";
32+
33+
var script = """
34+
#!/bin/bash
35+
export KAFKA_LISTENERS="%s"
36+
export KAFKA_ADVERTISED_LISTENERS="%s"
37+
export KAFKA_LISTENER_SECURITY_PROTOCOL_MAP="%s"
38+
39+
/etc/confluent/docker/run
40+
""".formatted(defaultListeners, defaultAdvertisedListeners, defaultSecurityProtocolMap);
41+
copyFileToContainer(Transferable.of(script, 0777), STARTER_SCRIPT);
42+
}
43+
44+
public String getBootstrapServer() {
45+
return "%s:%d".formatted(getHost(), getMappedPort(9092));
46+
}
47+
48+
public String getRestProxyUrl() {
49+
return "http://%s:%d".formatted(getHost(), getMappedPort(8082));
50+
}
51+
}

demo-kafka/src/test/java/com/helltractor/demo/messaging/MessagingTest.java renamed to demo-kafka/src/test/java/com/helltractor/demo/messaging/MessagingFactoryTest.java

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package com.helltractor.demo.messaging;
22

3-
import com.helltractor.demo.ConfluentKafkaContainerCluster;
43
import com.helltractor.demo.KafkaApplication;
4+
import com.helltractor.demo.container.ConfluentKafkaContainerCluster;
55
import com.helltractor.demo.message.TestMessage;
66
import com.helltractor.demo.util.IpUtil;
7+
import org.awaitility.Awaitility;
78
import org.junit.jupiter.api.AfterEach;
89
import org.junit.jupiter.api.BeforeEach;
910
import org.junit.jupiter.api.Test;
@@ -12,15 +13,34 @@
1213
import org.springframework.beans.factory.annotation.Autowired;
1314
import org.springframework.boot.test.context.SpringBootTest;
1415

16+
import java.time.Duration;
1517
import java.util.List;
1618
import java.util.concurrent.atomic.AtomicInteger;
1719

1820
import static org.junit.jupiter.api.Assertions.assertEquals;
1921

2022
@SpringBootTest(classes = KafkaApplication.class)
21-
public class MessagingTest {
23+
public class MessagingFactoryTest {
2224

23-
final static Logger logger = LoggerFactory.getLogger(MessagingTest.class);
25+
final static Logger logger = LoggerFactory.getLogger(MessagingFactoryTest.class);
26+
27+
static class TestConsumer {
28+
29+
final AtomicInteger messageCount = new AtomicInteger();
30+
31+
final long startTime = System.currentTimeMillis();
32+
33+
void processMessages(List<TestMessage> messages) {
34+
messageCount.addAndGet(messages.size());
35+
long currentTime = System.currentTimeMillis();
36+
logger.info("Received {} messages, Total: {}, Elapsed time: {} ms",
37+
messages.size(), messageCount.get(), currentTime - startTime);
38+
}
39+
40+
int getTotalMessages() {
41+
return messageCount.get();
42+
}
43+
}
2444

2545
@Autowired
2646
MessagingFactory messagingFactory;
@@ -45,27 +65,9 @@ void destroy() {
4565
cluster.stop();
4666
}
4767

48-
static class TestConsumer {
49-
50-
final AtomicInteger messageCount = new AtomicInteger();
51-
52-
final long startTime = System.currentTimeMillis();
53-
54-
void processMessages(List<TestMessage> messages) {
55-
messageCount.addAndGet(messages.size());
56-
long currentTime = System.currentTimeMillis();
57-
logger.info("Received {} messages, Total: {}, Elapsed time: {} ms",
58-
messages.size(), messageCount.get(), currentTime - startTime);
59-
}
60-
61-
int getTotalMessages() {
62-
return messageCount.get();
63-
}
64-
}
65-
6668
@Test
6769
void test() throws InterruptedException {
68-
for (int i = 0; i < 1000; i++) {
70+
for (int i = 0; i < 100; i++) {
6971
TestMessage testMessage = new TestMessage();
7072
testMessage.message = "Test-" + i;
7173
processorOne.sendMessages(testMessage);
@@ -79,7 +81,10 @@ void test() throws InterruptedException {
7981
messagingFactory.createBatchMessageListener(topic, groupId, testConsumer::processMessages);
8082
}
8183

82-
Thread.sleep(10000);
83-
assertEquals(3000, testConsumer.getTotalMessages());
84+
Awaitility.await()
85+
.atMost(Duration.ofSeconds(10))
86+
.until(() -> testConsumer.getTotalMessages() == 300);
87+
88+
assertEquals(300, testConsumer.getTotalMessages());
8489
}
8590
}

0 commit comments

Comments
 (0)