Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.implementation.AuthorizationTokenType;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.FailureValidator;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.HttpConstants.SubStatusCodes;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.ISessionContainer;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.RequestTimeoutException;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.SessionTokenHelper;
Expand All @@ -33,10 +37,12 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -358,6 +364,225 @@ public void isGlobalStrongRequest(ConsistencyLevel defaultConsistencyLevel, RxDo
assertThat(consistencyWriter.isGlobalStrongRequest(req, storeResponse)).isEqualTo(isGlobalStrongExpected);
}

@Test(groups = "unit")
public void writeAsyncGlobalStrongRequest() {
runWriteAsyncBarrierableRequestTest(true, true);
}

@Test(groups = "unit")
public void writeAsyncGlobalStrongRequestFailed() {
runWriteAsyncBarrierableRequestTest(true, false);
}

@Test(groups = "unit")
public void writeAsyncNRegionCommitRequest() {
runWriteAsyncBarrierableRequestTest(false, true);
}

@Test(groups = "unit")
public void writeAsyncNRegionCommitRequestFailed() {
runWriteAsyncBarrierableRequestTest(false, false);
}

@Test(groups = "unit")
public void writeAsyncNoBarrierRequest() {
initializeConsistencyWriter(false);
RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext);
TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class);
Mockito.doReturn(false).when(timeoutHelper).isElapsed();
StoreResponse storeResponse = Mockito.mock(StoreResponse.class);
Mockito.doReturn("0").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS);
Mockito.doReturn(ConsistencyLevel.SESSION).when(serviceConfigReader).getDefaultConsistencyLevel();
ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter);
Mockito.doReturn(Mono.just(storeResponse)).when(spyWriter).barrierForGlobalStrong(Mockito.any(), Mockito.any(), Mockito.any());
AddressInformation addressInformation = Mockito.mock(AddressInformation.class);
Uri primaryUri = Mockito.mock(Uri.class);
Mockito.doReturn(true).when(primaryUri).isPrimary();
Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString();
Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri();
List<AddressInformation> addressList = Collections.singletonList(addressInformation);
Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean());
Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class));
Mono<StoreResponse> result = spyWriter.writeAsync(request, timeoutHelper, false);
StepVerifier.create(result)
.expectNext(storeResponse)
.expectComplete()
.verify();
}

@Test(groups = "unit")
public void getBarrierRequestType() {
// Setup ConsistencyWriter with useMultipleWriteLocations false
initializeConsistencyWriter(false);
ConsistencyWriter writer = this.consistencyWriter;
RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext);
StoreResponse response = Mockito.mock(StoreResponse.class);

// 1. Global strong enabled and isGlobalStrongRequest returns true
try (MockedStatic<ReplicatedResourceClient> replicatedResourceClientMock = Mockito.mockStatic(ReplicatedResourceClient.class)) {
replicatedResourceClientMock.when(ReplicatedResourceClient::isGlobalStrongEnabled).thenReturn(true);
ConsistencyWriter spyWriter = Mockito.spy(writer);
Mockito.doReturn(true).when(spyWriter).isGlobalStrongRequest(request, response);
BarrierType barrierType = spyWriter.getBarrierRequestType(request, response);
assertThat(barrierType).isEqualTo(BarrierType.GLOBAL_STRONG_WRITE);
}

// 2. NRegionSynchronousCommitEnabled path
// Setup request.requestContext.getNRegionSynchronousCommitEnabled() to true
request.requestContext.setNRegionSynchronousCommitEnabled(true);
// useMultipleWriteLocations is already false
Mockito.doReturn("123").when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN);
Mockito.doReturn(2L).when(response).getNumberOfReadRegions();
BarrierType barrierType = writer.getBarrierRequestType(request, response);
assertThat(barrierType).isEqualTo(BarrierType.N_REGION_SYNCHRONOUS_COMMIT);


