Skip to content
Draft

Large diffs are not rendered by default.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A condition completes when the conditions requirements are met. It is informed by being fed the data source changes. If doesn't wrap the data sources themselves, because that increases complexity of handling the asynchronous nature, and doesn't clearly separate the responsibility.

A condition could complete inside an inform callback, in which case the state change will be caught in the next loop pass.

Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Container class for FDv2 data source conditions and related types.
* <p>
* This class is non-constructable and serves only as a namespace for condition-related types.
* Package-private for internal use and testing.
*/
class FDv2DataSourceConditions {
/**
* Private constructor to prevent instantiation.
*/
private FDv2DataSourceConditions() {
}

/**
* Package-private for testing.
*/
interface Condition extends Closeable {
enum ConditionType {
FALLBACK,
RECOVERY,
}

CompletableFuture<Condition> execute();

void inform(FDv2SourceResult sourceResult);

void close();

ConditionType getType();
}

interface ConditionFactory {
Condition build();

Condition.ConditionType getType();
}

static abstract class TimedCondition implements Condition {
protected final CompletableFuture<Condition> resultFuture = new CompletableFuture<>();

protected final ScheduledExecutorService sharedExecutor;

/**
* Future for the timeout task, if any. Will be null when no timeout is active.
*/
protected ScheduledFuture<Void> timerFuture;

/**
* Timeout duration for the fallback operation.
*/
protected final long timeoutSeconds;

public TimedCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) {
this.sharedExecutor = sharedExecutor;
this.timeoutSeconds = timeoutSeconds;
}

@Override
public CompletableFuture<Condition> execute() {
return resultFuture;
}

@Override
public void close() {
if (timerFuture != null) {
timerFuture.cancel(false);
timerFuture = null;
}
}

static abstract class Factory implements ConditionFactory {
protected final ScheduledExecutorService sharedExecutor;
protected final long timeoutSeconds;

public Factory(ScheduledExecutorService sharedExecutor, long timeout) {
this.sharedExecutor = sharedExecutor;
this.timeoutSeconds = timeout;
}
}
}

/**
* This condition is used to determine if a fallback should be performed. It is informed of each data source result
* via {@link #inform(FDv2SourceResult)}. Based on the results, it updates its internal state. When the fallback
* condition is met, then the {@link java.util.concurrent.Future} returned by {@link #execute()} will complete.
* <p>
* This is package-private, instead of private, for ease of testing.
*/
static class FallbackCondition extends TimedCondition {
static class Factory extends TimedCondition.Factory {
public Factory(ScheduledExecutorService sharedExecutor, long timeout) {
super(sharedExecutor, timeout);
}

@Override
public Condition build() {
return new FallbackCondition(sharedExecutor, timeoutSeconds);
}

@Override
public ConditionType getType() {
return ConditionType.FALLBACK;
}
}

public FallbackCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) {
super(sharedExecutor, timeoutSeconds);
}

@Override
public void inform(FDv2SourceResult sourceResult) {
if (sourceResult == null) {
return;
}
if (sourceResult.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) {
if (timerFuture != null) {
timerFuture.cancel(false);
timerFuture = null;
}
}
if (sourceResult.getResultType() == FDv2SourceResult.ResultType.STATUS && sourceResult.getStatus().getState() == FDv2SourceResult.State.INTERRUPTED) {
if (timerFuture == null) {
timerFuture = sharedExecutor.schedule(() -> {
resultFuture.complete(this);
return null;
}, timeoutSeconds, TimeUnit.SECONDS);
}
}
}

@Override
public ConditionType getType() {
return ConditionType.FALLBACK;
}
}

static class RecoveryCondition extends TimedCondition {

static class Factory extends TimedCondition.Factory {
public Factory(ScheduledExecutorService sharedExecutor, long timeout) {
super(sharedExecutor, timeout);
}

@Override
public Condition build() {
return new RecoveryCondition(sharedExecutor, timeoutSeconds);
}

@Override
public ConditionType getType() {
return ConditionType.RECOVERY;
}
}

public RecoveryCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) {
super(sharedExecutor, timeoutSeconds);
timerFuture = sharedExecutor.schedule(() -> {
resultFuture.complete(this);
return null;
}, timeoutSeconds, TimeUnit.SECONDS);
}

@Override
public void inform(FDv2SourceResult sourceResult) {
// Time-based recovery.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to discuss the spec. My interpretation is that the synchronizer just needs to run for the recovery time, not that it needs to be in any specific state.

Considering the fallback time is less than the recovery time, if the data source is in a bad state it will fallback before recovering.

The question is if we care about recovering near having received a changeset. Or if we want changesets to reset the time.

}

@Override
public ConditionType getType() {
return ConditionType.RECOVERY;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ static FDv2DataSystem create(
DataSource dataSource = new FDv2DataSource(
initializerFactories,
synchronizerFactories,
dataSourceUpdates
dataSourceUpdates,
config.threadPriority,
clientContext.getBaseLogger().subLogger(Loggers.DATA_SOURCE_LOGGER_NAME),
clientContext.sharedExecutor
);
DataSourceStatusProvider dataSourceStatusProvider = new DataSourceStatusProviderImpl(
dataSourceStatusBroadcaster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ private Loggers() {}
static final String EVALUATION_LOGGER_NAME = "Evaluation";
static final String EVENTS_LOGGER_NAME = "Events";
static final String HOOKS_LOGGER_NAME = "Hooks";
static final String STREAMING_SYNCHRONIZER = "StreamingSynchronizer";
static final String POLLING_SYNCHRONIZER = "PollingSynchronizer";
static final String POLLING_INITIALIZER = "PollingInitializer";
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class PollingInitializerImpl extends PollingBase implements Initializer {
private final SelectorSource selectorSource;

public PollingInitializerImpl(FDv2Requestor requestor, LDLogger logger, SelectorSource selectorSource) {
super(requestor, logger);
super(requestor, logger.subLogger(Loggers.POLLING_INITIALIZER));
this.selectorSource = selectorSource;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public PollingSynchronizerImpl(
ScheduledExecutorService sharedExecutor,
Duration pollInterval
) {
super(requestor, logger);
super(requestor, logger.subLogger(Loggers.POLLING_SYNCHRONIZER));
this.selectorSource = selectorSource;

synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public StreamingSynchronizerImpl(
) {
this.httpProperties = httpProperties;
this.selectorSource = selectorSource;
this.logger = logger;
this.logger = logger.subLogger(Loggers.STREAMING_SYNCHRONIZER);
this.payloadFilter = payloadFilter;
this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath);
this.initialReconnectDelay = initialReconnectDelaySeconds;
Expand Down
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a move to a new file.

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.sdk.server.datasources.Synchronizer;

class SynchronizerFactoryWithState {
public enum State {
/**
* This synchronizer is available to use.
*/
Available,

/**
* This synchronizer is no longer available to use.
*/
Blocked
}

private final FDv2DataSource.DataSourceFactory<Synchronizer> factory;

private State state = State.Available;


public SynchronizerFactoryWithState(FDv2DataSource.DataSourceFactory<Synchronizer> factory) {
this.factory = factory;
}

public State getState() {
return state;
}

public void block() {
state = State.Blocked;
}

public Synchronizer build() {
return factory.build();
}
}
Loading