2323import com .google .api .gax .rpc .TransportChannel ;
2424import com .google .api .gax .rpc .TransportChannelProvider ;
2525import com .google .auth .Credentials ;
26+ import com .google .cloud .bigtable .data .v2 .stub .EnhancedBigtableStubSettings ;
2627import com .google .cloud .bigtable .data .v2 .stub .metrics .ChannelPoolMetricsTracer ;
28+ import com .google .cloud .bigtable .gaxx .grpc .fallback .GcpFallbackChannel ;
29+ import com .google .cloud .bigtable .gaxx .grpc .fallback .GcpFallbackChannelOptions ;
2730import com .google .common .base .Preconditions ;
31+
32+ import io .grpc .Channel ;
2833import io .grpc .ManagedChannel ;
34+ import io .grpc .StatusRuntimeException ;
35+
36+ import static io .grpc .Status .Code .DEADLINE_EXCEEDED ;
37+ import static io .grpc .Status .Code .UNAUTHENTICATED ;
38+ import static io .grpc .Status .Code .UNAVAILABLE ;
39+ import static io .grpc .Status .Code .UNIMPLEMENTED ;
40+ import static io .grpc .Status .Code .UNKNOWN ;
41+
2942import java .io .IOException ;
43+ import java .time .Duration ;
3044import java .util .Map ;
45+ import java .util .Set ;
3146import java .util .concurrent .Executor ;
3247import java .util .concurrent .ScheduledExecutorService ;
48+ import java .util .function .Function ;
49+
3350import javax .annotation .Nullable ;
3451
3552/**
@@ -42,14 +59,17 @@ public final class BigtableTransportChannelProvider implements TransportChannelP
4259 private final InstantiatingGrpcChannelProvider delegate ;
4360 private final ChannelPrimer channelPrimer ;
4461 @ Nullable private final ChannelPoolMetricsTracer channelPoolMetricsTracer ;
62+ @ Nullable private final EnhancedBigtableStubSettings settings ;
4563
4664 private BigtableTransportChannelProvider (
4765 InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider ,
4866 ChannelPrimer channelPrimer ,
49- ChannelPoolMetricsTracer channelPoolMetricsTracer ) {
67+ ChannelPoolMetricsTracer channelPoolMetricsTracer ,
68+ EnhancedBigtableStubSettings settings ) {
5069 delegate = Preconditions .checkNotNull (instantiatingGrpcChannelProvider );
5170 this .channelPrimer = channelPrimer ;
5271 this .channelPoolMetricsTracer = channelPoolMetricsTracer ;
72+ this .settings = settings ;
5373 }
5474
5575 @ Override
@@ -72,7 +92,7 @@ public BigtableTransportChannelProvider withExecutor(Executor executor) {
7292 InstantiatingGrpcChannelProvider newChannelProvider =
7393 (InstantiatingGrpcChannelProvider ) delegate .withExecutor (executor );
7494 return new BigtableTransportChannelProvider (
75- newChannelProvider , channelPrimer , channelPoolMetricsTracer );
95+ newChannelProvider , channelPrimer , channelPoolMetricsTracer , settings );
7696 }
7797
7898 @ Override
@@ -85,7 +105,7 @@ public BigtableTransportChannelProvider withHeaders(Map<String, String> headers)
85105 InstantiatingGrpcChannelProvider newChannelProvider =
86106 (InstantiatingGrpcChannelProvider ) delegate .withHeaders (headers );
87107 return new BigtableTransportChannelProvider (
88- newChannelProvider , channelPrimer , channelPoolMetricsTracer );
108+ newChannelProvider , channelPrimer , channelPoolMetricsTracer , settings );
89109 }
90110
91111 @ Override
@@ -98,7 +118,7 @@ public TransportChannelProvider withEndpoint(String endpoint) {
98118 InstantiatingGrpcChannelProvider newChannelProvider =
99119 (InstantiatingGrpcChannelProvider ) delegate .withEndpoint (endpoint );
100120 return new BigtableTransportChannelProvider (
101- newChannelProvider , channelPrimer , channelPoolMetricsTracer );
121+ newChannelProvider , channelPrimer , channelPoolMetricsTracer , settings );
102122 }
103123
104124 @ Deprecated
@@ -113,7 +133,7 @@ public TransportChannelProvider withPoolSize(int size) {
113133 InstantiatingGrpcChannelProvider newChannelProvider =
114134 (InstantiatingGrpcChannelProvider ) delegate .withPoolSize (size );
115135 return new BigtableTransportChannelProvider (
116- newChannelProvider , channelPrimer , channelPoolMetricsTracer );
136+ newChannelProvider , channelPrimer , channelPoolMetricsTracer , settings );
117137 }
118138
119139 /** Expected to only be called once when BigtableClientContext is created */
@@ -144,14 +164,70 @@ public TransportChannel getTransportChannel() throws IOException {
144164
145165 BigtableChannelPool btChannelPool =
146166 BigtableChannelPool .create (btPoolSettings , channelFactory , channelPrimer );
167+
168+ ManagedChannel resultingChannel = btChannelPool ;
169+
170+ // TODO: Also check if directpath is possible.
171+ if (settings != null && settings .isFallbackEnabled () && settings .isDirectpathEnabled ()) {
172+ InstantiatingGrpcChannelProvider cloudpathChannelProvider =
173+ delegate .toBuilder ()
174+ .setAttemptDirectPath (false )
175+ .setChannelPoolSettings (ChannelPoolSettings .staticallySized (1 ))
176+ .build ();
177+
178+ ChannelFactory cloudpathFactory =
179+ () -> {
180+ try {
181+ GrpcTransportChannel channel =
182+ (GrpcTransportChannel ) cloudpathChannelProvider .getTransportChannel ();
183+ return (ManagedChannel ) channel .getChannel ();
184+ } catch (IOException e ) {
185+ throw new java .io .UncheckedIOException (e );
186+ }
187+ };
188+
189+ BigtableChannelPool btCloupathPool =
190+ BigtableChannelPool .create (btPoolSettings , cloudpathFactory , channelPrimer );
191+
192+ Function <Channel , String > probingFn =
193+ (channel ) -> {
194+ try {
195+ channelPrimer .sendPrimeRequestsAsync ((ManagedChannel ) channel ).get ();
196+ } catch (StatusRuntimeException e ) {
197+ return e .getStatus ().getCode ().toString ();
198+ } catch (Exception e ) {
199+ return "EXCEPTION" ;
200+ }
201+ return "" ;
202+ };
203+
204+ // Default options for now, but with probing.
205+ // TODO: enable oTel metrics if needed.
206+ GcpFallbackChannelOptions fallbackOptions = GcpFallbackChannelOptions .newBuilder ()
207+ .setPrimaryChannelName ("DIRECTPATH" )
208+ .setFallbackChannelName ("CLOUDPATH" )
209+ .setEnableFallback (true )
210+ .setPeriod (Duration .ofMinutes (1 ))
211+ .setErroneousStates (Set .of (UNAVAILABLE , UNAUTHENTICATED , DEADLINE_EXCEEDED , UNKNOWN , UNIMPLEMENTED ))
212+ .setFallbackProbingInterval (Duration .ofMinutes (15 ))
213+ .setPrimaryProbingInterval (Duration .ofMinutes (1 ))
214+ .setMinFailedCalls (3 )
215+ .setErrorRateThreshold (1f )
216+ .setFallbackProbingFunction (probingFn )
217+ .setPrimaryProbingFunction (probingFn )
218+ .build ();
219+
220+ resultingChannel = new GcpFallbackChannel (fallbackOptions , btChannelPool , btCloupathPool );
221+ }
147222
148223 if (channelPoolMetricsTracer != null ) {
149- channelPoolMetricsTracer .registerChannelInsightsProvider (btChannelPool ::getChannelInfos );
224+ // resultingChannel is either BigtableChannelPool or GcpFallbackChannel here and both implement BigtableChannelPoolObserver.
225+ channelPoolMetricsTracer .registerChannelInsightsProvider (((BigtableChannelPoolObserver ) resultingChannel )::getChannelInfos );
150226 channelPoolMetricsTracer .registerLoadBalancingStrategy (
151227 btPoolSettings .getLoadBalancingStrategy ().name ());
152228 }
153229
154- return GrpcTransportChannel .create (btChannelPool );
230+ return GrpcTransportChannel .create (resultingChannel );
155231 }
156232
157233 @ Override
@@ -169,7 +245,7 @@ public TransportChannelProvider withCredentials(Credentials credentials) {
169245 InstantiatingGrpcChannelProvider newChannelProvider =
170246 (InstantiatingGrpcChannelProvider ) delegate .withCredentials (credentials );
171247 return new BigtableTransportChannelProvider (
172- newChannelProvider , channelPrimer , channelPoolMetricsTracer );
248+ newChannelProvider , channelPrimer , channelPoolMetricsTracer , settings );
173249 }
174250
175251 /** Creates a BigtableTransportChannelProvider. */
@@ -178,6 +254,15 @@ public static BigtableTransportChannelProvider create(
178254 ChannelPrimer channelPrimer ,
179255 ChannelPoolMetricsTracer outstandingRpcsMetricTracke ) {
180256 return new BigtableTransportChannelProvider (
181- instantiatingGrpcChannelProvider , channelPrimer , outstandingRpcsMetricTracke );
257+ instantiatingGrpcChannelProvider , channelPrimer , outstandingRpcsMetricTracke , null );
258+ }
259+
260+ public static BigtableTransportChannelProvider create (
261+ InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider ,
262+ ChannelPrimer channelPrimer ,
263+ ChannelPoolMetricsTracer outstandingRpcsMetricTracer ,
264+ EnhancedBigtableStubSettings settings ) {
265+ return new BigtableTransportChannelProvider (
266+ instantiatingGrpcChannelProvider , channelPrimer , outstandingRpcsMetricTracer , settings );
182267 }
183268}
0 commit comments