// 3. Negative case: NRegionSynchronousCommitEnabled false
request.requestContext.setNRegionSynchronousCommitEnabled(false);
BarrierType negativeResult = writer.getBarrierRequestType(request, response);
assertThat(negativeResult).isEqualTo(BarrierType.NONE);

// 4. Negative case: useMultipleWriteLocations true
initializeConsistencyWriter(true);
writer = this.consistencyWriter;
request.requestContext.setNRegionSynchronousCommitEnabled(true);
BarrierType negativeResult2 = writer.getBarrierRequestType(request, response);
assertThat(negativeResult2).isEqualTo(BarrierType.NONE);

// 5. Negative case: GLOBAL_NREGION_COMMITTED_LSN header missing
initializeConsistencyWriter(false);
writer = this.consistencyWriter;
request.requestContext.setNRegionSynchronousCommitEnabled(true);
Mockito.doReturn(null).when(response).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN);
BarrierType negativeResult3 = writer.getBarrierRequestType(request, response);
assertThat(negativeResult3).isEqualTo(BarrierType.NONE);

// 6. Negative case: NUMBER_OF_READ_REGIONS header missing or zero
Mockito.doReturn(0L).when(response).getNumberOfReadRegions();
BarrierType negativeResult4 = writer.getBarrierRequestType(request, response);
assertThat(negativeResult4).isEqualTo(BarrierType.NONE);
}

private void runWriteAsyncBarrierableRequestTest(boolean globalStrong, boolean barrierMet) {
RxDocumentServiceRequest request = setupRequest(!globalStrong);
TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class);
Mockito.doReturn(false).when(timeoutHelper).isElapsed();
StoreResponse storeResponse = setupStoreResponse(!globalStrong);
List<AddressInformation> addressList = setupAddressList();
List<StoreResult> storeResults = new ArrayList<>();
if (barrierMet) {
storeResults.add(getStoreResult(storeResponse, 1L));
storeResults.add(getStoreResult(storeResponse, 2L));
} else {
storeResults.add(getStoreResult(storeResponse, 1L));
}
StoreReader storeReader = setupStoreReader(storeResults);
initializeConsistencyWriterWithStoreReader(false, storeReader);
ConsistencyWriter spyWriter = Mockito.spy(this.consistencyWriter);
Mockito.doReturn(globalStrong ? ConsistencyLevel.STRONG : ConsistencyLevel.SESSION)
.when(serviceConfigReader).getDefaultConsistencyLevel();
Mockito.doReturn(Mono.just(addressList)).when(addressSelector).resolveAddressesAsync(Mockito.any(), Mockito.anyBoolean());
Mockito.doReturn(Mono.just(storeResponse)).when(transportClient).invokeResourceOperationAsync(Mockito.any(Uri.class), Mockito.any(RxDocumentServiceRequest.class));
Mono<StoreResponse> result = spyWriter.writeAsync(request, timeoutHelper, false);
if (!barrierMet) {
StepVerifier.create(result)
.expectErrorSatisfies(error -> {
assertThat(error).isInstanceOf(GoneException.class);
FailureValidator failureValidator = FailureValidator.builder()
.instanceOf(GoneException.class)
.statusCode(GONE)
.subStatusCode(globalStrong ? SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET : SubStatusCodes.GLOBAL_N_REGION_COMMIT_WRITE_BARRIER_NOT_MET)
.build();
failureValidator.validate(error);
})
.verify();
} else {
StepVerifier.create(result)
.expectNext(storeResponse)
.expectComplete()
.verify();
}
}

private RxDocumentServiceRequest setupRequest(boolean nRegionCommit) {
RxDocumentServiceRequest request = mockDocumentServiceRequest(clientContext);
if (nRegionCommit) {
request.requestContext.setNRegionSynchronousCommitEnabled(true);
}
Mockito.doReturn(ResourceType.Document).when(request).getResourceType();
Mockito.doReturn(OperationType.Create).when(request).getOperationType();
Mockito.doReturn("1-MxAPlgMgA=").when(request).getResourceId();
request.authorizationTokenType = AuthorizationTokenType.PrimaryMasterKey;
return request;
}

