Skip to content

Commit 5cd4f8a

Browse files
committed
chore: Update contract tests and add custom transport example
1 parent 92630fb commit 5cd4f8a

File tree

3 files changed

+138
-33
lines changed

3 files changed

+138
-33
lines changed

contract-tests/src/client_entity.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use eventsource_client as es;
21
use futures::future::FutureExt;
32
use launchdarkly_server_sdk::{
43
Context, ContextBuilder, MigratorBuilder, MultiContextBuilder, Reference,
@@ -21,7 +20,6 @@ use crate::command_params::{
2120
MigrationOperationResponse, MigrationVariationResponse, SecureModeHashResponse,
2221
};
2322
use crate::HttpsConnector;
24-
use crate::StreamingHttpsConnector;
2523
use crate::{
2624
command_params::{
2725
CommandParams, CommandResponse, EvaluateAllFlagsParams, EvaluateAllFlagsResponse,
@@ -38,8 +36,12 @@ impl ClientEntity {
3836
pub async fn new(
3937
create_instance_params: CreateInstanceParams,
4038
connector: HttpsConnector,
41-
streaming_https_connector: StreamingHttpsConnector,
4239
) -> Result<Self, BuildError> {
40+
// Create fresh transports for this client to avoid shared connection pool issues
41+
let transport =
42+
launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone());
43+
let streaming_https_transport =
44+
eventsource_client::HyperTransport::builder().build_with_connector(connector.clone());
4345
let mut config_builder =
4446
ConfigBuilder::new(&create_instance_params.configuration.credential);
4547

@@ -74,8 +76,6 @@ impl ClientEntity {
7476
}
7577

7678
if let Some(streaming) = create_instance_params.configuration.streaming {
77-
let transport =
78-
es::HyperTransport::builder().build_with_connector(streaming_https_connector);
7979
if let Some(base_uri) = streaming.base_uri {
8080
service_endpoints_builder.streaming_base_url(&base_uri);
8181
}
@@ -84,7 +84,7 @@ impl ClientEntity {
8484
if let Some(delay) = streaming.initial_retry_delay_ms {
8585
streaming_builder.initial_reconnect_delay(Duration::from_millis(delay));
8686
}
87-
streaming_builder.transport(transport);
87+
streaming_builder.transport(streaming_https_transport.clone());
8888

8989
config_builder = config_builder.data_source(&streaming_builder);
9090
} else if let Some(polling) = create_instance_params.configuration.polling {
@@ -96,19 +96,15 @@ impl ClientEntity {
9696
if let Some(delay) = polling.poll_interval_ms {
9797
polling_builder.poll_interval(Duration::from_millis(delay));
9898
}
99-
let transport =
100-
launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone());
101-
polling_builder.transport(transport);
99+
polling_builder.transport(transport.clone());
102100

103101
config_builder = config_builder.data_source(&polling_builder);
104102
} else {
105103
// If we didn't specify streaming or polling, we fall back to basic streaming. The only
106-
// customization we provide is the https connector to support testing multiple
107-
// connectors.
108-
let transport =
109-
es::HyperTransport::builder().build_with_connector(streaming_https_connector);
104+
// customization we provide is the transport to support testing multiple
105+
// transport implementations.
110106
let mut streaming_builder = StreamingDataSourceBuilder::new();
111-
streaming_builder.transport(transport);
107+
streaming_builder.transport(streaming_https_transport);
112108
config_builder = config_builder.data_source(&streaming_builder);
113109
}
114110

@@ -133,8 +129,6 @@ impl ClientEntity {
133129
if let Some(attributes) = events.global_private_attributes {
134130
processor_builder.private_attributes(attributes);
135131
}
136-
let transport =
137-
launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone());
138132
processor_builder.transport(transport);
139133
processor_builder.omit_anonymous_contexts(events.omit_anonymous_contexts);
140134

contract-tests/src/main.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, Resu
77
use async_mutex::Mutex;
88
use client_entity::ClientEntity;
99
use futures::executor;
10-
use hyper_util::client::legacy::connect::HttpConnector;
1110
use launchdarkly_server_sdk::Reference;
1211
use serde::{self, Deserialize, Serialize};
1312
use std::collections::{HashMap, HashSet};
@@ -132,7 +131,6 @@ async fn create_client(
132131
let client_entity = match ClientEntity::new(
133132
create_instance_params.into_inner(),
134133
app_state.https_connector.clone(),
135-
app_state.streaming_https_connector.clone(),
136134
)
137135
.await
138136
{
@@ -207,18 +205,15 @@ struct AppState {
207205
counter: Mutex<u32>,
208206
client_entities: Mutex<HashMap<u32, ClientEntity>>,
209207
https_connector: HttpsConnector,
210-
streaming_https_connector: StreamingHttpsConnector,
211208
}
212209

213210
#[cfg(feature = "hyper-rustls")]
214-
type HttpsConnector = hyper_rustls::HttpsConnector<HttpConnector>;
215-
#[cfg(feature = "hyper-rustls")]
216-
type StreamingHttpsConnector = hyper_util::client::legacy::connect::HttpConnector;
211+
type HttpsConnector =
212+
hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>;
217213

218214
#[cfg(feature = "tls")]
219-
type HttpsConnector = hyper_tls::HttpsConnector<HttpConnector>;
220-
#[cfg(feature = "tls")]
221-
type StreamingHttpsConnector = hyper_tls::HttpsConnector<HttpConnector>;
215+
type HttpsConnector =
216+
hyper_tls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>;
222217

223218
#[actix_web::main]
224219
async fn main() -> std::io::Result<()> {
@@ -238,9 +233,7 @@ async fn main() -> std::io::Result<()> {
238233
let (tx, rx) = mpsc::channel::<()>();
239234

240235
#[cfg(feature = "hyper-rustls")]
241-
let streaming_https_connector = hyper_util::client::legacy::connect::HttpConnector::new();
242-
#[cfg(feature = "hyper-rustls")]
243-
let connector = hyper_rustls::HttpsConnectorBuilder::new()
236+
let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
244237
.with_native_roots()
245238
.expect("Failed to load native root certificates")
246239
.https_or_http()
@@ -249,15 +242,12 @@ async fn main() -> std::io::Result<()> {
249242
.build();
250243

251244
#[cfg(feature = "tls")]
252-
let streaming_https_connector = hyper_tls::HttpsConnector::new();
253-
#[cfg(feature = "tls")]
254-
let connector = hyper_tls::HttpsConnector::new();
245+
let https_connector = hyper_tls::HttpsConnector::new();
255246

256247
let state = web::Data::new(AppState {
257248
counter: Mutex::new(0),
258249
client_entities: Mutex::new(HashMap::new()),
259-
https_connector: connector,
260-
streaming_https_connector,
250+
https_connector,
261251
});
262252

263253
let server = HttpServer::new(move || {
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use bytes::Bytes;
2+
use http::Request;
3+
use launchdarkly_server_sdk::{
4+
ConfigBuilder, EventProcessorBuilder, HttpTransport, ResponseFuture,
5+
};
6+
use std::time::Instant;
7+
8+
/// Example of a custom transport that wraps another transport and adds logging.
9+
///
10+
/// This demonstrates how to implement the HttpTransport trait to add middleware
11+
/// functionality like logging, metrics, retries, circuit breakers, etc.
12+
#[derive(Clone)]
13+
struct LoggingTransport<T: HttpTransport> {
14+
inner: T,
15+
}
16+
17+
impl<T: HttpTransport> LoggingTransport<T> {
18+
fn new(inner: T) -> Self {
19+
Self { inner }
20+
}
21+
}
22+
23+
impl<T: HttpTransport> HttpTransport for LoggingTransport<T> {
24+
fn request(&self, request: Request<Bytes>) -> ResponseFuture {
25+
let method = request.method().clone();
26+
let uri = request.uri().clone();
27+
let start = Instant::now();
28+
29+
println!("[REQUEST] {method} {uri}");
30+
31+
let inner = self.inner.clone();
32+
Box::pin(async move {
33+
let result = inner.request(request).await;
34+
let elapsed = start.elapsed();
35+
36+
match &result {
37+
Ok(response) => {
38+
println!(
39+
"[RESPONSE] {} {} - Status: {} - Duration: {:?}",
40+
method,
41+
uri,
42+
response.status(),
43+
elapsed
44+
);
45+
}
46+
Err(e) => {
47+
println!("[ERROR] {method} {uri} - Error: {e} - Duration: {elapsed:?}");
48+
}
49+
}
50+
51+
result
52+
})
53+
}
54+
}
55+
56+
#[tokio::main]
57+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
58+
// Get SDK key from environment
59+
let sdk_key =
60+
std::env::var("LAUNCHDARKLY_SDK_KEY").unwrap_or_else(|_| "your-sdk-key".to_string());
61+
62+
if sdk_key == "your-sdk-key" {
63+
eprintln!("Please set LAUNCHDARKLY_SDK_KEY environment variable");
64+
std::process::exit(1);
65+
}
66+
67+
// Create the base HTTPS transport
68+
let base_transport = launchdarkly_server_sdk::HyperTransport::new_https();
69+
70+
// Wrap it with logging middleware
71+
let logging_transport = LoggingTransport::new(base_transport);
72+
73+
// Configure the SDK to use the custom transport
74+
let config = ConfigBuilder::new(&sdk_key)
75+
.event_processor(
76+
EventProcessorBuilder::new()
77+
.transport(logging_transport.clone())
78+
.flush_interval(std::time::Duration::from_secs(5)),
79+
)
80+
.build()?;
81+
82+
// Create the client - you'll see all HTTP requests logged
83+
println!("Initializing LaunchDarkly client with logging transport...");
84+
let client = launchdarkly_server_sdk::Client::build(config)?;
85+
client.start_with_default_executor();
86+
87+
// Wait for initialization
88+
println!("Waiting for client initialization...");
89+
match client
90+
.wait_for_initialization(std::time::Duration::from_secs(10))
91+
.await
92+
{
93+
Some(true) => {
94+
println!("Client initialized successfully!");
95+
96+
// Evaluate a flag (will trigger HTTP events)
97+
let context = launchdarkly_server_sdk::ContextBuilder::new("example-user-key")
98+
.build()
99+
.expect("Failed to create context");
100+
101+
let flag_value = client.bool_variation(&context, "example-flag", false);
102+
println!("Flag 'example-flag' evaluated to: {flag_value}");
103+
104+
// Wait a bit to see event flushing
105+
println!("Waiting to observe event flushing...");
106+
tokio::time::sleep(std::time::Duration::from_secs(6)).await;
107+
}
108+
Some(false) => {
109+
eprintln!("Client failed to initialize");
110+
}
111+
None => {
112+
eprintln!("Client initialization timed out");
113+
}
114+
}
115+
116+
// Shutdown the client
117+
println!("Shutting down client...");
118+
client.close();
119+
120+
Ok(())
121+
}

0 commit comments

Comments
 (0)