-
Notifications
You must be signed in to change notification settings - Fork 11
chore: Add support for FDv2 fallback and recovery. #113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
08abc7e
bd15015
b20c621
b46ac5e
fba5aa2
472a1d2
0c150a1
5888761
9e79c00
33fa6c5
ec0d0ec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,184 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ScheduledFuture; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| /** | ||
| * Container class for FDv2 data source conditions and related types. | ||
| * <p> | ||
| * This class is non-constructable and serves only as a namespace for condition-related types. | ||
| * Package-private for internal use and testing. | ||
| */ | ||
| class FDv2DataSourceConditions { | ||
| /** | ||
| * Private constructor to prevent instantiation. | ||
| */ | ||
| private FDv2DataSourceConditions() { | ||
| } | ||
|
|
||
| /** | ||
| * Package-private for testing. | ||
| */ | ||
| interface Condition extends Closeable { | ||
| enum ConditionType { | ||
| FALLBACK, | ||
| RECOVERY, | ||
| } | ||
|
|
||
| CompletableFuture<Condition> execute(); | ||
|
|
||
| void inform(FDv2SourceResult sourceResult); | ||
|
|
||
| void close(); | ||
|
|
||
| ConditionType getType(); | ||
| } | ||
|
|
||
| interface ConditionFactory { | ||
| Condition build(); | ||
|
|
||
| Condition.ConditionType getType(); | ||
| } | ||
|
|
||
| static abstract class TimedCondition implements Condition { | ||
| protected final CompletableFuture<Condition> resultFuture = new CompletableFuture<>(); | ||
|
|
||
| protected final ScheduledExecutorService sharedExecutor; | ||
|
|
||
| /** | ||
| * Future for the timeout task, if any. Will be null when no timeout is active. | ||
| */ | ||
| protected ScheduledFuture<Void> timerFuture; | ||
|
|
||
| /** | ||
| * Timeout duration for the fallback operation. | ||
| */ | ||
| protected final long timeoutSeconds; | ||
|
|
||
| public TimedCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { | ||
| this.sharedExecutor = sharedExecutor; | ||
| this.timeoutSeconds = timeoutSeconds; | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<Condition> execute() { | ||
| return resultFuture; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| if (timerFuture != null) { | ||
| timerFuture.cancel(false); | ||
| timerFuture = null; | ||
| } | ||
| } | ||
|
|
||
| static abstract class Factory implements ConditionFactory { | ||
| protected final ScheduledExecutorService sharedExecutor; | ||
| protected final long timeoutSeconds; | ||
|
|
||
| public Factory(ScheduledExecutorService sharedExecutor, long timeout) { | ||
| this.sharedExecutor = sharedExecutor; | ||
| this.timeoutSeconds = timeout; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * This condition is used to determine if a fallback should be performed. It is informed of each data source result | ||
| * via {@link #inform(FDv2SourceResult)}. Based on the results, it updates its internal state. When the fallback | ||
| * condition is met, then the {@link java.util.concurrent.Future} returned by {@link #execute()} will complete. | ||
| * <p> | ||
| * This is package-private, instead of private, for ease of testing. | ||
| */ | ||
| static class FallbackCondition extends TimedCondition { | ||
| static class Factory extends TimedCondition.Factory { | ||
| public Factory(ScheduledExecutorService sharedExecutor, long timeout) { | ||
| super(sharedExecutor, timeout); | ||
| } | ||
|
|
||
| @Override | ||
| public Condition build() { | ||
| return new FallbackCondition(sharedExecutor, timeoutSeconds); | ||
| } | ||
|
|
||
| @Override | ||
| public ConditionType getType() { | ||
| return ConditionType.FALLBACK; | ||
| } | ||
| } | ||
|
|
||
| public FallbackCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { | ||
| super(sharedExecutor, timeoutSeconds); | ||
| } | ||
|
|
||
| @Override | ||
| public void inform(FDv2SourceResult sourceResult) { | ||
| if (sourceResult == null) { | ||
| return; | ||
| } | ||
| if (sourceResult.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) { | ||
| if (timerFuture != null) { | ||
| timerFuture.cancel(false); | ||
| timerFuture = null; | ||
| } | ||
| } | ||
| if (sourceResult.getResultType() == FDv2SourceResult.ResultType.STATUS && sourceResult.getStatus().getState() == FDv2SourceResult.State.INTERRUPTED) { | ||
| if (timerFuture == null) { | ||
| timerFuture = sharedExecutor.schedule(() -> { | ||
| resultFuture.complete(this); | ||
| return null; | ||
| }, timeoutSeconds, TimeUnit.SECONDS); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public ConditionType getType() { | ||
| return ConditionType.FALLBACK; | ||
| } | ||
| } | ||
|
|
||
| static class RecoveryCondition extends TimedCondition { | ||
|
|
||
| static class Factory extends TimedCondition.Factory { | ||
| public Factory(ScheduledExecutorService sharedExecutor, long timeout) { | ||
| super(sharedExecutor, timeout); | ||
| } | ||
|
|
||
| @Override | ||
| public Condition build() { | ||
| return new RecoveryCondition(sharedExecutor, timeoutSeconds); | ||
| } | ||
|
|
||
| @Override | ||
| public ConditionType getType() { | ||
| return ConditionType.RECOVERY; | ||
| } | ||
| } | ||
|
|
||
| public RecoveryCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { | ||
| super(sharedExecutor, timeoutSeconds); | ||
| timerFuture = sharedExecutor.schedule(() -> { | ||
| resultFuture.complete(this); | ||
| return null; | ||
| }, timeoutSeconds, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| @Override | ||
| public void inform(FDv2SourceResult sourceResult) { | ||
| // Time-based recovery. | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may need to discuss the spec. My interpretation is that the synchronizer just needs to run for the recovery time, not that it needs to be in any specific state. Considering the fallback time is less than the recovery time, if the data source is in a bad state it will fallback before recovering. The question is if we care about recovering near having received a changeset. Or if we want changesets to reset the time. |
||
| } | ||
|
|
||
| @Override | ||
| public ConditionType getType() { | ||
| return ConditionType.RECOVERY; | ||
| } | ||
| } | ||
| } | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just a move to a new file. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.launchdarkly.sdk.server.datasources.Synchronizer; | ||
|
|
||
| class SynchronizerFactoryWithState { | ||
| public enum State { | ||
| /** | ||
| * This synchronizer is available to use. | ||
| */ | ||
| Available, | ||
|
|
||
| /** | ||
| * This synchronizer is no longer available to use. | ||
| */ | ||
| Blocked | ||
| } | ||
|
|
||
| private final FDv2DataSource.DataSourceFactory<Synchronizer> factory; | ||
|
|
||
| private State state = State.Available; | ||
|
|
||
|
|
||
| public SynchronizerFactoryWithState(FDv2DataSource.DataSourceFactory<Synchronizer> factory) { | ||
| this.factory = factory; | ||
| } | ||
|
|
||
| public State getState() { | ||
| return state; | ||
| } | ||
|
|
||
| public void block() { | ||
| state = State.Blocked; | ||
| } | ||
|
|
||
| public Synchronizer build() { | ||
| return factory.build(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A condition completes when the conditions requirements are met. It is informed by being fed the data source changes. If doesn't wrap the data sources themselves, because that increases complexity of handling the asynchronous nature, and doesn't clearly separate the responsibility.
A condition could complete inside an inform callback, in which case the state change will be caught in the next loop pass.