Skip to content

Commit 4d32a4e

Browse files
committed
chore(demo-kafka): fix org.apache.kafka.common.errors.TimeoutException
1 parent 8234e67 commit 4d32a4e

File tree

3 files changed

+22
-8
lines changed

3 files changed

+22
-8
lines changed

demo-kafka/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@
6565
<version>${testcontainers.version}</version>
6666
<scope>test</scope>
6767
</dependency>
68+
<dependency>
69+
<groupId>org.testcontainers</groupId>
70+
<artifactId>junit-jupiter</artifactId>
71+
<version>${testcontainers.version}</version>
72+
<scope>test</scope>
73+
</dependency>
6874
</dependencies>
6975

7076
</project>

demo-kafka/src/test/java/com/helltractor/demo/container/ConfluentKafkaContainerCluster.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.helltractor.demo.container;
22

33
import org.apache.kafka.common.Uuid;
4+
import org.apache.kafka.clients.NetworkClient;
45
import org.awaitility.Awaitility;
56
import org.testcontainers.containers.Container;
67
import org.testcontainers.containers.GenericContainer;

demo-kafka/src/test/java/com/helltractor/demo/messaging/MessagingFactoryTest.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.slf4j.LoggerFactory;
1313
import org.springframework.beans.factory.annotation.Autowired;
1414
import org.springframework.boot.test.context.SpringBootTest;
15+
import org.testcontainers.junit.jupiter.Container;
16+
import org.testcontainers.junit.jupiter.Testcontainers;
1517

1618
import java.time.Duration;
1719
import java.util.List;
@@ -20,10 +22,14 @@
2022
import static org.junit.jupiter.api.Assertions.assertEquals;
2123

2224
@SpringBootTest(classes = KafkaApplication.class)
23-
public class MessagingFactoryTest {
25+
@Testcontainers
26+
public final class MessagingFactoryTest {
2427

2528
final static Logger logger = LoggerFactory.getLogger(MessagingFactoryTest.class);
2629

30+
@Container
31+
static ConfluentKafkaContainerCluster kafkaCluster = new ConfluentKafkaContainerCluster("7.4.0", 1, 1);
32+
2733
static class TestConsumer {
2834

2935
final AtomicInteger messageCount = new AtomicInteger();
@@ -45,24 +51,25 @@ int getTotalMessages() {
4551
@Autowired
4652
MessagingFactory messagingFactory;
4753

48-
ConfluentKafkaContainerCluster cluster;
49-
5054
MessageProducer<TestMessage> processorOne;
5155
MessageProducer<TestMessage> processorTwo;
5256
MessageProducer<TestMessage> processorThree;
5357

5458
@BeforeEach
5559
void init() {
56-
cluster = new ConfluentKafkaContainerCluster("7.4.0", 1, 1);
57-
cluster.start();
60+
kafkaCluster.start();
61+
Awaitility.await()
62+
.atMost(Duration.ofMinutes(1))
63+
.pollInterval(Duration.ofSeconds(5))
64+
.until(() -> kafkaCluster.getBrokers().stream().allMatch(b -> b.isRunning()));
5865
processorOne = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_ONE, TestMessage.class);
5966
processorTwo = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_TWO, TestMessage.class);
6067
processorThree = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_THREE, TestMessage.class);
6168
}
6269

6370
@AfterEach
6471
void destroy() {
65-
cluster.stop();
72+
kafkaCluster.stop();
6673
}
6774

6875
@Test
@@ -82,8 +89,8 @@ void test() throws InterruptedException {
8289
}
8390

8491
Awaitility.await()
85-
.atMost(Duration.ofSeconds(10))
86-
.until(() -> testConsumer.getTotalMessages() == 300);
92+
.atMost(Duration.ofSeconds(20))
93+
.until(() -> testConsumer.getTotalMessages() >= 300);
8794

8895
assertEquals(300, testConsumer.getTotalMessages());
8996
}

0 commit comments

Comments
 (0)