-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConsumer.java
More file actions
103 lines (83 loc) · 4.55 KB
/
Consumer.java
File metadata and controls
103 lines (83 loc) · 4.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package com.oci.stream;
import com.google.common.util.concurrent.Uninterruptibles;
import com.oracle.bmc.ConfigFileReader;
import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.streaming.StreamClient;
import com.oracle.bmc.streaming.model.CreateGroupCursorDetails;
import com.oracle.bmc.streaming.model.Message;
import com.oracle.bmc.streaming.requests.CreateGroupCursorRequest;
import com.oracle.bmc.streaming.requests.GetMessagesRequest;
import com.oracle.bmc.streaming.responses.CreateGroupCursorResponse;
import com.oracle.bmc.streaming.responses.GetMessagesResponse;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
public class Consumer {
public static void main(String[] args) throws Exception {
final String configurationFilePath = "/home/fernando_h/.oci/config";
final String profile = "DEFAULT";
final String ociStreamOcid = "ocid1.stream.oc1.eu-frankfurt-1.amaaaaaaue...";
final String ociMessageEndpoint = "https://cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com";
final ConfigFileReader.ConfigFile configFile = ConfigFileReader.parseDefault();
final AuthenticationDetailsProvider provider =
new ConfigFileAuthenticationDetailsProvider(configFile);
// Streams are assigned a specific endpoint url based on where they are provisioned.
// Create a stream client using the provided message endpoint.
StreamClient streamClient = StreamClient.builder().endpoint(ociMessageEndpoint).build(provider);
// A cursor can be created as part of a consumer group.
// Committed offsets are managed for the group, and partitions
// are dynamically balanced amongst consumers in the group.
System.out.println("Starting a simple message loop with a group cursor");
String groupCursor =
getCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1");
simpleMessageLoop(streamClient, ociStreamOcid, groupCursor);
}
private static void simpleMessageLoop(
StreamClient streamClient, String streamId, String initialCursor) {
String cursor = initialCursor;
for (int i = 0; i < 10; i++) {
GetMessagesRequest getRequest =
GetMessagesRequest.builder()
.streamId(streamId)
.cursor(cursor)
.limit(25)
.build();
GetMessagesResponse getResponse = streamClient.getMessages(getRequest);
// process the messages
System.out.println(String.format("Read %s messages.", getResponse.getItems().size()));
for (Message message : ((GetMessagesResponse) getResponse).getItems()) {
System.out.println(
String.format(
"%s: %s",
message.getKey() == null ? "Null" :new String(message.getKey(), UTF_8),
new String(message.getValue(), UTF_8)));
}
// getMessages is a throttled method; clients should retrieve sufficiently large message
// batches, as to avoid too many http requests.
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
// use the next-cursor for iteration
cursor = getResponse.getOpcNextCursor();
}
}
private static String getCursorByGroup(
StreamClient streamClient, String streamId, String groupName, String instanceName) {
System.out.println(
String.format(
"Creating a cursor for group %s, instance %s.", groupName, instanceName));
CreateGroupCursorDetails cursorDetails =
CreateGroupCursorDetails.builder()
.groupName(groupName)
.instanceName(instanceName)
.type(CreateGroupCursorDetails.Type.TrimHorizon)
.commitOnGet(true)
.build();
CreateGroupCursorRequest createCursorRequest =
CreateGroupCursorRequest.builder()
.streamId(streamId)
.createGroupCursorDetails(cursorDetails)
.build();
CreateGroupCursorResponse groupCursorResponse =
streamClient.createGroupCursor(createCursorRequest);
return groupCursorResponse.getCursor().getValue();
}
}