Skip to content

Commit 734a306

Browse files
keelerm84claude
andcommitted
refactor: Migrate feature requester to HttpTransport
Convert feature requester and polling data source to use the generic HttpTransport trait instead of hyper Client directly. - Replace HyperFeatureRequester<C> with HttpFeatureRequester<T> - Replace HyperFeatureRequesterBuilder with HttpFeatureRequesterBuilder - Update PollingDataSourceBuilder to accept transport - Stream and collect response body for JSON parsing - Remove hyper-specific imports and trait bounds - Update test helpers to use HyperTransport BREAKING CHANGE: PollingDataSourceBuilder generic parameter changed from connector (C) to transport (T: HttpTransport). Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent e984692 commit 734a306

File tree

5 files changed

+68
-140
lines changed

5 files changed

+68
-140
lines changed

contract-tests/src/client_entity.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ impl ClientEntity {
9696
if let Some(delay) = polling.poll_interval_ms {
9797
polling_builder.poll_interval(Duration::from_millis(delay));
9898
}
99-
polling_builder.https_connector(connector.clone());
99+
let transport =
100+
launchdarkly_server_sdk::HyperTransport::new_with_connector(connector.clone());
101+
polling_builder.transport(transport);
100102

101103
config_builder = config_builder.data_source(&polling_builder);
102104
} else {

launchdarkly-server-sdk/src/data_source.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -366,14 +366,13 @@ mod tests {
366366
time::Duration,
367367
};
368368

369-
use hyper_util::client::legacy::connect::HttpConnector;
370369
use mockito::Matcher;
371370
use parking_lot::RwLock;
372371
use test_case::test_case;
373372
use tokio::sync::broadcast;
374373

375374
use super::{DataSource, PollingDataSource, StreamingDataSource};
376-
use crate::feature_requester_builders::HyperFeatureRequesterBuilder;
375+
use crate::feature_requester_builders::HttpFeatureRequesterBuilder;
377376
use crate::{stores::store::InMemoryDataStore, LAUNCHDARKLY_TAGS_HEADER};
378377
use eventsource_client as es;
379378

@@ -453,8 +452,8 @@ mod tests {
453452
let (shutdown_tx, _) = broadcast::channel::<()>(1);
454453
let initialized = Arc::new(AtomicBool::new(false));
455454

456-
let hyper_builder =
457-
HyperFeatureRequesterBuilder::new(&server.url(), "sdk-key", HttpConnector::new());
455+
let transport = crate::HyperTransport::new();
456+
let hyper_builder = HttpFeatureRequesterBuilder::new(&server.url(), "sdk-key", transport);
458457

459458
let polling = PollingDataSource::new(
460459
Arc::new(Mutex::new(Box::new(hyper_builder))),

launchdarkly-server-sdk/src/data_source_builders.rs

Lines changed: 24 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use super::service_endpoints;
22
use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource};
3-
use crate::feature_requester_builders::{FeatureRequesterFactory, HyperFeatureRequesterBuilder};
3+
use crate::feature_requester_builders::{FeatureRequesterFactory, HttpFeatureRequesterBuilder};
4+
use crate::transport::HttpTransport;
45
use eventsource_client as es;
5-
use http::Uri;
6-
#[cfg(feature = "hyper-rustls")]
7-
use hyper_rustls::HttpsConnectorBuilder;
86
use std::sync::{Arc, Mutex};
97
use std::time::Duration;
108
use thiserror::Error;
@@ -173,19 +171,17 @@ impl Default for NullDataSourceBuilder {
173171
///
174172
/// Adjust the initial reconnect delay.
175173
/// ```
176-
/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder};
177-
/// # use hyper_rustls::HttpsConnector;
178-
/// # use hyper_util::client::legacy::connect::HttpConnector;
174+
/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder, HyperTransport};
179175
/// # use std::time::Duration;
180176
/// # fn main() {
181-
/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HttpsConnector<HttpConnector>>::new()
177+
/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HyperTransport>::new()
182178
/// .poll_interval(Duration::from_secs(60)));
183179
/// # }
184180
/// ```
185181
#[derive(Clone)]
186-
pub struct PollingDataSourceBuilder<C> {
182+
pub struct PollingDataSourceBuilder<T: HttpTransport = crate::HyperTransport> {
187183
poll_interval: Duration,
188-
connector: Option<C>,
184+
transport: Option<T>,
189185
}
190186

191187
/// Contains methods for configuring the polling data source.
@@ -203,21 +199,19 @@ pub struct PollingDataSourceBuilder<C> {
203199
///
204200
/// Adjust the poll interval.
205201
/// ```
206-
/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder};
202+
/// # use launchdarkly_server_sdk::{PollingDataSourceBuilder, ConfigBuilder, HyperTransport};
207203
/// # use std::time::Duration;
208-
/// # use hyper_rustls::HttpsConnector;
209-
/// # use hyper_util::client::legacy::connect::HttpConnector;
210204
/// # fn main() {
211-
/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HttpsConnector<HttpConnector>>::new()
205+
/// ConfigBuilder::new("sdk-key").data_source(PollingDataSourceBuilder::<HyperTransport>::new()
212206
/// .poll_interval(Duration::from_secs(60)));
213207
/// # }
214208
/// ```
215-
impl<C> PollingDataSourceBuilder<C> {
209+
impl<T: HttpTransport> PollingDataSourceBuilder<T> {
216210
/// Create a new instance of the [PollingDataSourceBuilder] with default values.
217211
pub fn new() -> Self {
218212
Self {
219213
poll_interval: MINIMUM_POLL_INTERVAL,
220-
connector: None,
214+
transport: None,
221215
}
222216
}
223217

@@ -230,62 +224,43 @@ impl<C> PollingDataSourceBuilder<C> {
230224
self
231225
}
232226

233-
/// Sets the connector for the polling client to use. This allows for re-use of a connector
227+
/// Sets the transport for the polling client to use. This allows for re-use of a transport
234228
/// between multiple client instances. This is especially useful for the `sdk-test-harness`
235229
/// where many client instances are created throughout the test and reading the native
236230
/// certificates is a substantial portion of the runtime.
237-
pub fn https_connector(&mut self, connector: C) -> &mut Self {
238-
self.connector = Some(connector);
231+
pub fn transport(&mut self, transport: T) -> &mut Self {
232+
self.transport = Some(transport);
239233
self
240234
}
241235
}
242236

243-
impl<C> DataSourceFactory for PollingDataSourceBuilder<C>
244-
where
245-
C: tower::Service<Uri> + Clone + Send + Sync + 'static,
246-
C::Response: hyper_util::client::legacy::connect::Connection
247-
+ hyper::rt::Read
248-
+ hyper::rt::Write
249-
+ Send
250-
+ Unpin,
251-
C::Future: Send + Unpin + 'static,
252-
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
253-
{
237+
impl<T: HttpTransport> DataSourceFactory for PollingDataSourceBuilder<T> {
254238
fn build(
255239
&self,
256240
endpoints: &service_endpoints::ServiceEndpoints,
257241
sdk_key: &str,
258242
tags: Option<String>,
259243
) -> Result<Arc<dyn DataSource>, BuildError> {
260244
let feature_requester_builder: Result<Box<dyn FeatureRequesterFactory>, BuildError> =
261-
match &self.connector {
245+
match &self.transport {
262246
#[cfg(feature = "hyper-rustls")]
263247
None => {
264-
let connector = HttpsConnectorBuilder::new()
265-
.with_native_roots()
266-
.unwrap_or_else(|_| {
267-
log::debug!("Falling back to webpki roots for polling HTTPS connector");
268-
HttpsConnectorBuilder::new().with_webpki_roots()
269-
})
270-
.https_or_http()
271-
.enable_http1()
272-
.enable_http2()
273-
.build();
274-
275-
Ok(Box::new(HyperFeatureRequesterBuilder::new(
248+
let transport = crate::HyperTransport::new_https();
249+
250+
Ok(Box::new(HttpFeatureRequesterBuilder::new(
276251
endpoints.polling_base_url(),
277252
sdk_key,
278-
connector,
253+
transport,
279254
)))
280255
}
281256
#[cfg(not(feature = "hyper-rustls"))]
282257
None => Err(BuildError::InvalidConfig(
283-
"https connector required when rustls is disabled".into(),
258+
"transport is required when hyper-rustls feature is disabled".into(),
284259
)),
285-
Some(connector) => Ok(Box::new(HyperFeatureRequesterBuilder::new(
260+
Some(transport) => Ok(Box::new(HttpFeatureRequesterBuilder::new(
286261
endpoints.polling_base_url(),
287262
sdk_key,
288-
connector.clone(),
263+
transport.clone(),
289264
))),
290265
};
291266

@@ -302,7 +277,7 @@ where
302277
}
303278
}
304279

305-
impl<C> Default for PollingDataSourceBuilder<C> {
280+
impl<T: HttpTransport> Default for PollingDataSourceBuilder<T> {
306281
fn default() -> Self {
307282
PollingDataSourceBuilder::new()
308283
}
@@ -349,7 +324,6 @@ impl DataSourceFactory for MockDataSourceBuilder {
349324
#[cfg(test)]
350325
mod tests {
351326
use eventsource_client::{HyperTransport, ResponseFuture};
352-
use hyper_util::client::legacy::connect::HttpConnector;
353327

354328
use super::*;
355329

@@ -392,7 +366,7 @@ mod tests {
392366

393367
#[test]
394368
fn default_polling_builder_has_correct_defaults() {
395-
let builder = PollingDataSourceBuilder::<HttpConnector>::new();
369+
let builder = PollingDataSourceBuilder::<crate::HyperTransport>::new();
396370
assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL,);
397371
}
398372

launchdarkly-server-sdk/src/feature_requester.rs

Lines changed: 25 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::reqwest::is_http_error_recoverable;
2+
use crate::transport::HttpTransport;
23
use bytes::Bytes;
34
use futures::future::BoxFuture;
4-
use http_body_util::{BodyExt, Empty};
5-
use hyper_util::client::legacy::Client as HyperClient;
5+
use futures::stream::StreamExt;
66
use std::collections::HashMap;
7-
use std::sync::Arc;
87

98
use super::stores::store_types::AllData;
109
use launchdarkly_server_sdk_evaluation::{Flag, Segment};
@@ -22,29 +21,23 @@ pub trait FeatureRequester: Send {
2221
fn get_all(&mut self) -> BoxFuture<Result<AllData<Flag, Segment>, FeatureRequesterError>>;
2322
}
2423

25-
type BoxedBody =
26-
http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
27-
28-
pub struct HyperFeatureRequester<C> {
29-
http: Arc<HyperClient<C, BoxedBody>>,
24+
pub struct HttpFeatureRequester<T: HttpTransport> {
25+
transport: T,
3026
url: http::Uri,
3127
sdk_key: String,
3228
cache: Option<CachedEntry>,
3329
default_headers: HashMap<&'static str, String>,
3430
}
3531

36-
impl<C> HyperFeatureRequester<C> {
32+
impl<T: HttpTransport> HttpFeatureRequester<T> {
3733
pub fn new(
38-
http: HyperClient<
39-
C,
40-
http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>,
41-
>,
34+
transport: T,
4235
url: http::Uri,
4336
sdk_key: String,
4437
default_headers: HashMap<&'static str, String>,
4538
) -> Self {
4639
Self {
47-
http: Arc::new(http),
40+
transport,
4841
url,
4942
sdk_key,
5043
cache: None,
@@ -53,19 +46,16 @@ impl<C> HyperFeatureRequester<C> {
5346
}
5447
}
5548

56-
impl<C> FeatureRequester for HyperFeatureRequester<C>
57-
where
58-
C: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static,
59-
{
49+
impl<T: HttpTransport> FeatureRequester for HttpFeatureRequester<T> {
6050
fn get_all(&mut self) -> BoxFuture<Result<AllData<Flag, Segment>, FeatureRequesterError>> {
6151
Box::pin(async {
6252
let uri = self.url.clone();
6353
let key = self.sdk_key.clone();
6454

65-
let http = self.http.clone();
55+
let transport = self.transport.clone();
6656
let cache = self.cache.clone();
6757

68-
let mut request_builder = hyper::http::Request::builder()
58+
let mut request_builder = http::Request::builder()
6959
.uri(uri)
7060
.method("GET")
7161
.header("Content-Type", "application/json")
@@ -82,16 +72,9 @@ where
8272
}
8373

8474
// Create empty body for GET request
85-
let empty_body: http_body_util::combinators::BoxBody<
86-
Bytes,
87-
Box<dyn std::error::Error + Send + Sync>,
88-
> = Empty::<Bytes>::new()
89-
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
90-
.boxed();
75+
let request = request_builder.body(Bytes::new()).unwrap();
9176

92-
let result = http
93-
.request(request_builder.body(empty_body).unwrap())
94-
.await;
77+
let result = transport.request(request).await;
9578

9679
let response = match result {
9780
Ok(response) => response,
@@ -117,16 +100,17 @@ where
117100
.map_or_else(|_| "".into(), |s| s.into());
118101

119102
if response.status().is_success() {
120-
let body_bytes = response
121-
.into_body()
122-
.collect()
123-
.await
124-
.map_err(|e| {
103+
// Collect streaming body
104+
let mut body_bytes = Vec::new();
105+
let mut stream = response.into_body();
106+
while let Some(chunk) = stream.next().await {
107+
let chunk = chunk.map_err(|e| {
125108
error!("An error occurred while reading the polling response body: {e}");
126109
FeatureRequesterError::Temporary
127-
})?
128-
.to_bytes();
129-
let json = serde_json::from_slice::<AllData<Flag, Segment>>(body_bytes.as_ref());
110+
})?;
111+
body_bytes.extend_from_slice(&chunk);
112+
}
113+
let json = serde_json::from_slice::<AllData<Flag, Segment>>(&body_bytes);
130114

131115
return match json {
132116
Ok(all_data) => {
@@ -262,16 +246,12 @@ mod tests {
262246
}
263247
}
264248

265-
fn build_feature_requester(
266-
url: String,
267-
) -> HyperFeatureRequester<hyper_util::client::legacy::connect::HttpConnector> {
268-
use hyper_util::rt::TokioExecutor;
269-
let connector = hyper_util::client::legacy::connect::HttpConnector::new();
270-
let http = HyperClient::builder(TokioExecutor::new()).build(connector);
249+
fn build_feature_requester(url: String) -> HttpFeatureRequester<crate::HyperTransport> {
271250
let url = http::Uri::from_str(&url).expect("Failed parsing the mock server url");
251+
let transport = crate::HyperTransport::new();
272252

273-
HyperFeatureRequester::new(
274-
http,
253+
HttpFeatureRequester::new(
254+
transport,
275255
url,
276256
"sdk-key".to_string(),
277257
HashMap::<&str, String>::new(),

0 commit comments

Comments
 (0)