Skip to content

Commit dfbfeac

Browse files
authored
refactor(historical): use Subscription<HistoricalBarUpdate> (closes #431) (#488)
* refactor(historical): replace HistoricalDataStreamingSubscription with generic Subscription Custom struct duplicated cancel-on-drop, AtomicBool dedup, message_bus storage, tokio::spawn in async Drop, and inline message dispatch — all already provided by the generic Subscription<T>. Replaced with a 30-line StreamDecoder impl (mirrors MarketDepths / Bar / Trade pattern) and a 3-line factory body. Net -271 lines. Async API breaks: next() now returns Option<Result<HistoricalBarUpdate, Error>> and there is no separate error() accessor. Sync API unchanged in this PR; see #487 for the follow-up. Drop tests rewired to drive through the public factory rather than the deleted internal constructor, so they exercise the real cancel-on-drop path end-to-end. Closes #431. * comment: collapse keepUpToDate constraint to inline * simplify: shorten Subscription import path; drop narration comments
1 parent 4088c7a commit dfbfeac

6 files changed

Lines changed: 132 additions & 408 deletions

File tree

examples/async/historical_data.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
263263

264264
while let Some(update) = subscription.next().await {
265265
match update {
266-
HistoricalBarUpdate::Historical(data) => {
266+
Ok(HistoricalBarUpdate::Historical(data)) => {
267267
println!("Received {} initial historical bars", data.bars.len());
268268
if let Some(bar) = data.bars.last() {
269269
println!(
@@ -277,7 +277,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
277277
}
278278
println!("Now streaming updates...");
279279
}
280-
HistoricalBarUpdate::Update(bar) => {
280+
Ok(HistoricalBarUpdate::Update(bar)) => {
281281
println!(
282282
"UPDATE: {} - O: ${:.2}, H: ${:.2}, L: ${:.2}, C: ${:.2}, V: {:.0}",
283283
format!("{:02}:{:02}:{:02}", bar.date.hour(), bar.date.minute(), bar.date.second()),
@@ -288,10 +288,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
288288
bar.volume
289289
);
290290
}
291-
HistoricalBarUpdate::End { start, end } => {
291+
Ok(HistoricalBarUpdate::End { start, end }) => {
292292
println!("Stream ended: {} - {}", start, end);
293293
break;
294294
}
295+
Err(e) => {
296+
eprintln!("Stream error: {e}");
297+
break;
298+
}
295299
}
296300
}
297301

src/market_data/historical/async.rs

Lines changed: 4 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ use std::sync::Arc;
44

55
use log::{debug, error, warn};
66
use time::OffsetDateTime;
7-
use time_tz::Tz;
87

98
use crate::client::ClientRequestBuilders;
109
use crate::contracts::Contract;
1110
use crate::messages::IncomingMessages;
1211
use crate::protocol::{check_version, Features};
12+
use crate::subscriptions::r#async::Subscription;
1313
use crate::transport::{AsyncInternalSubscription, AsyncMessageBus};
1414
use crate::{Client, Error, MAX_RETRIES};
1515

@@ -250,17 +250,16 @@ impl Client {
250250
what_to_show: Option<WhatToShow>,
251251
trading_hours: TradingHours,
252252
keep_up_to_date: bool,
253-
) -> Result<HistoricalDataStreamingSubscription, Error> {
253+
) -> Result<Subscription<HistoricalBarUpdate>, Error> {
254254
if !contract.trading_class.is_empty() || contract.contract_id > 0 {
255255
check_version(self.server_version(), Features::TRADING_CLASS)?;
256256
}
257257

258-
// Note: end_date must be None when keepUpToDate=true (IBKR requirement)
259258
let builder = self.request();
260259
let request = encoders::encode_request_historical_data(
261260
builder.request_id(),
262261
contract,
263-
None, // end_date must be None for keepUpToDate
262+
None, // end_date must be None when keepUpToDate=true (IBKR requirement)
264263
duration,
265264
bar_size,
266265
what_to_show,
@@ -269,21 +268,7 @@ impl Client {
269268
&Vec::<crate::contracts::TagValue>::default(),
270269
)?;
271270

272-
let request_id = builder.request_id();
273-
let subscription = builder.send_raw(request).await?;
274-
275-
let tz: &'static Tz = self.time_zone.unwrap_or_else(|| {
276-
warn!("server timezone unknown. assuming UTC, but that may be incorrect!");
277-
time_tz::timezones::db::UTC
278-
});
279-
280-
Ok(HistoricalDataStreamingSubscription::new(
281-
subscription,
282-
self.server_version(),
283-
tz,
284-
request_id,
285-
self.message_bus.clone(),
286-
))
271+
builder.send::<HistoricalBarUpdate>(request).await
287272
}
288273
}
289274

@@ -407,143 +392,6 @@ impl<T: TickDecoder<T> + Send> Drop for TickSubscription<T> {
407392
}
408393
}
409394

410-
// === Historical Data Streaming with keepUpToDate ===
411-
412-
/// Async subscription for streaming historical data with keepUpToDate=true.
413-
///
414-
/// This subscription first yields the initial historical bars as a `Historical` variant,
415-
/// then continues to yield streaming updates for the current bar as `Update` variants.
416-
pub struct HistoricalDataStreamingSubscription {
417-
messages: AsyncInternalSubscription,
418-
server_version: i32,
419-
time_zone: &'static Tz,
420-
error: Option<Error>,
421-
request_id: i32,
422-
message_bus: Arc<dyn AsyncMessageBus>,
423-
cancelled: AtomicBool,
424-
}
425-
426-
impl HistoricalDataStreamingSubscription {
427-
fn new(
428-
messages: AsyncInternalSubscription,
429-
server_version: i32,
430-
time_zone: &'static Tz,
431-
request_id: i32,
432-
message_bus: Arc<dyn AsyncMessageBus>,
433-
) -> Self {
434-
Self {
435-
messages,
436-
server_version,
437-
time_zone,
438-
error: None,
439-
request_id,
440-
message_bus,
441-
cancelled: AtomicBool::new(false),
442-
}
443-
}
444-
445-
/// Cancel the streaming subscription, sending CancelHistoricalData to the server.
446-
pub async fn cancel(&self) {
447-
if self.cancelled.swap(true, Ordering::Relaxed) {
448-
return;
449-
}
450-
if let Ok(message) = encoders::encode_cancel_historical_data(self.request_id) {
451-
if let Err(e) = self.message_bus.cancel_subscription(self.request_id, message).await {
452-
warn!("error sending cancel historical data: {e}");
453-
}
454-
}
455-
}
456-
457-
/// Get the next update from the streaming subscription.
458-
///
459-
/// Returns:
460-
/// - `Some(HistoricalBarUpdate::Historical(data))` - Initial batch of historical bars (always first)
461-
/// - `Some(HistoricalBarUpdate::Update(bar))` - Streaming bar update
462-
/// - `None` - Subscription ended (connection closed or error)
463-
pub async fn next(&mut self) -> Option<HistoricalBarUpdate> {
464-
loop {
465-
match self.messages.next().await {
466-
Some(Ok(mut message)) => {
467-
match message.message_type() {
468-
IncomingMessages::HistoricalData => {
469-
// Initial historical data batch
470-
match decoders::decode_historical_data(self.server_version, self.time_zone, &mut message) {
471-
Ok(data) => {
472-
return Some(HistoricalBarUpdate::Historical(data));
473-
}
474-
Err(e) => {
475-
self.error = Some(e);
476-
return None;
477-
}
478-
}
479-
}
480-
IncomingMessages::HistoricalDataUpdate => {
481-
// Streaming bar update
482-
match decoders::decode_historical_data_update(self.time_zone, &mut message) {
483-
Ok(bar) => {
484-
return Some(HistoricalBarUpdate::Update(bar));
485-
}
486-
Err(e) => {
487-
self.error = Some(e);
488-
return None;
489-
}
490-
}
491-
}
492-
IncomingMessages::HistoricalDataEnd => {
493-
match decoders::decode_historical_data_end(self.server_version, self.time_zone, &mut message) {
494-
Ok((start, end)) => return Some(HistoricalBarUpdate::End { start, end }),
495-
Err(e) => {
496-
self.error = Some(e);
497-
return None;
498-
}
499-
}
500-
}
501-
IncomingMessages::Error => {
502-
self.error = Some(Error::from(message));
503-
return None;
504-
}
505-
_ => {
506-
// Skip unexpected messages
507-
debug!("unexpected message in streaming subscription: {:?}", message.message_type());
508-
continue;
509-
}
510-
}
511-
}
512-
Some(Err(e)) => {
513-
self.error = Some(e);
514-
return None;
515-
}
516-
None => {
517-
// Channel closed
518-
return None;
519-
}
520-
}
521-
}
522-
}
523-
524-
/// Returns the last error that occurred, if any.
525-
pub fn error(&self) -> Option<&Error> {
526-
self.error.as_ref()
527-
}
528-
}
529-
530-
impl Drop for HistoricalDataStreamingSubscription {
531-
fn drop(&mut self) {
532-
if self.cancelled.swap(true, Ordering::Relaxed) {
533-
return;
534-
}
535-
let request_id = self.request_id;
536-
let message_bus = self.message_bus.clone();
537-
if let Ok(message) = encoders::encode_cancel_historical_data(request_id) {
538-
tokio::spawn(async move {
539-
if let Err(e) = message_bus.cancel_subscription(request_id, message).await {
540-
warn!("error sending cancel historical data in drop: {e}");
541-
}
542-
});
543-
}
544-
}
545-
}
546-
547395
#[cfg(test)]
548396
#[path = "async_tests.rs"]
549397
mod tests;

src/market_data/historical/async_tests.rs

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::*;
2-
use crate::common::test_utils::helpers::assert_proto_msg_id;
2+
use crate::common::test_utils::helpers::{assert_proto_msg_id, count_proto_msgs};
33
use crate::contracts::{Contract, Currency, Exchange, SecurityType, Symbol};
44
use crate::messages::OutgoingMessages;
55
use crate::server_versions;
@@ -579,9 +579,8 @@ async fn test_historical_data_streaming_with_updates() {
579579
.expect("streaming request should succeed");
580580

581581
// First: receive initial historical data
582-
let update1 = subscription.next().await;
583-
assert!(update1.is_some(), "Should receive initial historical data");
584-
match update1.unwrap() {
582+
let update1 = subscription.next().await.expect("Should receive initial historical data");
583+
match update1.expect("decode should succeed") {
585584
HistoricalBarUpdate::Historical(data) => {
586585
assert_eq!(data.bars.len(), 1, "Should have 1 initial bar");
587586
assert_eq!(data.bars[0].open, 185.50, "Wrong open price");
@@ -590,9 +589,8 @@ async fn test_historical_data_streaming_with_updates() {
590589
}
591590

592591
// Second: receive streaming update
593-
let update2 = subscription.next().await;
594-
assert!(update2.is_some(), "Should receive streaming update");
595-
match update2.unwrap() {
592+
let update2 = subscription.next().await.expect("Should receive streaming update");
593+
match update2.expect("decode should succeed") {
596594
HistoricalBarUpdate::Update(bar) => {
597595
assert_eq!(bar.open, 185.80, "Wrong open price in update");
598596
assert_eq!(bar.high, 186.10, "Wrong high price in update");
@@ -635,9 +633,8 @@ async fn test_historical_data_streaming_keep_up_to_date_false() {
635633
.expect("streaming request should succeed");
636634

637635
// Receive initial historical data
638-
let update1 = subscription.next().await;
639-
assert!(update1.is_some(), "Should receive initial historical data");
640-
match update1.unwrap() {
636+
let update1 = subscription.next().await.expect("Should receive initial historical data");
637+
match update1.expect("decode should succeed") {
641638
HistoricalBarUpdate::Historical(data) => {
642639
assert_eq!(data.bars.len(), 1, "Should have 1 initial bar");
643640
}
@@ -677,17 +674,10 @@ async fn test_historical_data_streaming_error_response() {
677674
.await
678675
.expect("streaming request should succeed");
679676

680-
// Should return None due to error
681-
let update = subscription.next().await;
682-
assert!(update.is_none(), "Should return None on error");
683-
684-
// Error should be accessible
685-
let error = subscription.error();
686-
assert!(error.is_some(), "Error should be stored");
687-
assert!(
688-
error.unwrap().to_string().contains("No market data permissions"),
689-
"Error should contain the message"
690-
);
677+
// Should yield Some(Err(_)) — Subscription<T> surfaces errors through next().
678+
let update = subscription.next().await.expect("error should arrive as Some(Err(_))");
679+
let err = update.expect_err("Should yield error result");
680+
assert!(err.to_string().contains("No market data permissions"), "Error should contain the message");
691681
}
692682

693683
#[tokio::test]
@@ -762,27 +752,32 @@ async fn test_streaming_subscription_sends_cancel_on_drop() {
762752
response_messages: vec![],
763753
});
764754

765-
let (_tx, rx) = tokio::sync::broadcast::channel(16);
766-
let internal = AsyncInternalSubscription::new(rx);
767-
let request_id = 9000;
755+
let mut client = Client::stubbed(message_bus.clone(), server_versions::SIZE_RULES);
756+
client.time_zone = Some(time_tz::timezones::db::UTC);
757+
let contract = Contract::stock("SPY").build();
768758

769759
{
770-
let _subscription = HistoricalDataStreamingSubscription::new(
771-
internal,
772-
server_versions::SIZE_RULES,
773-
time_tz::timezones::db::UTC,
774-
request_id,
775-
message_bus.clone(),
776-
);
777-
// subscription dropped here
760+
let _subscription = client
761+
.historical_data_streaming(
762+
&contract,
763+
Duration::days(1),
764+
BarSize::Hour,
765+
Some(WhatToShow::Trades),
766+
TradingHours::Regular,
767+
true,
768+
)
769+
.await
770+
.expect("streaming request should succeed");
778771
}
779772

780-
// Give tokio::spawn time to execute
781773
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
782774

783775
let messages = message_bus.request_messages.read().unwrap();
784-
assert_eq!(messages.len(), 1, "should send cancel message on drop");
785-
assert_proto_msg_id(&messages[0], OutgoingMessages::CancelHistoricalData);
776+
assert_eq!(
777+
count_proto_msgs(&messages, OutgoingMessages::CancelHistoricalData),
778+
1,
779+
"should send exactly one cancel message on drop"
780+
);
786781
}
787782

788783
#[tokio::test]
@@ -792,27 +787,32 @@ async fn test_streaming_subscription_cancel_prevents_duplicate_on_drop() {
792787
response_messages: vec![],
793788
});
794789

795-
let (_tx, rx) = tokio::sync::broadcast::channel(16);
796-
let internal = AsyncInternalSubscription::new(rx);
797-
let request_id = 9001;
790+
let mut client = Client::stubbed(message_bus.clone(), server_versions::SIZE_RULES);
791+
client.time_zone = Some(time_tz::timezones::db::UTC);
792+
let contract = Contract::stock("SPY").build();
798793

799794
{
800-
let subscription = HistoricalDataStreamingSubscription::new(
801-
internal,
802-
server_versions::SIZE_RULES,
803-
time_tz::timezones::db::UTC,
804-
request_id,
805-
message_bus.clone(),
806-
);
807-
808-
// Explicit cancel
809-
subscription.cancel().await;
795+
let subscription = client
796+
.historical_data_streaming(
797+
&contract,
798+
Duration::days(1),
799+
BarSize::Hour,
800+
Some(WhatToShow::Trades),
801+
TradingHours::Regular,
802+
true,
803+
)
804+
.await
805+
.expect("streaming request should succeed");
810806

811-
// Drop should not send a second cancel
807+
subscription.cancel().await;
812808
}
813809

814810
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
815811

816812
let messages = message_bus.request_messages.read().unwrap();
817-
assert_eq!(messages.len(), 1, "should send cancel only once");
813+
assert_eq!(
814+
count_proto_msgs(&messages, OutgoingMessages::CancelHistoricalData),
815+
1,
816+
"should send cancel only once"
817+
);
818818
}

0 commit comments

Comments
 (0)