private StoreResponse setupStoreResponse(boolean nRegionCommit) {
StoreResponse storeResponse = Mockito.mock(StoreResponse.class);
Mockito.doReturn(1L).when(storeResponse).getNumberOfReadRegions();
Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.NUMBER_OF_READ_REGIONS);
if (nRegionCommit) {
Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_N_REGION_COMMITTED_GLSN);
} else {
Mockito.doReturn("1").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN);
}
Mockito.doReturn("2").when(storeResponse).getHeaderValue(WFConstants.BackendHeaders.LSN);
return storeResponse;
}

private List<AddressInformation> setupAddressList() {
AddressInformation addressInformation = Mockito.mock(AddressInformation.class);
Uri primaryUri = Mockito.mock(Uri.class);
Mockito.doReturn(true).when(primaryUri).isPrimary();
Mockito.doReturn("Healthy").when(primaryUri).getHealthStatusDiagnosticString();
Mockito.doReturn(primaryUri).when(addressInformation).getPhysicalUri();
return Collections.singletonList(addressInformation);
}

private StoreReader setupStoreReader(List<StoreResult> storeResults) {
StoreReader storeReader = Mockito.mock(StoreReader.class);
Mono<List<StoreResult>>[] monos = storeResults.stream()
.map(Collections::singletonList)
.map(Mono::just)
.toArray(Mono[]::new);
Mockito.when(storeReader.readMultipleReplicaAsync(
Mockito.any(),
Mockito.anyBoolean(),
Mockito.anyInt(),
Mockito.anyBoolean(),
Mockito.anyBoolean(),
Mockito.any(),
Mockito.anyBoolean(),
Mockito.anyBoolean()))
.thenReturn(monos.length > 0 ? monos[0] : Mono.empty(),
Arrays.copyOfRange(monos, 1, monos.length));
return storeReader;
}

private StoreResult getStoreResult(StoreResponse storeResponse, long globalCommittedLsn) {
return new StoreResult(
storeResponse,
null,
"1",
1,
1,
1.0,
UUID.randomUUID().toString(),
UUID.randomUUID().toString(),
4,
2,
true,
null,
globalCommittedLsn,
1,
1,
null,
0.3,
90.0);
}




private void initializeConsistencyWriter(boolean useMultipleWriteLocation) {
addressSelector = Mockito.mock(AddressSelector.class);
sessionContainer = Mockito.mock(ISessionContainer.class);
Expand All @@ -375,6 +600,24 @@ private void initializeConsistencyWriter(boolean useMultipleWriteLocation) {
null);
}

private void initializeConsistencyWriterWithStoreReader(boolean useMultipleWriteLocation, StoreReader reader) {
addressSelector = Mockito.mock(AddressSelector.class);
sessionContainer = Mockito.mock(ISessionContainer.class);
transportClient = Mockito.mock(TransportClient.class);
IAuthorizationTokenProvider authorizationTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class);
serviceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class);

consistencyWriter = new ConsistencyWriter(clientContext,
addressSelector,
sessionContainer,
transportClient,
authorizationTokenProvider,
serviceConfigReader,
useMultipleWriteLocation,
reader,
null);
}

