Skip to content

Commit 75bc04d

Browse files
fix: Retry multiplexed session failures
1 parent 3d585cf commit 75bc04d

File tree

2 files changed

+193
-23
lines changed

2 files changed

+193
-23
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.concurrent.atomic.AtomicInteger;
5454
import java.util.concurrent.atomic.AtomicLong;
5555
import java.util.concurrent.atomic.AtomicReference;
56+
import java.util.concurrent.locks.ReentrantLock;
5657

5758
/**
5859
* {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message
@@ -315,6 +316,18 @@ public void close() {
315316
*/
316317
private final AtomicBoolean unimplemented = new AtomicBoolean(false);
317318

319+
/**
320+
* This flag is set to true if a create session RPC is in progress. This flag prevents the application
321+
* from firing two requests concurrently.
322+
*/
323+
private final AtomicBoolean retryingSessionCreation = new AtomicBoolean(true);
324+
325+
/**
326+
* This lock is used to prevent two threads from retrying createSession RPC requests in
327+
* concurrently.
328+
*/
329+
private final ReentrantLock sessionCreationLock = new ReentrantLock();
330+
318331
/**
319332
* This flag is set to true if the server returns UNIMPLEMENTED when a read-write transaction is
320333
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
@@ -358,11 +371,20 @@ public void close() {
358371
SettableApiFuture.create();
359372
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
360373
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
374+
asyncCreateMultiplexedSession(initialSessionReferenceFuture);
375+
maybeWaitForSessionCreation(
376+
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
377+
initialSessionReferenceFuture);
378+
}
379+
380+
private void asyncCreateMultiplexedSession(
381+
SettableApiFuture<SessionReference> sessionReferenceFuture) {
361382
this.sessionClient.asyncCreateMultiplexedSession(
362383
new SessionConsumer() {
363384
@Override
364385
public void onSessionReady(SessionImpl session) {
365-
initialSessionReferenceFuture.set(session.getSessionReference());
386+
retryingSessionCreation.set(false);
387+
sessionReferenceFuture.set(session.getSessionReference());
366388
// only start the maintainer if we actually managed to create a session in the first
367389
// place.
368390
maintainer.start();
@@ -394,13 +416,11 @@ public void onSessionReady(SessionImpl session) {
394416
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
395417
// Mark multiplexed sessions as unimplemented and fall back to regular sessions if
396418
// UNIMPLEMENTED is returned.
419+
retryingSessionCreation.set(false);
397420
maybeMarkUnimplemented(t);
398-
initialSessionReferenceFuture.setException(t);
421+
sessionReferenceFuture.setException(t);
399422
}
400423
});
401-
maybeWaitForSessionCreation(
402-
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
403-
initialSessionReferenceFuture);
404424
}
405425

406426
void setPool(SessionPool pool) {
@@ -546,8 +566,27 @@ MultiplexedSessionMaintainer getMaintainer() {
546566
return this.maintainer;
547567
}
548568

569+
ApiFuture<SessionReference> getCurrentSessionReferenceFuture() {
570+
return ApiFutures.immediateFuture(getCurrentSessionReference());
571+
}
572+
549573
@VisibleForTesting
550574
SessionReference getCurrentSessionReference() {
575+
try {
576+
return this.multiplexedSessionReference.get().get();
577+
} catch (ExecutionException | InterruptedException exception) {
578+
return maybeRetrySessionCreation();
579+
}
580+
}
581+
582+
private SessionReference maybeRetrySessionCreation() {
583+
sessionCreationLock.lock();
584+
if (isMultiplexedSessionsSupported() && retryingSessionCreation.compareAndSet(false, true)) {
585+
SettableApiFuture<SessionReference> settableApiFuture = SettableApiFuture.create();
586+
asyncCreateMultiplexedSession(settableApiFuture);
587+
multiplexedSessionReference.set(settableApiFuture);
588+
}
589+
sessionCreationLock.unlock();
551590
try {
552591
return this.multiplexedSessionReference.get().get();
553592
} catch (ExecutionException executionException) {
@@ -587,28 +626,22 @@ private DatabaseClient createMultiplexedSessionTransaction(boolean singleUse) {
587626

588627
private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
589628
boolean singleUse) {
590-
try {
591-
return new MultiplexedSessionTransaction(
592-
this,
593-
tracer.getCurrentSpan(),
594-
// Getting the result of the SettableApiFuture that contains the multiplexed session will
595-
// also automatically propagate any error that happened during the creation of the
596-
// session, such as for example a DatabaseNotFound exception. We therefore do not need
597-
// any special handling of such errors.
598-
multiplexedSessionReference.get().get(),
599-
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
600-
singleUse,
601-
this.pool);
602-
} catch (ExecutionException executionException) {
603-
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
604-
} catch (InterruptedException interruptedException) {
605-
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
606-
}
629+
return new MultiplexedSessionTransaction(
630+
this,
631+
tracer.getCurrentSpan(),
632+
// Getting the result of the SettableApiFuture that contains the multiplexed session will
633+
// also automatically propagate any error that happened during the creation of the
634+
// session, such as for example a DatabaseNotFound exception. We therefore do not need
635+
// any special handling of such errors.
636+
getCurrentSessionReference(),
637+
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
638+
singleUse,
639+
this.pool);
607640
}
608641

609642
private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction() {
610643
return new DelayedMultiplexedSessionTransaction(
611-
this, tracer.getCurrentSpan(), multiplexedSessionReference.get(), this.pool);
644+
this, tracer.getCurrentSpan(), getCurrentSessionReferenceFuture(), this.pool);
612645
}
613646

614647
private int getSingleUseChannelHint() {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import java.time.Duration;
5555
import java.util.*;
5656
import java.util.concurrent.CountDownLatch;
57+
import java.util.concurrent.ExecutorService;
58+
import java.util.concurrent.Executors;
5759
import java.util.concurrent.TimeUnit;
5860
import java.util.concurrent.atomic.AtomicInteger;
5961
import java.util.concurrent.atomic.AtomicReference;
@@ -245,6 +247,141 @@ public void testUnimplementedErrorOnCreation_fallsBackToRegularSessions() {
245247
assertEquals(0L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
246248
}
247249

250+
@Test
251+
public void testDeadlineExceededErrorWithOneRetry() {
252+
// Setting up two exceptions
253+
mockSpanner.setCreateSessionExecutionTime(
254+
SimulatedExecutionTime.ofExceptions(
255+
Arrays.asList(
256+
Status.DEADLINE_EXCEEDED
257+
.withDescription(
258+
"CallOptions deadline exceeded after 22.986872393s. "
259+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
260+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
261+
.asRuntimeException(),
262+
Status.DEADLINE_EXCEEDED
263+
.withDescription(
264+
"CallOptions deadline exceeded after 22.986872393s. "
265+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
266+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
267+
.asRuntimeException())));
268+
DatabaseClientImpl client =
269+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
270+
assertNotNull(client.multiplexedSessionDatabaseClient);
271+
272+
// initial fetch call fails with exception
273+
// this call will try to fetch it again which again throws an exception
274+
assertThrows(
275+
SpannerException.class,
276+
() -> {
277+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
278+
//noinspection StatementWithEmptyBody
279+
while (resultSet.next()) {
280+
// ignore
281+
}
282+
}
283+
});
284+
285+
// When the third request comes, it should succeed
286+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
287+
//noinspection StatementWithEmptyBody
288+
while (resultSet.next()) {
289+
// ignore
290+
}
291+
}
292+
293+
// Verify that we received one ExecuteSqlRequest, and that it used a multiplexed session.
294+
assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
295+
List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
296+
297+
Session session = mockSpanner.getSession(requests.get(0).getSession());
298+
assertNotNull(session);
299+
assertTrue(session.getMultiplexed());
300+
301+
assertNotNull(client.multiplexedSessionDatabaseClient);
302+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
303+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
304+
}
305+
306+
@Test
307+
public void testDeadlineExceededErrorWithOneRetryWithParallelRequests()
308+
throws InterruptedException {
309+
mockSpanner.setCreateSessionExecutionTime(
310+
SimulatedExecutionTime.ofMinimumAndRandomTimeAndExceptions(
311+
2000, 0,
312+
Arrays.asList(
313+
Status.DEADLINE_EXCEEDED
314+
.withDescription(
315+
"CallOptions deadline exceeded after 22.986872393s. "
316+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
317+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
318+
.asRuntimeException(),
319+
Status.DEADLINE_EXCEEDED
320+
.withDescription(
321+
"CallOptions deadline exceeded after 22.986872393s. "
322+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
323+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
324+
.asRuntimeException())));
325+
DatabaseClientImpl client =
326+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
327+
assertNotNull(client.multiplexedSessionDatabaseClient);
328+
329+
330+
ExecutorService executor = Executors.newCachedThreadPool();
331+
332+
// First set of requests should fail with an error
333+
CountDownLatch failureCountDownLatch = new CountDownLatch(3);
334+
for (int i = 0; i < 3; i++) {
335+
executor.submit(() -> {
336+
try {
337+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
338+
//noinspection StatementWithEmptyBody
339+
while (resultSet.next()) {
340+
// ignore
341+
}
342+
}
343+
} catch (SpannerException e) {
344+
failureCountDownLatch.countDown();
345+
}
346+
});
347+
}
348+
349+
assertTrue(failureCountDownLatch.await(2, TimeUnit.SECONDS));
350+
assertEquals(0, failureCountDownLatch.getCount());
351+
352+
// Second set of requests should pass
353+
CountDownLatch countDownLatch = new CountDownLatch(3);
354+
for (int i = 0; i < 3; i++) {
355+
executor.submit(() -> {
356+
try {
357+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
358+
//noinspection StatementWithEmptyBody
359+
while (resultSet.next()) {
360+
// ignore
361+
}
362+
}
363+
} catch (SpannerException e) {
364+
countDownLatch.countDown();
365+
}
366+
});
367+
}
368+
369+
assertFalse(countDownLatch.await(3, TimeUnit.SECONDS));
370+
assertEquals(3, countDownLatch.getCount());
371+
372+
// Verify that we received 3 ExecuteSqlRequests, and that the first used a multiplexed session.
373+
assertEquals(3, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
374+
List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
375+
376+
Session session = mockSpanner.getSession(requests.get(0).getSession());
377+
assertNotNull(session);
378+
assertTrue(session.getMultiplexed());
379+
380+
assertNotNull(client.multiplexedSessionDatabaseClient);
381+
assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
382+
assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
383+
}
384+
248385
@Test
249386
public void
250387
testUnimplementedErrorOnCreation_firstReceivesError_secondFallsBackToRegularSessions() {

0 commit comments

Comments
 (0)