Skip to content

Commit ffb39df

Browse files
committed
DsfClient: Improved async search impl, refactored operations with retry
New DelayStrategy to configure first and later delays. Constant 200ms and exponential (100ms, 200ms, 400ms, 800ms, 800ms, ...) backoff implementations.
1 parent e3bf70c commit ffb39df

File tree

12 files changed

+296
-106
lines changed

12 files changed

+296
-106
lines changed

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/client/dsf/AbstractDsfClientJerseyWithRetry.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,20 @@ public abstract class AbstractDsfClientJerseyWithRetry
3434

3535
protected final DsfClientJersey delegate;
3636
private final int nTimes;
37-
private final Duration delay;
37+
private final DelayStrategy delayStrategy;
3838

39-
protected AbstractDsfClientJerseyWithRetry(DsfClientJersey delegate, int nTimes, Duration delay)
39+
protected AbstractDsfClientJerseyWithRetry(DsfClientJersey delegate, int nTimes, DelayStrategy delayStrategy)
4040
{
4141
this.delegate = delegate;
4242
this.nTimes = nTimes;
43-
this.delay = delay;
43+
this.delayStrategy = delayStrategy;
4444
}
4545

4646
protected final <R> R retry(Supplier<R> supplier)
4747
{
4848
RuntimeException caughtException = null;
49+
Duration delay = delayStrategy.getFirstDelay();
50+
4951
for (int tryNumber = 0; tryNumber <= nTimes || nTimes == RetryClient.RETRY_FOREVER; tryNumber++)
5052
{
5153
try
@@ -63,8 +65,11 @@ else if (nTimes != RetryClient.RETRY_FOREVER)
6365
{
6466
if (tryNumber < nTimes || nTimes == RetryClient.RETRY_FOREVER)
6567
{
66-
logger.warn("Caught {} - {}; trying again in {}s{}", e.getClass(), e.getMessage(),
67-
delay.toSeconds(),
68+
if (tryNumber > 0)
69+
delay = delayStrategy.getNextDelay(delay);
70+
71+
logger.warn("Caught {} - {}; trying again in {}{}", e.getClass(), e.getMessage(),
72+
delay.toString(),
6873
nTimes == RetryClient.RETRY_FOREVER ? " (retry " + (tryNumber + 1) + ")" : "");
6974

7075
try

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/client/dsf/BasicDsfClientWithRetryImpl.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616
package dev.dsf.bpe.v2.client.dsf;
1717

1818
import java.io.InputStream;
19-
import java.time.Duration;
2019
import java.util.List;
2120
import java.util.Map;
22-
import java.util.concurrent.Future;
21+
import java.util.concurrent.CompletableFuture;
2322

2423
import org.hl7.fhir.r4.model.Binary;
2524
import org.hl7.fhir.r4.model.Bundle;
@@ -32,9 +31,9 @@
3231

3332
class BasicDsfClientWithRetryImpl extends AbstractDsfClientJerseyWithRetry implements BasicDsfClient
3433
{
35-
BasicDsfClientWithRetryImpl(DsfClientJersey delegate, int nTimes, Duration delay)
34+
BasicDsfClientWithRetryImpl(DsfClientJersey delegate, int nTimes, DelayStrategy delayStrategy)
3635
{
37-
super(delegate, nTimes, delay);
36+
super(delegate, nTimes, delayStrategy);
3837
}
3938

4039
@Override
@@ -92,29 +91,29 @@ public Bundle search(Class<? extends Resource> resourceType, Map<String, List<St
9291
}
9392

9493
@Override
95-
public Future<Bundle> searchAsync(Duration initialPollingInterval, Class<? extends Resource> resourceType,
94+
public CompletableFuture<Bundle> searchAsync(DelayStrategy delayStrategy, Class<? extends Resource> resourceType,
9695
Map<String, List<String>> parameters)
9796
{
98-
return retry(() -> delegate.searchAsync(initialPollingInterval, resourceType, parameters));
97+
return retry(() -> delegate.searchAsync(delayStrategy, resourceType, parameters));
9998
}
10099

101100
@Override
102-
public Future<Bundle> searchAsync(Duration initialPollingInterval, String url)
101+
public CompletableFuture<Bundle> searchAsync(DelayStrategy delayStrategy, String url)
103102
{
104-
return retry(() -> delegate.searchAsync(initialPollingInterval, url));
103+
return retry(() -> delegate.searchAsync(delayStrategy, url));
105104
}
106105

107106
@Override
108-
public Future<Bundle> searchAsyncWithStrictHandling(Duration initialPollingInterval,
107+
public CompletableFuture<Bundle> searchAsyncWithStrictHandling(DelayStrategy delayStrategy,
109108
Class<? extends Resource> resourceType, Map<String, List<String>> parameters)
110109
{
111-
return retry(() -> delegate.searchAsyncWithStrictHandling(initialPollingInterval, resourceType, parameters));
110+
return retry(() -> delegate.searchAsyncWithStrictHandling(delayStrategy, resourceType, parameters));
112111
}
113112

114113
@Override
115-
public Future<Bundle> searchAsyncWithStrictHandling(Duration initialPollingInterval, String url)
114+
public CompletableFuture<Bundle> searchAsyncWithStrictHandling(DelayStrategy delayStrategy, String url)
116115
{
117-
return retry(() -> delegate.searchAsyncWithStrictHandling(initialPollingInterval, url));
116+
return retry(() -> delegate.searchAsyncWithStrictHandling(delayStrategy, url));
118117
}
119118

120119
@Override

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/client/dsf/DsfClientJersey.java

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.TimeZone;
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.Executors;
32-
import java.util.concurrent.Future;
3332
import java.util.concurrent.ScheduledExecutorService;
3433
import java.util.concurrent.TimeUnit;
3534
import java.util.function.Supplier;
@@ -968,7 +967,7 @@ public Bundle searchWithStrictHandling(Class<? extends Resource> resourceType, M
968967
}
969968

970969
@Override
971-
public Future<Bundle> searchAsync(Duration initialPollingInterval, Class<? extends Resource> resourceType,
970+
public CompletableFuture<Bundle> searchAsync(DelayStrategy delayStrategy, Class<? extends Resource> resourceType,
972971
Map<String, List<String>> parameters)
973972
{
974973
Objects.requireNonNull(resourceType, "resourceType");
@@ -980,19 +979,19 @@ public Future<Bundle> searchAsync(Duration initialPollingInterval, Class<? exten
980979
target = target.queryParam(entry.getKey(), entry.getValue().toArray());
981980
}
982981

983-
return doSearchAsync(initialPollingInterval, target, false);
982+
return doSearchAsync(delayStrategy, target, false);
984983
}
985984

986985
@Override
987-
public Future<Bundle> searchAsync(Duration initialPollingInterval, String url)
986+
public CompletableFuture<Bundle> searchAsync(DelayStrategy delayStrategy, String url)
988987
{
989988
checkUri(url);
990989

991-
return doSearchAsync(initialPollingInterval, client.target(url), false);
990+
return doSearchAsync(delayStrategy, client.target(url), false);
992991
}
993992

994993
@Override
995-
public Future<Bundle> searchAsyncWithStrictHandling(Duration initialPollingInterval,
994+
public CompletableFuture<Bundle> searchAsyncWithStrictHandling(DelayStrategy delayStrategy,
996995
Class<? extends Resource> resourceType, Map<String, List<String>> parameters)
997996
{
998997
Objects.requireNonNull(resourceType, "resourceType");
@@ -1004,15 +1003,15 @@ public Future<Bundle> searchAsyncWithStrictHandling(Duration initialPollingInter
10041003
target = target.queryParam(entry.getKey(), entry.getValue().toArray());
10051004
}
10061005

1007-
return doSearchAsync(initialPollingInterval, target, true);
1006+
return doSearchAsync(delayStrategy, target, true);
10081007
}
10091008

10101009
@Override
1011-
public Future<Bundle> searchAsyncWithStrictHandling(Duration initialPollingInterval, String url)
1010+
public CompletableFuture<Bundle> searchAsyncWithStrictHandling(DelayStrategy delayStrategy, String url)
10121011
{
10131012
checkUri(url);
10141013

1015-
return doSearchAsync(initialPollingInterval, client.target(url), true);
1014+
return doSearchAsync(delayStrategy, client.target(url), true);
10161015
}
10171016

10181017
private void checkUri(String url)
@@ -1022,9 +1021,11 @@ private void checkUri(String url)
10221021
throw new RuntimeException("url is blank");
10231022
if (!url.startsWith(baseUrl))
10241023
throw new RuntimeException("url not starting with client base url");
1024+
if (url.startsWith(baseUrl + "@"))
1025+
throw new RuntimeException("url starting with client base url + @");
10251026
}
10261027

1027-
private Future<Bundle> doSearchAsync(Duration initialPollingInterval, WebTarget target, boolean strict)
1028+
private CompletableFuture<Bundle> doSearchAsync(DelayStrategy delayStrategy, WebTarget target, boolean strict)
10281029
{
10291030
Builder requestBuilder = target.request().header(Constants.HEADER_PREFER,
10301031
Constants.HEADER_PREFER_RESPOND_ASYNC);
@@ -1044,11 +1045,22 @@ public void completed(Response response)
10441045
else if (Status.ACCEPTED.getStatusCode() == response.getStatus())
10451046
{
10461047
String location = response.getHeaderString(HttpHeaders.LOCATION);
1047-
if (location == null)
1048-
throw new RuntimeException("202 Accepted without Location header");
1048+
if (location == null || location.isBlank())
1049+
{
1050+
logger.warn("202 Accepted without Location header");
1051+
1052+
location = response.getHeaderString(HttpHeaders.CONTENT_LOCATION);
1053+
if (location == null || location.isBlank())
1054+
throw new RuntimeException(
1055+
"202 Accepted without Location and without Content-Location header");
1056+
else
1057+
logger.warn("202 Accepted with Content-Location header");
1058+
}
1059+
1060+
checkUri(location);
10491061

10501062
response.close();
1051-
pollUntilComplete(location, initialPollingInterval.toMillis(), resultFuture);
1063+
pollUntilComplete(location, delayStrategy, resultFuture);
10521064
}
10531065
else
10541066
resultFuture.completeExceptionally(handleError(response));
@@ -1064,36 +1076,32 @@ public void failed(Throwable throwable)
10641076
return resultFuture;
10651077
}
10661078

1067-
private void pollUntilComplete(String pollUrl, long pollIntervalMillis, CompletableFuture<Bundle> resultFuture)
1079+
private void pollUntilComplete(String location, DelayStrategy delayStrategy, CompletableFuture<Bundle> resultFuture)
10681080
{
10691081
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
10701082

10711083
Runnable poll = new Runnable()
10721084
{
1073-
private long pollInterval = pollIntervalMillis;
1085+
private Duration delay = delayStrategy.getFirstDelay();
10741086

10751087
@Override
10761088
public void run()
10771089
{
1090+
if (resultFuture.isCancelled())
1091+
return;
1092+
10781093
try
10791094
{
1080-
checkUri(pollUrl);
1081-
1082-
Response response = client.target(pollUrl).request().get();
1095+
Response response = client.target(location).request().get();
10831096

10841097
if (Status.OK.getStatusCode() == response.getStatus())
10851098
resultFuture.complete(response.readEntity(Bundle.class));
10861099
else if (Status.ACCEPTED.getStatusCode() == response.getStatus())
10871100
{
1088-
String location = response.getHeaderString(HttpHeaders.LOCATION);
1089-
if (location == null)
1090-
throw new RuntimeException("202 Accepted without Location header");
1091-
10921101
response.close();
10931102

1094-
pollInterval *= 2;
1095-
1096-
executor.schedule(this, pollInterval, TimeUnit.MILLISECONDS);
1103+
delay = delayStrategy.getNextDelay(delay);
1104+
executor.schedule(this, delay.toMillis(), TimeUnit.MILLISECONDS);
10971105
}
10981106
else
10991107
resultFuture.completeExceptionally(handleError(response));
@@ -1106,7 +1114,7 @@ else if (Status.ACCEPTED.getStatusCode() == response.getStatus())
11061114
}
11071115
};
11081116

1109-
executor.schedule(poll, pollIntervalMillis, TimeUnit.MILLISECONDS);
1117+
executor.schedule(poll, delayStrategy.getFirstDelay().toMillis(), TimeUnit.MILLISECONDS);
11101118
}
11111119

11121120
@Override
@@ -1165,23 +1173,21 @@ public StructureDefinition generateSnapshot(StructureDefinition differential)
11651173
}
11661174

11671175
@Override
1168-
public BasicDsfClient withRetry(int nTimes, Duration delay)
1176+
public BasicDsfClient withRetry(int nTimes, DelayStrategy delayStrategy)
11691177
{
11701178
if (nTimes < 0)
11711179
throw new IllegalArgumentException("nTimes < 0");
1172-
if (delay == null || delay.isNegative())
1173-
throw new IllegalArgumentException("delay null or negative");
1180+
Objects.requireNonNull(delayStrategy, "delayStrategy");
11741181

1175-
return new BasicDsfClientWithRetryImpl(this, nTimes, delay);
1182+
return new BasicDsfClientWithRetryImpl(this, nTimes, delayStrategy);
11761183
}
11771184

11781185
@Override
1179-
public BasicDsfClient withRetryForever(Duration delay)
1186+
public BasicDsfClient withRetryForever(DelayStrategy delayStrategy)
11801187
{
1181-
if (delay == null || delay.isNegative())
1182-
throw new IllegalArgumentException("delay null or negative");
1188+
Objects.requireNonNull(delayStrategy, "delayStrategy");
11831189

1184-
return new BasicDsfClientWithRetryImpl(this, RETRY_FOREVER, delay);
1190+
return new BasicDsfClientWithRetryImpl(this, RETRY_FOREVER, delayStrategy);
11851191
}
11861192

11871193
@Override

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/client/dsf/PreferReturnMinimalRetryImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package dev.dsf.bpe.v2.client.dsf;
1717

1818
import java.io.InputStream;
19-
import java.time.Duration;
2019
import java.util.List;
2120
import java.util.Map;
2221

@@ -28,9 +27,9 @@
2827

2928
class PreferReturnMinimalRetryImpl extends AbstractDsfClientJerseyWithRetry implements PreferReturnMinimal
3029
{
31-
PreferReturnMinimalRetryImpl(DsfClientJersey delegate, int nTimes, Duration delay)
30+
PreferReturnMinimalRetryImpl(DsfClientJersey delegate, int nTimes, DelayStrategy delayStrategy)
3231
{
33-
super(delegate, nTimes, delay);
32+
super(delegate, nTimes, delayStrategy);
3433
}
3534

3635
@Override

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/client/dsf/PreferReturnMinimalWithRetryImpl.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package dev.dsf.bpe.v2.client.dsf;
1717

1818
import java.io.InputStream;
19-
import java.time.Duration;
2019
import java.util.List;
2120
import java.util.Map;
2221

@@ -78,22 +77,22 @@ public Bundle postBundle(Bundle bundle)
7877
}
7978

8079
@Override
81-
public PreferReturnMinimal withRetry(int nTimes, Duration delay)
80+
public PreferReturnMinimal withRetry(int nTimes, DelayStrategy delayStrategy)
8281
{
8382
if (nTimes < 0)
8483
throw new IllegalArgumentException("nTimes < 0");
85-
if (delay == null || delay.isNegative())
86-
throw new IllegalArgumentException("delay null or negative");
84+
if (delayStrategy == null)
85+
throw new IllegalArgumentException("delayStrategy null");
8786

88-
return new PreferReturnMinimalRetryImpl(delegate, nTimes, delay);
87+
return new PreferReturnMinimalRetryImpl(delegate, nTimes, delayStrategy);
8988
}
9089

9190
@Override
92-
public PreferReturnMinimal withRetryForever(Duration delay)
91+
public PreferReturnMinimal withRetryForever(DelayStrategy delayStrategy)
9392
{
94-
if (delay == null || delay.isNegative())
95-
throw new IllegalArgumentException("delay null or negative");
93+
if (delayStrategy == null)
94+
throw new IllegalArgumentException("delayStrategy null");
9695

97-
return new PreferReturnMinimalRetryImpl(delegate, RETRY_FOREVER, delay);
96+
return new PreferReturnMinimalRetryImpl(delegate, RETRY_FOREVER, delayStrategy);
9897
}
9998
}

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/client/dsf/PreferReturnOutcomeRetryImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package dev.dsf.bpe.v2.client.dsf;
1717

1818
import java.io.InputStream;
19-
import java.time.Duration;
2019
import java.util.List;
2120
import java.util.Map;
2221

@@ -28,9 +27,9 @@
2827

2928
class PreferReturnOutcomeRetryImpl extends AbstractDsfClientJerseyWithRetry implements PreferReturnOutcome
3029
{
31-
PreferReturnOutcomeRetryImpl(DsfClientJersey delegate, int nTimes, Duration delay)
30+
PreferReturnOutcomeRetryImpl(DsfClientJersey delegate, int nTimes, DelayStrategy delayStrategy)
3231
{
33-
super(delegate, nTimes, delay);
32+
super(delegate, nTimes, delayStrategy);
3433
}
3534

3635
@Override

0 commit comments

Comments
 (0)