public static <T> void validateError(Mono<T> single,
FailureValidator validator) {
TestSubscriber<T> testSubscriber = TestSubscriber.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public static final class Properties {
public static final String THINCLIENT_READABLE_LOCATIONS = "thinClientReadableLocations";
public static final String DATABASE_ACCOUNT_ENDPOINT = "databaseAccountEndpoint";
public static final String ENABLE_PER_PARTITION_FAILOVER_BEHAVIOR = "enablePerPartitionFailoverBehavior";
public static final String ENABLE_N_REGION_SYNCHRONOUS_COMMIT = "enableNRegionSynchronousCommit";

//Authorization
public static final String MASTER_TOKEN = "master";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,23 @@ public Boolean isPerPartitionFailoverBehaviorEnabled() {
return null;
}

/**
* Returns true if the account supports N region synchronous commit,
* false if enableNRegionSynchronousCommit evaluates to null or false.
* <p>
* If enableNRegionSynchronousCommit property does not exist in account metadata JSON payload, null is returned.
*
* @return true if the account supports N region synchronous commit, false otherwise.
*/
public Boolean isNRegionSynchronousCommitEnabled() {

if (super.has(Constants.Properties.ENABLE_N_REGION_SYNCHRONOUS_COMMIT)) {
return ObjectUtils.defaultIfNull(super.getBoolean(Constants.Properties.ENABLE_N_REGION_SYNCHRONOUS_COMMIT), false);
}

return null;
}

public void setIsPerPartitionFailoverBehaviorEnabled(boolean value) {
this.set(Constants.Properties.ENABLE_PER_PARTITION_FAILOVER_BEHAVIOR, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.ReadConsistencyStrategy;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelFailoverInfo;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionFailoverInfoHolder;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext;
import com.azure.cosmos.implementation.directconnectivity.BarrierType;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
import com.azure.cosmos.implementation.directconnectivity.TimeoutHelper;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelFailoverInfo;
import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionFailoverInfoHolder;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import com.azure.cosmos.implementation.throughputControl.ThroughputControlRequestContext;
Expand All @@ -38,7 +39,7 @@ public class DocumentServiceRequestContext implements Cloneable {
public volatile ISessionToken sessionToken;
public volatile long quorumSelectedLSN;
public volatile long globalCommittedSelectedLSN;
public volatile StoreResponse globalStrongWriteResponse;
public volatile StoreResponse cachedWriteResponse;
public volatile ConsistencyLevel originalRequestConsistencyLevel;
public volatile ReadConsistencyStrategy readConsistencyStrategy;
public volatile PartitionKeyRange resolvedPartitionKeyRange;
Expand All @@ -65,6 +66,8 @@ public class DocumentServiceRequestContext implements Cloneable {
private volatile long approximateBloomFilterInsertionCount;
private final Set<String> sessionTokenEvaluationResults = ConcurrentHashMap.newKeySet();
private volatile List<String> unavailableRegionsForPartition;
private volatile boolean nRegionSynchronousCommitEnabled;
private volatile BarrierType barrierType = BarrierType.NONE;

// For cancelled rntbd requests, track the response as OperationCancelledException which later will be used to populate the cosmosDiagnostics
public final Map<String, CosmosException> rntbdCancelledRequestMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -148,7 +151,7 @@ public DocumentServiceRequestContext clone() {
context.sessionToken = this.sessionToken;
context.quorumSelectedLSN = this.quorumSelectedLSN;
context.globalCommittedSelectedLSN = this.globalCommittedSelectedLSN;
context.globalStrongWriteResponse = this.globalStrongWriteResponse;
context.cachedWriteResponse = this.cachedWriteResponse;
context.originalRequestConsistencyLevel = this.originalRequestConsistencyLevel;
context.readConsistencyStrategy = this.readConsistencyStrategy;
context.resolvedPartitionKeyRange = this.resolvedPartitionKeyRange;
Expand Down Expand Up @@ -266,5 +269,21 @@ public void setPerPartitionAutomaticFailoverInfoHolder(PartitionLevelFailoverInf
this.perPartitionFailoverInfoHolder.setPartitionLevelFailoverInfo(partitionLevelFailoverInfo);
}
}

public boolean getNRegionSynchronousCommitEnabled() {
return nRegionSynchronousCommitEnabled;
}

public void setNRegionSynchronousCommitEnabled(Boolean nRegionSynchronousCommitEnabled) {
this.nRegionSynchronousCommitEnabled = nRegionSynchronousCommitEnabled;
}

public BarrierType getBarrierType() {
return barrierType;
}

public void setBarrierType(BarrierType barrierType) {
this.barrierType = barrierType;
}
}

Loading