Skip to content

Commit 0b7a32e

Browse files
authored
feat: add ChannelFinder server interfaces (#4293)
* feat: add ChannelFinder server interfaces This commit adds the server abstraction interfaces for location-aware routing: - ChannelFinderServer: Interface representing a Spanner server endpoint with address, health check, and channel access - ChannelFinderServerFactory: Factory interface for creating and caching server connections - GrpcChannelFinderServerFactory: gRPC implementation that creates and manages gRPC channels for different server endpoints These interfaces enable the client to maintain connections to multiple Spanner servers and route requests directly to the appropriate server based on key location information. This is part of the experimental location-aware routing for improved latency. * incorporate changes * add tests * incorporate changes
1 parent 9cd296e commit 0b7a32e

File tree

4 files changed

+461
-0
lines changed

4 files changed

+461
-0
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.spi.v1;
18+
19+
import com.google.api.core.InternalApi;
20+
import io.grpc.ManagedChannel;
21+
22+
/**
23+
* Represents a Spanner server endpoint for location-aware routing.
24+
*
25+
* <p>Each instance wraps a gRPC {@link ManagedChannel} connected to a specific Spanner server. The
26+
* {@link ChannelEndpointCache} creates and caches these instances.
27+
*
28+
* <p>Implementations must be thread-safe as instances may be shared across multiple concurrent
29+
* operations.
30+
*
31+
* @see ChannelEndpointCache
32+
*/
33+
@InternalApi
34+
public interface ChannelEndpoint {
35+
36+
/**
37+
* Returns the network address of this server.
38+
*
39+
* @return the server address in "host:port" format
40+
*/
41+
String getAddress();
42+
43+
/**
44+
* Returns whether this server is ready to accept RPCs.
45+
*
46+
* <p>A server is considered unhealthy if:
47+
*
48+
* <ul>
49+
* <li>The underlying channel is shutdown or terminated
50+
* <li>The channel is in a transient failure state
51+
* </ul>
52+
*
53+
* @return true if the server is healthy and ready to accept RPCs
54+
*/
55+
boolean isHealthy();
56+
57+
/**
58+
* Returns the gRPC channel for making RPCs to this server.
59+
*
60+
* <p>The returned channel is managed by the {@link ChannelEndpointCache} and should not be shut
61+
* down directly by callers.
62+
*
63+
* @return the managed channel for this server
64+
*/
65+
ManagedChannel getChannel();
66+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.spi.v1;
18+
19+
import com.google.api.core.InternalApi;
20+
21+
/**
22+
* Cache for server connections used in location-aware routing.
23+
*
24+
* <p>Implementations are expected to cache {@link ChannelEndpoint} instances such that repeated
25+
* calls with the same address return the same instance. This allows routing components to
26+
* efficiently manage server references.
27+
*
28+
* <p>Implementations must be thread-safe. Multiple threads may concurrently call {@link
29+
* #get(String)} with different addresses.
30+
*/
31+
@InternalApi
32+
public interface ChannelEndpointCache {
33+
34+
/**
35+
* Returns the default channel endpoint.
36+
*
37+
* <p>The default channel is the original endpoint configured in {@link
38+
* com.google.cloud.spanner.SpannerOptions}. It is used as a fallback when the location cache does
39+
* not have routing information for a request.
40+
*
41+
* @return the default channel, never null
42+
*/
43+
ChannelEndpoint defaultChannel();
44+
45+
/**
46+
* Returns a cached server for the given address, creating it if needed.
47+
*
48+
* <p>If a server for this address already exists in the cache, the cached instance is returned.
49+
* Otherwise, a new server connection is created and cached.
50+
*
51+
* @param address the server address in "host:port" format
52+
* @return a server instance for the address, never null
53+
* @throws com.google.cloud.spanner.SpannerException if the channel cannot be created
54+
*/
55+
ChannelEndpoint get(String address);
56+
57+
/**
58+
* Evicts a server from the cache and gracefully shuts down its channel.
59+
*
60+
* <p>This method should be called when a server becomes unhealthy or is no longer needed. The
61+
* channel shutdown is graceful: existing RPCs are allowed to complete, but new RPCs will not be
62+
* accepted on this channel.
63+
*
64+
* <p>If the address is not in the cache, this method does nothing.
65+
*
66+
* @param address the server address to evict
67+
*/
68+
void evict(String address);
69+
70+
/**
71+
* Shuts down all cached server connections.
72+
*
73+
* <p>This method should be called when the Spanner client is closed to release all resources.
74+
* Each channel is shut down gracefully, allowing in-flight RPCs to complete.
75+
*
76+
* <p>After calling this method, the cache should not be used to create new connections.
77+
*/
78+
void shutdown();
79+
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.spi.v1;
18+
19+
import com.google.api.core.InternalApi;
20+
import com.google.api.gax.grpc.GrpcTransportChannel;
21+
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
22+
import com.google.api.gax.rpc.TransportChannelProvider;
23+
import com.google.cloud.spanner.ErrorCode;
24+
import com.google.cloud.spanner.SpannerExceptionFactory;
25+
import com.google.common.annotations.VisibleForTesting;
26+
import io.grpc.ConnectivityState;
27+
import io.grpc.ManagedChannel;
28+
import java.io.IOException;
29+
import java.util.Map;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
34+
/**
35+
* gRPC implementation of {@link ChannelEndpointCache}.
36+
*
37+
* <p>This cache creates and caches gRPC channels per address. It uses {@link
38+
* InstantiatingGrpcChannelProvider#withEndpoint(String)} to create new channels with the same
39+
* configuration but different endpoints, avoiding race conditions.
40+
*/
41+
@InternalApi
42+
class GrpcChannelEndpointCache implements ChannelEndpointCache {
43+
44+
/** Timeout for graceful channel shutdown. */
45+
private static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
46+
47+
private final InstantiatingGrpcChannelProvider baseProvider;
48+
private final Map<String, GrpcChannelEndpoint> servers = new ConcurrentHashMap<>();
49+
private final GrpcChannelEndpoint defaultEndpoint;
50+
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
51+
52+
/**
53+
* Creates a new cache with the given channel provider.
54+
*
55+
* @param channelProvider the base provider used to create channels. New channels for different
56+
* endpoints are created using {@link InstantiatingGrpcChannelProvider#withEndpoint(String)}.
57+
* @throws IOException if the default channel cannot be created
58+
*/
59+
public GrpcChannelEndpointCache(InstantiatingGrpcChannelProvider channelProvider)
60+
throws IOException {
61+
this.baseProvider = channelProvider;
62+
String defaultEndpoint = channelProvider.getEndpoint();
63+
this.defaultEndpoint = new GrpcChannelEndpoint(defaultEndpoint, channelProvider);
64+
this.servers.put(defaultEndpoint, this.defaultEndpoint);
65+
}
66+
67+
@Override
68+
public ChannelEndpoint defaultChannel() {
69+
return defaultEndpoint;
70+
}
71+
72+
@Override
73+
public ChannelEndpoint get(String address) {
74+
if (isShutdown.get()) {
75+
throw SpannerExceptionFactory.newSpannerException(
76+
ErrorCode.FAILED_PRECONDITION, "ChannelEndpointCache has been shut down");
77+
}
78+
79+
return servers.computeIfAbsent(
80+
address,
81+
addr -> {
82+
try {
83+
// Create a new provider with the same config but different endpoint.
84+
// This is thread-safe as withEndpoint() returns a new provider instance.
85+
TransportChannelProvider newProvider = baseProvider.withEndpoint(addr);
86+
return new GrpcChannelEndpoint(addr, newProvider);
87+
} catch (IOException e) {
88+
throw SpannerExceptionFactory.newSpannerException(
89+
ErrorCode.INTERNAL, "Failed to create channel for address: " + addr, e);
90+
}
91+
});
92+
}
93+
94+
@Override
95+
public void evict(String address) {
96+
if (defaultEndpoint.getAddress().equals(address)) {
97+
return;
98+
}
99+
GrpcChannelEndpoint server = servers.remove(address);
100+
if (server != null) {
101+
shutdownChannel(server, false);
102+
}
103+
}
104+
105+
@Override
106+
public void shutdown() {
107+
if (!isShutdown.compareAndSet(false, true)) {
108+
return;
109+
}
110+
for (GrpcChannelEndpoint server : servers.values()) {
111+
shutdownChannel(server, true);
112+
}
113+
servers.clear();
114+
}
115+
116+
/**
117+
* Shuts down a server's channel.
118+
*
119+
* <p>First attempts a graceful shutdown. When awaitTermination is true, waits for in-flight RPCs
120+
* to complete and forces shutdown on timeout.
121+
*/
122+
private void shutdownChannel(GrpcChannelEndpoint server, boolean awaitTermination) {
123+
ManagedChannel channel = server.getChannel();
124+
if (channel.isShutdown()) {
125+
return;
126+
}
127+
128+
channel.shutdown();
129+
if (!awaitTermination) {
130+
return;
131+
}
132+
try {
133+
if (!channel.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
134+
channel.shutdownNow();
135+
}
136+
} catch (InterruptedException e) {
137+
channel.shutdownNow();
138+
Thread.currentThread().interrupt();
139+
}
140+
}
141+
142+
/** gRPC implementation of {@link ChannelEndpoint}. */
143+
static class GrpcChannelEndpoint implements ChannelEndpoint {
144+
private final String address;
145+
private final ManagedChannel channel;
146+
147+
/**
148+
* Creates a server from a channel provider.
149+
*
150+
* @param address the server address
151+
* @param provider the channel provider (must be a gRPC provider)
152+
* @throws IOException if the channel cannot be created
153+
*/
154+
GrpcChannelEndpoint(String address, TransportChannelProvider provider) throws IOException {
155+
this.address = address;
156+
TransportChannelProvider readyProvider = provider;
157+
if (provider.needsHeaders()) {
158+
readyProvider = provider.withHeaders(java.util.Collections.emptyMap());
159+
}
160+
GrpcTransportChannel transportChannel =
161+
(GrpcTransportChannel) readyProvider.getTransportChannel();
162+
this.channel = (ManagedChannel) transportChannel.getChannel();
163+
}
164+
165+
/**
166+
* Creates a server with an existing channel. Primarily for testing.
167+
*
168+
* @param address the server address
169+
* @param channel the managed channel
170+
*/
171+
@VisibleForTesting
172+
GrpcChannelEndpoint(String address, ManagedChannel channel) {
173+
this.address = address;
174+
this.channel = channel;
175+
}
176+
177+
@Override
178+
public String getAddress() {
179+
return address;
180+
}
181+
182+
@Override
183+
public boolean isHealthy() {
184+
if (channel.isShutdown() || channel.isTerminated()) {
185+
return false;
186+
}
187+
// Check connectivity state without triggering a connection attempt.
188+
// Some channel implementations don't support getState(), in which case
189+
// we assume the channel is healthy if it's not shutdown/terminated.
190+
try {
191+
ConnectivityState state = channel.getState(false);
192+
return state != ConnectivityState.SHUTDOWN && state != ConnectivityState.TRANSIENT_FAILURE;
193+
} catch (UnsupportedOperationException ignore) {
194+
return true;
195+
}
196+
}
197+
198+
@Override
199+
public ManagedChannel getChannel() {
200+
return channel;
201+
}
202+
}
203+
}

0 commit comments

Comments
 (0)