From ca7c4f309cfab467daecb3db5b5d78a71b2222b8 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Fri, 10 Apr 2026 21:11:17 -1000 Subject: [PATCH 1/9] add inbound protobuf decoders (Phase 3) Shared converters in proto/decoders.rs for Contract, Order, OrderState, Execution, ContractDetails, and other reusable types. Domain-specific decoders in each module decoders.rs with _proto suffix. Unit tests construct protobuf bytes with prost and verify decoded domain structs. --- src/accounts/common/decoders.rs | 152 ++++- src/contracts/common/decoders/mod.rs | 4 + src/contracts/common/decoders/proto.rs | 59 ++ .../common/{decoders.rs => decoders/text.rs} | 2 +- src/display_groups/common/decoders.rs | 7 + src/market_data/historical/common/decoders.rs | 246 ++++++++ src/market_data/realtime/common/decoders.rs | 240 +++++++- src/news/common/decoders.rs | 70 +++ src/orders/common/decoders.rs | 180 ++++++ src/proto/decoders.rs | 542 ++++++++++++++++++ src/proto/mod.rs | 3 + src/scanner/common/decoders.rs | 25 + src/wsh/common/decoders.rs | 38 ++ 13 files changed, 1564 insertions(+), 4 deletions(-) create mode 100644 src/contracts/common/decoders/mod.rs create mode 100644 src/contracts/common/decoders/proto.rs rename src/contracts/common/{decoders.rs => decoders/text.rs} (99%) create mode 100644 src/proto/decoders.rs diff --git a/src/accounts/common/decoders.rs b/src/accounts/common/decoders.rs index 2382e564..f509c939 100644 --- a/src/accounts/common/decoders.rs +++ b/src/accounts/common/decoders.rs @@ -1,8 +1,10 @@ use time::OffsetDateTime; +use prost::Message; + use crate::contracts::{Contract, Currency, Exchange, SecurityType, Symbol}; use crate::messages::ResponseMessage; -use crate::{server_versions, Error}; +use crate::{proto, server_versions, Error}; use super::super::{ AccountMultiValue, AccountPortfolioValue, AccountSummary, AccountUpdateTime, AccountValue, FamilyCode, PnL, PnLSingle, Position, PositionMulti, @@ -254,6 +256,105 @@ pub(crate) fn decode_account_multi_value(message: &mut ResponseMessage) -> Resul Ok(value) } +// === Protobuf decoders === + +#[allow(dead_code)] +pub(crate) fn decode_position_proto(bytes: &[u8]) -> Result { + let p = proto::Position::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let contract = p.contract.as_ref().map(proto::decoders::decode_contract).unwrap_or_default(); + Ok(Position { + account: p.account.unwrap_or_default(), + contract, + position: p.position.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default(), + average_cost: p.avg_cost.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_account_value_proto(bytes: &[u8]) -> Result { + let p = proto::AccountValue::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(AccountValue { + key: p.key.unwrap_or_default(), + value: p.value.unwrap_or_default(), + currency: p.currency.unwrap_or_default(), + account: p.account_name, + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_account_portfolio_value_proto(bytes: &[u8]) -> Result { + let p = proto::PortfolioValue::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let contract = p.contract.as_ref().map(proto::decoders::decode_contract).unwrap_or_default(); + Ok(AccountPortfolioValue { + contract, + position: p.position.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default(), + market_price: p.market_price.unwrap_or_default(), + market_value: p.market_value.unwrap_or_default(), + average_cost: p.average_cost.unwrap_or_default(), + unrealized_pnl: p.unrealized_pnl.unwrap_or_default(), + realized_pnl: p.realized_pnl.unwrap_or_default(), + account: p.account_name, + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_pnl_proto(bytes: &[u8]) -> Result { + let p = proto::PnL::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(PnL { + daily_pnl: p.daily_pn_l.unwrap_or_default(), + unrealized_pnl: p.unrealized_pn_l.filter(|&v| v != f64::MAX), + realized_pnl: p.realized_pn_l.filter(|&v| v != f64::MAX), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_pnl_single_proto(bytes: &[u8]) -> Result { + let p = proto::PnLSingle::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(PnLSingle { + position: p.position.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default(), + daily_pnl: p.daily_pn_l.unwrap_or_default(), + unrealized_pnl: p.unrealized_pn_l.unwrap_or_default(), + realized_pnl: p.realized_pn_l.unwrap_or_default(), + value: p.value.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_account_summary_proto(bytes: &[u8]) -> Result { + let p = proto::AccountSummary::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(AccountSummary { + account: p.account.unwrap_or_default(), + tag: p.tag.unwrap_or_default(), + value: p.value.unwrap_or_default(), + currency: p.currency.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_position_multi_proto(bytes: &[u8]) -> Result { + let p = proto::PositionMulti::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let contract = p.contract.as_ref().map(proto::decoders::decode_contract).unwrap_or_default(); + Ok(PositionMulti { + account: p.account.unwrap_or_default(), + contract, + position: p.position.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default(), + average_cost: p.avg_cost.unwrap_or_default(), + model_code: p.model_code.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_account_multi_value_proto(bytes: &[u8]) -> Result { + let p = proto::AccountUpdateMulti::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(AccountMultiValue { + account: p.account.unwrap_or_default(), + model_code: p.model_code.unwrap_or_default(), + key: p.key.unwrap_or_default(), + value: p.value.unwrap_or_default(), + currency: p.currency.unwrap_or_default(), + }) +} + #[cfg(test)] mod tests { use crate::{ @@ -1126,4 +1227,53 @@ mod tests { assert!(result.is_ok(), "Decoding failed: {:?}", result.err()); assert_eq!(result.unwrap().timestamp, "12:34:56", "Timestamp mismatch"); } + + #[test] + fn test_decode_position_proto() { + use prost::Message; + + let proto_msg = crate::proto::Position { + account: Some("DU1234".into()), + contract: Some(crate::proto::Contract { + con_id: Some(265598), + symbol: Some("AAPL".into()), + sec_type: Some("STK".into()), + exchange: Some("SMART".into()), + currency: Some("USD".into()), + ..Default::default() + }), + position: Some("100".into()), + avg_cost: Some(150.25), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = super::decode_position_proto(&bytes).unwrap(); + assert_eq!(result.account, "DU1234"); + assert_eq!(result.contract.contract_id, 265598); + assert_eq!(result.contract.symbol.to_string(), "AAPL"); + assert_eq!(result.position, 100.0); + assert_eq!(result.average_cost, 150.25); + } + + #[test] + fn test_decode_pnl_proto() { + use prost::Message; + + let proto_msg = crate::proto::PnL { + req_id: Some(1), + daily_pn_l: Some(1234.56), + unrealized_pn_l: Some(500.0), + realized_pn_l: Some(f64::MAX), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = super::decode_pnl_proto(&bytes).unwrap(); + assert_eq!(result.daily_pnl, 1234.56); + assert_eq!(result.unrealized_pnl, Some(500.0)); + assert_eq!(result.realized_pnl, None); // f64::MAX filtered out + } } diff --git a/src/contracts/common/decoders/mod.rs b/src/contracts/common/decoders/mod.rs new file mode 100644 index 00000000..c8159840 --- /dev/null +++ b/src/contracts/common/decoders/mod.rs @@ -0,0 +1,4 @@ +pub(crate) mod proto; +mod text; + +pub(crate) use text::*; diff --git a/src/contracts/common/decoders/proto.rs b/src/contracts/common/decoders/proto.rs new file mode 100644 index 00000000..d59d60f4 --- /dev/null +++ b/src/contracts/common/decoders/proto.rs @@ -0,0 +1,59 @@ +use crate::contracts::ContractDetails; +use crate::Error; + +#[allow(dead_code)] +pub(crate) fn decode_contract_data_proto(bytes: &[u8]) -> Result { + let p: crate::proto::ContractData = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let default_contract = crate::proto::Contract::default(); + let default_details = crate::proto::ContractDetails::default(); + let proto_contract = p.contract.as_ref().unwrap_or(&default_contract); + let proto_details = p.contract_details.as_ref().unwrap_or(&default_details); + Ok(crate::proto::decoders::decode_contract_details(proto_contract, proto_details)) +} + +#[cfg(test)] +mod tests { + use super::*; + use prost::Message; + + #[test] + fn test_decode_contract_data_proto() { + let proto_msg = crate::proto::ContractData { + req_id: Some(1), + contract: Some(crate::proto::Contract { + con_id: Some(265598), + symbol: Some("AAPL".into()), + sec_type: Some("STK".into()), + exchange: Some("SMART".into()), + currency: Some("USD".into()), + local_symbol: Some("AAPL".into()), + trading_class: Some("NMS".into()), + ..Default::default() + }), + contract_details: Some(crate::proto::ContractDetails { + market_name: Some("NMS".into()), + min_tick: Some("0.01".into()), + long_name: Some("APPLE INC".into()), + industry: Some("Technology".into()), + category: Some("Computers".into()), + subcategory: Some("Consumer Electronics".into()), + ..Default::default() + }), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_contract_data_proto(&bytes).unwrap(); + assert_eq!(result.contract.contract_id, 265598); + assert_eq!(result.contract.symbol.to_string(), "AAPL"); + assert_eq!(result.contract.currency.to_string(), "USD"); + assert_eq!(result.contract.local_symbol, "AAPL"); + assert_eq!(result.market_name, "NMS"); + assert_eq!(result.min_tick, 0.01); + assert_eq!(result.long_name, "APPLE INC"); + assert_eq!(result.industry, "Technology"); + assert_eq!(result.category, "Computers"); + assert_eq!(result.subcategory, "Consumer Electronics"); + } +} diff --git a/src/contracts/common/decoders.rs b/src/contracts/common/decoders/text.rs similarity index 99% rename from src/contracts/common/decoders.rs rename to src/contracts/common/decoders/text.rs index 72312abb..322f1c3b 100644 --- a/src/contracts/common/decoders.rs +++ b/src/contracts/common/decoders/text.rs @@ -8,7 +8,7 @@ use crate::{ server_versions, Error, }; -use super::super::{ +use crate::contracts::{ Contract, ContractDescription, ContractDetails, FundAssetType, FundDistributionPolicyIndicator, IneligibilityReason, MarketRule, OptionChain, OptionComputation, PriceIncrement, TagValue, }; diff --git a/src/display_groups/common/decoders.rs b/src/display_groups/common/decoders.rs index a0858550..c56b242b 100644 --- a/src/display_groups/common/decoders.rs +++ b/src/display_groups/common/decoders.rs @@ -25,6 +25,13 @@ pub(crate) fn decode_display_group_updated(message: &mut ResponseMessage) -> Res Ok(DisplayGroupUpdate::new(contract_info)) } +#[allow(dead_code)] +pub(crate) fn decode_display_group_updated_proto(bytes: &[u8]) -> Result { + use prost::Message; + let p = crate::proto::DisplayGroupUpdated::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(DisplayGroupUpdate::new(p.contract_info.unwrap_or_default())) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/market_data/historical/common/decoders.rs b/src/market_data/historical/common/decoders.rs index 687a3c63..0396c4df 100644 --- a/src/market_data/historical/common/decoders.rs +++ b/src/market_data/historical/common/decoders.rs @@ -335,6 +335,148 @@ fn parse_bar_date(text: &str, time_zone: &Tz) -> Result { } } +// === Protobuf decoders === + +use prost::Message; + +use crate::proto; + +#[allow(dead_code)] +fn parse_str_f64(opt: &Option) -> f64 { + opt.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default() +} + +#[allow(dead_code)] +fn parse_str_i32(opt: &Option) -> i32 { + opt.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default() +} + +#[allow(dead_code)] +fn ts(secs: i64) -> OffsetDateTime { + OffsetDateTime::from_unix_timestamp(secs).unwrap_or(OffsetDateTime::UNIX_EPOCH) +} + +#[allow(dead_code)] +pub(crate) fn decode_historical_data_proto(bytes: &[u8]) -> Result, Error> { + let msg = proto::HistoricalData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + + let bars = msg + .historical_data_bars + .iter() + .map(|b| { + let date_str = b.date.as_deref().unwrap_or_default(); + let date = date_str + .parse::() + .map(|ts_val| OffsetDateTime::from_unix_timestamp(ts_val).unwrap_or(OffsetDateTime::UNIX_EPOCH)) + .unwrap_or(OffsetDateTime::UNIX_EPOCH); + + Bar { + date, + open: b.open.unwrap_or_default(), + high: b.high.unwrap_or_default(), + low: b.low.unwrap_or_default(), + close: b.close.unwrap_or_default(), + volume: parse_str_f64(&b.volume), + wap: parse_str_f64(&b.wap), + count: b.bar_count.unwrap_or(-1), + } + }) + .collect(); + + Ok(bars) +} + +#[allow(dead_code)] +pub(crate) fn decode_head_timestamp_proto(bytes: &[u8]) -> Result { + let msg = proto::HeadTimestamp::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(msg.head_timestamp.unwrap_or_default()) +} + +#[allow(dead_code)] +pub(crate) fn decode_real_time_bar_proto(bytes: &[u8]) -> Result { + let msg = proto::RealTimeBarTick::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + + Ok(crate::market_data::realtime::Bar { + date: OffsetDateTime::from_unix_timestamp(msg.time.unwrap_or_default()).unwrap_or(OffsetDateTime::UNIX_EPOCH), + open: msg.open.unwrap_or_default(), + high: msg.high.unwrap_or_default(), + low: msg.low.unwrap_or_default(), + close: msg.close.unwrap_or_default(), + volume: parse_str_f64(&msg.volume), + wap: parse_str_f64(&msg.wap), + count: msg.count.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_historical_ticks_proto(bytes: &[u8]) -> Result<(Vec, bool), Error> { + let msg = proto::HistoricalTicks::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + + let ticks = msg + .historical_ticks + .iter() + .map(|t| TickMidpoint { + timestamp: ts(t.time.unwrap_or_default()), + price: t.price.unwrap_or_default(), + size: parse_str_i32(&t.size), + }) + .collect(); + + Ok((ticks, msg.is_done.unwrap_or_default())) +} + +#[allow(dead_code)] +pub(crate) fn decode_historical_ticks_last_proto(bytes: &[u8]) -> Result<(Vec, bool), Error> { + let msg = proto::HistoricalTicksLast::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + + let ticks = msg + .historical_ticks_last + .iter() + .map(|t| { + let attr = t.tick_attrib_last.as_ref(); + TickLast { + timestamp: ts(t.time.unwrap_or_default()), + tick_attribute_last: TickAttributeLast { + past_limit: attr.and_then(|a| a.past_limit).unwrap_or_default(), + unreported: attr.and_then(|a| a.unreported).unwrap_or_default(), + }, + price: t.price.unwrap_or_default(), + size: parse_str_i32(&t.size), + exchange: t.exchange.clone().unwrap_or_default(), + special_conditions: t.special_conditions.clone().unwrap_or_default(), + } + }) + .collect(); + + Ok((ticks, msg.is_done.unwrap_or_default())) +} + +#[allow(dead_code)] +pub(crate) fn decode_historical_ticks_bid_ask_proto(bytes: &[u8]) -> Result<(Vec, bool), Error> { + let msg = proto::HistoricalTicksBidAsk::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + + let ticks = msg + .historical_ticks_bid_ask + .iter() + .map(|t| { + let attr = t.tick_attrib_bid_ask.as_ref(); + TickBidAsk { + timestamp: ts(t.time.unwrap_or_default()), + tick_attribute_bid_ask: TickAttributeBidAsk { + ask_past_high: attr.and_then(|a| a.ask_past_high).unwrap_or_default(), + bid_past_low: attr.and_then(|a| a.bid_past_low).unwrap_or_default(), + }, + price_bid: t.price_bid.unwrap_or_default(), + price_ask: t.price_ask.unwrap_or_default(), + size_bid: parse_str_i32(&t.size_bid), + size_ask: parse_str_i32(&t.size_ask), + } + }) + .collect(); + + Ok((ticks, msg.is_done.unwrap_or_default())) +} + #[cfg(test)] mod tests { use super::*; @@ -577,4 +719,108 @@ mod tests { assert!(msg.contains("not_a_number"), "error should include the bad value: {msg}"); assert!(msg.contains("invalid digit"), "error should include parse reason: {msg}"); } + + #[test] + fn test_decode_historical_data_proto() { + use prost::Message; + + let proto_msg = crate::proto::HistoricalData { + req_id: Some(1), + historical_data_bars: vec![ + crate::proto::HistoricalDataBar { + date: Some("1681133400".into()), + open: Some(185.50), + high: Some(186.00), + low: Some(185.00), + close: Some(185.75), + volume: Some("1000".into()), + wap: Some("185.625".into()), + bar_count: Some(150), + }, + crate::proto::HistoricalDataBar { + date: Some("1681219800".into()), + open: Some(186.00), + high: Some(187.00), + low: Some(185.50), + close: Some(186.50), + volume: Some("2000".into()), + wap: Some("186.25".into()), + bar_count: Some(300), + }, + ], + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let bars = decode_historical_data_proto(&bytes).unwrap(); + assert_eq!(bars.len(), 2); + + assert_eq!(bars[0].date, datetime!(2023-04-10 13:30:00 UTC)); + assert_eq!(bars[0].open, 185.50); + assert_eq!(bars[0].high, 186.00); + assert_eq!(bars[0].low, 185.00); + assert_eq!(bars[0].close, 185.75); + assert_eq!(bars[0].volume, 1000.0); + assert_eq!(bars[0].wap, 185.625); + assert_eq!(bars[0].count, 150); + + assert_eq!(bars[1].open, 186.00); + assert_eq!(bars[1].count, 300); + } + + #[test] + fn test_decode_historical_ticks_last_proto() { + use prost::Message; + + let proto_msg = crate::proto::HistoricalTicksLast { + req_id: Some(1), + historical_ticks_last: vec![ + crate::proto::HistoricalTickLast { + time: Some(1681133400), + tick_attrib_last: Some(crate::proto::TickAttribLast { + past_limit: Some(true), + unreported: Some(false), + }), + price: Some(11.63), + size: Some("100".into()), + exchange: Some("ISLAND".into()), + special_conditions: Some("O X".into()), + }, + crate::proto::HistoricalTickLast { + time: Some(1681133401), + tick_attrib_last: Some(crate::proto::TickAttribLast { + past_limit: Some(false), + unreported: Some(true), + }), + price: Some(11.73), + size: Some("50".into()), + exchange: Some("FINRA".into()), + special_conditions: Some("I".into()), + }, + ], + is_done: Some(true), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let (ticks, done) = decode_historical_ticks_last_proto(&bytes).unwrap(); + assert!(done); + assert_eq!(ticks.len(), 2); + + assert_eq!(ticks[0].timestamp, datetime!(2023-04-10 13:30:00 UTC)); + assert!(ticks[0].tick_attribute_last.past_limit); + assert!(!ticks[0].tick_attribute_last.unreported); + assert_eq!(ticks[0].price, 11.63); + assert_eq!(ticks[0].size, 100); + assert_eq!(ticks[0].exchange, "ISLAND"); + assert_eq!(ticks[0].special_conditions, "O X"); + + assert_eq!(ticks[1].timestamp, datetime!(2023-04-10 13:30:01 UTC)); + assert!(!ticks[1].tick_attribute_last.past_limit); + assert!(ticks[1].tick_attribute_last.unreported); + assert_eq!(ticks[1].size, 50); + assert_eq!(ticks[1].exchange, "FINRA"); + } } diff --git a/src/market_data/realtime/common/decoders.rs b/src/market_data/realtime/common/decoders.rs index c7d51ec5..653023e6 100644 --- a/src/market_data/realtime/common/decoders.rs +++ b/src/market_data/realtime/common/decoders.rs @@ -5,8 +5,8 @@ use crate::Error; use crate::{messages::ResponseMessage, server_versions}; use crate::market_data::realtime::{ - Bar, BidAsk, BidAskAttribute, DepthMarketDataDescription, MarketDepth, MarketDepthL2, MidPoint, TickEFP, TickGeneric, TickPrice, TickPriceSize, - TickRequestParameters, TickSize, TickString, TickType, TickTypes, Trade, TradeAttribute, + Bar, BidAsk, BidAskAttribute, DepthMarketDataDescription, MarketDepth, MarketDepthL2, MidPoint, TickAttribute, TickEFP, TickGeneric, TickPrice, + TickPriceSize, TickRequestParameters, TickSize, TickString, TickType, TickTypes, Trade, TradeAttribute, }; pub(crate) fn decode_realtime_bar(context: &DecoderContext, message: &mut ResponseMessage) -> Result { @@ -240,6 +240,141 @@ pub(crate) fn decode_tick_request_parameters(message: &mut ResponseMessage) -> R }) } +// === Protobuf decoders === + +#[allow(dead_code)] +pub(crate) fn decode_tick_price_proto(bytes: &[u8]) -> Result { + use prost::Message; + let msg = crate::proto::TickPrice::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + + let tick_type = TickType::from(msg.tick_type.unwrap_or_default()); + let price = msg.price.unwrap_or_default(); + let size: f64 = msg.size.as_deref().and_then(|s| s.parse().ok()).unwrap_or(f64::MAX); + let attr_mask = msg.attr_mask.unwrap_or_default(); + + let attributes = TickAttribute { + can_auto_execute: attr_mask & 0x1 != 0, + past_limit: attr_mask & 0x2 != 0, + pre_open: attr_mask & 0x4 != 0, + }; + + let size_tick_type = match tick_type { + TickType::Bid => TickType::BidSize, + TickType::Ask => TickType::AskSize, + TickType::Last => TickType::LastSize, + TickType::DelayedBid => TickType::DelayedBidSize, + TickType::DelayedAsk => TickType::DelayedAskSize, + TickType::DelayedLast => TickType::DelayedLastSize, + _ => TickType::Unknown, + }; + + if size_tick_type == TickType::Unknown || size == f64::MAX { + Ok(TickTypes::Price(TickPrice { + tick_type, + price, + attributes, + })) + } else { + Ok(TickTypes::PriceSize(TickPriceSize { + price_tick_type: tick_type, + price, + attributes, + size_tick_type, + size, + })) + } +} + +#[allow(dead_code)] +pub(crate) fn decode_tick_size_proto(bytes: &[u8]) -> Result { + use prost::Message; + let msg = crate::proto::TickSize::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + + Ok(TickSize { + tick_type: TickType::from(msg.tick_type.unwrap_or_default()), + size: msg.size.as_deref().and_then(|s| s.parse().ok()).unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_tick_string_proto(bytes: &[u8]) -> Result { + use prost::Message; + let msg = crate::proto::TickString::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + + Ok(TickString { + tick_type: TickType::from(msg.tick_type.unwrap_or_default()), + value: msg.value.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_tick_generic_proto(bytes: &[u8]) -> Result { + use prost::Message; + let msg = crate::proto::TickGeneric::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + + Ok(TickGeneric { + tick_type: TickType::from(msg.tick_type.unwrap_or_default()), + value: msg.value.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_tick_option_computation_proto(bytes: &[u8]) -> Result { + use prost::Message; + let msg = crate::proto::TickOptionComputation::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + + fn optional(val: Option) -> Option { + val.filter(|&v| v != f64::MAX) + } + + Ok(OptionComputation { + field: TickType::from(msg.tick_type.unwrap_or_default()), + tick_attribute: msg.tick_attrib, + implied_volatility: optional(msg.implied_vol), + delta: optional(msg.delta), + option_price: optional(msg.opt_price), + present_value_dividend: optional(msg.pv_dividend), + gamma: optional(msg.gamma), + vega: optional(msg.vega), + theta: optional(msg.theta), + underlying_price: optional(msg.und_price), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_market_depth_proto(bytes: &[u8]) -> Result { + use prost::Message; + let msg = crate::proto::MarketDepth::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + + let data = msg.market_depth_data.ok_or_else(|| Error::Simple("missing market_depth_data".into()))?; + + Ok(MarketDepth { + position: data.position.unwrap_or_default(), + operation: data.operation.unwrap_or_default(), + side: data.side.unwrap_or_default(), + price: data.price.unwrap_or_default(), + size: data.size.as_deref().and_then(|s| s.parse().ok()).unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_market_depth_l2_proto(bytes: &[u8]) -> Result { + use prost::Message; + let msg = crate::proto::MarketDepthL2::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + + let data = msg.market_depth_data.ok_or_else(|| Error::Simple("missing market_depth_data".into()))?; + + Ok(MarketDepthL2 { + position: data.position.unwrap_or_default(), + market_maker: data.market_maker.unwrap_or_default(), + operation: data.operation.unwrap_or_default(), + side: data.side.unwrap_or_default(), + price: data.price.unwrap_or_default(), + size: data.size.as_deref().and_then(|s| s.parse().ok()).unwrap_or_default(), + smart_depth: data.is_smart_depth.unwrap_or_default(), + }) +} + #[cfg(test)] mod tests { use super::*; @@ -722,4 +857,105 @@ mod tests { } } } + + #[cfg(test)] + mod proto_tests { + use super::*; + use prost::Message; + + #[test] + fn test_decode_tick_price_proto_with_size() { + // TickType::Bid = 1, should produce PriceSize with BidSize + let proto_msg = crate::proto::TickPrice { + req_id: Some(1), + tick_type: Some(1), // Bid + price: Some(150.25), + size: Some("100".into()), + attr_mask: Some(0x5), // can_auto_execute + pre_open + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_tick_price_proto(&bytes).unwrap(); + match result { + TickTypes::PriceSize(ps) => { + assert_eq!(ps.price_tick_type, TickType::Bid); + assert_eq!(ps.price, 150.25); + assert_eq!(ps.size, 100.0); + assert_eq!(ps.size_tick_type, TickType::BidSize); + assert!(ps.attributes.can_auto_execute); + assert!(!ps.attributes.past_limit); + assert!(ps.attributes.pre_open); + } + _ => panic!("expected PriceSize variant"), + } + } + + #[test] + fn test_decode_tick_price_proto_unknown_type() { + // TickType 99 => Unknown size tick type => returns Price variant + let proto_msg = crate::proto::TickPrice { + req_id: Some(1), + tick_type: Some(99), + price: Some(42.0), + size: Some("10".into()), + attr_mask: Some(0x2), // past_limit + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_tick_price_proto(&bytes).unwrap(); + match result { + TickTypes::Price(tp) => { + assert_eq!(tp.price, 42.0); + assert!(tp.attributes.past_limit); + } + _ => panic!("expected Price variant for unknown tick type"), + } + } + + #[test] + fn test_decode_tick_size_proto() { + let proto_msg = crate::proto::TickSize { + req_id: Some(1), + tick_type: Some(0), // BidSize + size: Some("500".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_tick_size_proto(&bytes).unwrap(); + assert_eq!(result.tick_type, TickType::BidSize); + assert_eq!(result.size, 500.0); + } + + #[test] + fn test_decode_market_depth_proto() { + let proto_msg = crate::proto::MarketDepth { + req_id: Some(1), + market_depth_data: Some(crate::proto::MarketDepthData { + position: Some(3), + operation: Some(0), + side: Some(1), + price: Some(149.50), + size: Some("200".into()), + market_maker: None, + is_smart_depth: None, + }), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_market_depth_proto(&bytes).unwrap(); + assert_eq!(result.position, 3); + assert_eq!(result.operation, 0); + assert_eq!(result.side, 1); + assert_eq!(result.price, 149.50); + assert_eq!(result.size, 200.0); + } + } } diff --git a/src/news/common/decoders.rs b/src/news/common/decoders.rs index e619a28e..145bda8e 100644 --- a/src/news/common/decoders.rs +++ b/src/news/common/decoders.rs @@ -97,6 +97,55 @@ fn parse_unix_timestamp(time: &str) -> Result { } } +#[allow(dead_code)] +pub(crate) fn decode_news_bulletin_proto(bytes: &[u8]) -> Result { + use prost::Message; + let p = crate::proto::NewsBulletin::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(NewsBulletin { + message_id: p.news_msg_id.unwrap_or_default(), + message_type: p.news_msg_type.unwrap_or_default(), + message: p.news_message.unwrap_or_default(), + exchange: p.originating_exch.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_news_article_proto(bytes: &[u8]) -> Result { + use prost::Message; + let p = crate::proto::NewsArticle::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(NewsArticleBody { + article_type: ArticleType::from(p.article_type.unwrap_or_default()), + article_text: p.article_text.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_historical_news_proto(bytes: &[u8]) -> Result { + use prost::Message; + let p = crate::proto::HistoricalNews::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + + let time = p + .time + .as_deref() + .and_then(|t| { + let format = format_description!("[year]-[month]-[day] [hour]:[minute]:[second].[subsecond]"); + PrimitiveDateTime::parse(t, format).ok() + }) + .and_then(|dt| match dt.assume_timezone(timezones::db::UTC) { + time_tz::OffsetResult::Some(v) => Some(v), + _ => None, + }) + .unwrap_or(OffsetDateTime::UNIX_EPOCH); + + Ok(NewsArticle { + time, + provider_code: p.provider_code.unwrap_or_default(), + article_id: p.article_id.unwrap_or_default(), + headline: p.headline.unwrap_or_default(), + extra_data: String::new(), + }) +} + #[cfg(test)] mod tests { use super::*; @@ -115,4 +164,25 @@ mod tests { assert!(msg.contains("not_a_number"), "error should include the bad value: {msg}"); assert!(msg.contains("invalid digit"), "error should include parse reason: {msg}"); } + + #[test] + fn test_decode_news_bulletin_proto() { + use prost::Message; + + let proto_msg = crate::proto::NewsBulletin { + news_msg_id: Some(42), + news_msg_type: Some(1), + news_message: Some("Market closed early".into()), + originating_exch: Some("NYSE".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_news_bulletin_proto(&bytes).unwrap(); + assert_eq!(result.message_id, 42); + assert_eq!(result.message_type, 1); + assert_eq!(result.message, "Market closed early"); + assert_eq!(result.exchange, "NYSE"); + } } diff --git a/src/orders/common/decoders.rs b/src/orders/common/decoders.rs index c4fe6df7..b0586e73 100644 --- a/src/orders/common/decoders.rs +++ b/src/orders/common/decoders.rs @@ -1128,6 +1128,83 @@ fn decode_percent_change_condition(message: &mut ResponseMessage, is_conjunction })) } +// === Protobuf decoders === + +#[allow(dead_code)] +pub(crate) fn decode_open_order_proto(bytes: &[u8]) -> Result { + let p = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p: crate::proto::OpenOrder = p; + let contract = p.contract.as_ref().map(crate::proto::decoders::decode_contract).unwrap_or_default(); + let order = p.order.as_ref().map(crate::proto::decoders::decode_order).unwrap_or_default(); + let order_state = p.order_state.as_ref().map(crate::proto::decoders::decode_order_state).unwrap_or_default(); + + Ok(OrderData { + order_id: p.order_id.unwrap_or_default(), + contract, + order, + order_state, + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_order_status_proto(bytes: &[u8]) -> Result { + let p: crate::proto::OrderStatus = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + + Ok(OrderStatus { + order_id: p.order_id.unwrap_or_default(), + status: p.status.unwrap_or_default(), + filled: p.filled.as_deref().and_then(|s| s.parse().ok()).unwrap_or_default(), + remaining: p.remaining.as_deref().and_then(|s| s.parse().ok()).unwrap_or_default(), + average_fill_price: p.avg_fill_price.unwrap_or_default(), + perm_id: p.perm_id.unwrap_or_default() as i32, + parent_id: p.parent_id.unwrap_or_default(), + last_fill_price: p.last_fill_price.unwrap_or_default(), + client_id: p.client_id.unwrap_or_default(), + why_held: p.why_held.unwrap_or_default(), + market_cap_price: p.mkt_cap_price.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_execution_data_proto(bytes: &[u8]) -> Result { + let p: crate::proto::ExecutionDetails = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + + Ok(ExecutionData { + request_id: p.req_id.unwrap_or_default(), + contract: p.contract.as_ref().map(crate::proto::decoders::decode_contract).unwrap_or_default(), + execution: p.execution.as_ref().map(crate::proto::decoders::decode_execution).unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_completed_order_proto(bytes: &[u8]) -> Result { + let p: crate::proto::CompletedOrder = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let contract = p.contract.as_ref().map(crate::proto::decoders::decode_contract).unwrap_or_default(); + let order = p.order.as_ref().map(crate::proto::decoders::decode_order).unwrap_or_default(); + let order_state = p.order_state.as_ref().map(crate::proto::decoders::decode_order_state).unwrap_or_default(); + + Ok(OrderData { + order_id: order.order_id, + contract, + order, + order_state, + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_commission_report_proto(bytes: &[u8]) -> Result { + let p: crate::proto::CommissionAndFeesReport = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + + Ok(CommissionReport { + execution_id: p.exec_id.unwrap_or_default(), + commission: p.commission_and_fees.unwrap_or_default(), + currency: p.currency.unwrap_or_default(), + realized_pnl: p.realized_pnl.filter(|&v| v != f64::MAX), + yields: p.bond_yield.filter(|&v| v != f64::MAX), + yield_redemption_date: p.yield_redemption_date.unwrap_or_default(), + }) +} + #[cfg(test)] mod tests { use super::*; @@ -2315,4 +2392,107 @@ mod tests { assert!(!result.order.professional_customer); assert_eq!(result.order.submitter, ""); } + + #[test] + fn test_decode_open_order_proto() { + use prost::Message; + + let proto_msg = crate::proto::OpenOrder { + order_id: Some(42), + contract: Some(crate::proto::Contract { + con_id: Some(265598), + symbol: Some("AAPL".into()), + sec_type: Some("STK".into()), + exchange: Some("SMART".into()), + currency: Some("USD".into()), + ..Default::default() + }), + order: Some(crate::proto::Order { + order_id: Some(42), + action: Some("BUY".into()), + total_quantity: Some("100".into()), + order_type: Some("LMT".into()), + lmt_price: Some(150.0), + ..Default::default() + }), + order_state: Some(crate::proto::OrderState { + status: Some("Submitted".into()), + ..Default::default() + }), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_open_order_proto(&bytes).unwrap(); + assert_eq!(result.order_id, 42); + assert_eq!(result.contract.contract_id, 265598); + assert_eq!(result.contract.symbol.to_string(), "AAPL"); + assert_eq!(result.order.order_id, 42); + assert_eq!(result.order.action, Action::Buy); + assert_eq!(result.order.total_quantity, 100.0); + assert_eq!(result.order.order_type, "LMT"); + assert_eq!(result.order.limit_price, Some(150.0)); + assert_eq!(result.order_state.status, "Submitted"); + } + + #[test] + fn test_decode_order_status_proto() { + use prost::Message; + + let proto_msg = crate::proto::OrderStatus { + order_id: Some(99), + status: Some("Filled".into()), + filled: Some("50".into()), + remaining: Some("0".into()), + avg_fill_price: Some(152.5), + perm_id: Some(123456), + parent_id: Some(10), + last_fill_price: Some(152.75), + client_id: Some(7), + why_held: Some("locate".into()), + mkt_cap_price: Some(1.23), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_order_status_proto(&bytes).unwrap(); + assert_eq!(result.order_id, 99); + assert_eq!(result.status, "Filled"); + assert_eq!(result.filled, 50.0); + assert_eq!(result.remaining, 0.0); + assert_eq!(result.average_fill_price, 152.5); + assert_eq!(result.perm_id, 123456); + assert_eq!(result.parent_id, 10); + assert_eq!(result.last_fill_price, 152.75); + assert_eq!(result.client_id, 7); + assert_eq!(result.why_held, "locate"); + assert_eq!(result.market_cap_price, 1.23); + } + + #[test] + fn test_decode_commission_report_proto() { + use prost::Message; + + let proto_msg = crate::proto::CommissionAndFeesReport { + exec_id: Some("exec123".into()), + commission_and_fees: Some(1.25), + currency: Some("USD".into()), + realized_pnl: Some(500.0), + bond_yield: Some(f64::MAX), + yield_redemption_date: Some("20260101".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_commission_report_proto(&bytes).unwrap(); + assert_eq!(result.execution_id, "exec123"); + assert_eq!(result.commission, 1.25); + assert_eq!(result.currency, "USD"); + assert_eq!(result.realized_pnl, Some(500.0)); + assert_eq!(result.yields, None); // f64::MAX filtered out + assert_eq!(result.yield_redemption_date, "20260101"); + } } diff --git a/src/proto/decoders.rs b/src/proto/decoders.rs new file mode 100644 index 00000000..497b8523 --- /dev/null +++ b/src/proto/decoders.rs @@ -0,0 +1,542 @@ +use prost::Message; + +use crate::contracts::{ + ComboLeg, ComboLegOpenClose, Contract, ContractDetails, Currency, DeltaNeutralContract, Exchange, FundAssetType, FundDistributionPolicyIndicator, + IneligibilityReason, SecurityType, Symbol, TagValue, +}; +use crate::orders::conditions::TriggerMethod; +use crate::orders::{ + Action, Execution, Liquidity, OcaType, Order, OrderAllocation, OrderCondition, OrderOpenClose, OrderOrigin, OrderState, ReferencePriceType, + Rule80A, ShortSaleSlot, SoftDollarTier, TimeInForce, VolatilityType, +}; +use crate::proto; +use crate::Error; + +// === Helper functions === + +fn s(opt: &Option) -> String { + opt.clone().unwrap_or_default() +} + +fn parse_f64(opt: &Option) -> f64 { + opt.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default() +} + +fn optional_f64(val: Option) -> Option { + val.filter(|&v| v != f64::MAX) +} + +fn optional_string_f64(opt: &Option) -> Option { + opt.as_deref() + .and_then(|s| s.parse::().ok()) + .and_then(|v| if v == f64::MAX { None } else { Some(v) }) +} + +// === Shared converters === + +pub fn decode_contract(proto: &proto::Contract) -> Contract { + Contract { + contract_id: proto.con_id.unwrap_or_default(), + symbol: Symbol::from(s(&proto.symbol)), + security_type: SecurityType::from(proto.sec_type.as_deref().unwrap_or_default()), + last_trade_date_or_contract_month: s(&proto.last_trade_date_or_contract_month), + strike: proto.strike.unwrap_or_default(), + right: s(&proto.right), + multiplier: proto.multiplier.map(|m| m.to_string()).unwrap_or_default(), + exchange: Exchange::from(s(&proto.exchange)), + primary_exchange: Exchange::from(s(&proto.primary_exch)), + currency: Currency::from(s(&proto.currency)), + local_symbol: s(&proto.local_symbol), + trading_class: s(&proto.trading_class), + include_expired: proto.include_expired.unwrap_or_default(), + security_id_type: s(&proto.sec_id_type), + security_id: s(&proto.sec_id), + description: s(&proto.description), + issuer_id: s(&proto.issuer_id), + combo_legs_description: s(&proto.combo_legs_descrip), + combo_legs: proto.combo_legs.iter().map(decode_combo_leg).collect(), + delta_neutral_contract: proto.delta_neutral_contract.as_ref().map(decode_delta_neutral_contract), + last_trade_date: None, + } +} + +pub fn decode_combo_leg(proto: &proto::ComboLeg) -> ComboLeg { + ComboLeg { + contract_id: proto.con_id.unwrap_or_default(), + ratio: proto.ratio.unwrap_or_default(), + action: s(&proto.action), + exchange: s(&proto.exchange), + open_close: ComboLegOpenClose::from(proto.open_close.unwrap_or_default()), + short_sale_slot: proto.short_sales_slot.unwrap_or_default(), + designated_location: s(&proto.designated_location), + exempt_code: proto.exempt_code.unwrap_or_default(), + } +} + +pub fn decode_delta_neutral_contract(proto: &proto::DeltaNeutralContract) -> DeltaNeutralContract { + DeltaNeutralContract { + contract_id: proto.con_id.unwrap_or_default(), + delta: proto.delta.unwrap_or_default(), + price: proto.price.unwrap_or_default(), + } +} + +pub fn decode_soft_dollar_tier(proto: &proto::SoftDollarTier) -> SoftDollarTier { + SoftDollarTier { + name: s(&proto.name), + value: s(&proto.value), + display_name: s(&proto.display_name), + } +} + +pub fn decode_order(proto: &proto::Order) -> Order { + let mut order = Order::default(); + + order.client_id = proto.client_id.unwrap_or_default(); + order.order_id = proto.order_id.unwrap_or_default(); + order.perm_id = proto.perm_id.unwrap_or_default() as i32; + order.parent_id = proto.parent_id.unwrap_or_default(); + + order.action = Action::from(proto.action.as_deref().unwrap_or("BUY")); + order.total_quantity = parse_f64(&proto.total_quantity); + order.display_size = proto.display_size.map(Some).unwrap_or(Some(0)); + order.order_type = s(&proto.order_type); + order.limit_price = optional_f64(proto.lmt_price); + order.aux_price = optional_f64(proto.aux_price); + order.tif = TimeInForce::from(proto.tif.as_deref().unwrap_or("DAY")); + + // clearing info + order.account = s(&proto.account); + order.settling_firm = s(&proto.settling_firm); + order.clearing_account = s(&proto.clearing_account); + order.clearing_intent = s(&proto.clearing_intent); + + // secondary attributes + order.all_or_none = proto.all_or_none.unwrap_or_default(); + order.block_order = proto.block_order.unwrap_or_default(); + order.hidden = proto.hidden.unwrap_or_default(); + order.outside_rth = proto.outside_rth.unwrap_or_default(); + order.sweep_to_fill = proto.sweep_to_fill.unwrap_or_default(); + order.percent_offset = optional_f64(proto.percent_offset); + order.trailing_percent = optional_f64(proto.trailing_percent); + order.trail_stop_price = optional_f64(proto.trail_stop_price); + order.min_qty = proto.min_qty; + order.good_after_time = s(&proto.good_after_time); + order.good_till_date = s(&proto.good_till_date); + order.oca_group = s(&proto.oca_group); + order.order_ref = s(&proto.order_ref); + order.rule_80_a = proto.rule80_a.as_deref().and_then(Rule80A::from); + order.oca_type = OcaType::from(proto.oca_type.unwrap_or_default()); + order.trigger_method = TriggerMethod::from(proto.trigger_method.unwrap_or_default()); + + // extended order fields + order.active_start_time = s(&proto.active_start_time); + order.active_stop_time = s(&proto.active_stop_time); + + // advisor allocation + order.fa_group = s(&proto.fa_group); + order.fa_method = s(&proto.fa_method); + order.fa_percentage = s(&proto.fa_percentage); + + // volatility orders + order.volatility = optional_f64(proto.volatility); + order.volatility_type = proto.volatility_type.map(VolatilityType::from); + order.continuous_update = proto.continuous_update.unwrap_or_default(); + order.reference_price_type = proto.reference_price_type.map(ReferencePriceType::from); + order.delta_neutral_order_type = s(&proto.delta_neutral_order_type); + order.delta_neutral_aux_price = optional_f64(proto.delta_neutral_aux_price); + order.delta_neutral_con_id = proto.delta_neutral_con_id.unwrap_or_default(); + order.delta_neutral_open_close = s(&proto.delta_neutral_open_close); + order.delta_neutral_short_sale = proto.delta_neutral_short_sale.unwrap_or_default(); + order.delta_neutral_short_sale_slot = proto.delta_neutral_short_sale_slot.unwrap_or_default(); + order.delta_neutral_designated_location = s(&proto.delta_neutral_designated_location); + + // scale orders + order.scale_init_level_size = proto.scale_init_level_size; + order.scale_subs_level_size = proto.scale_subs_level_size; + order.scale_price_increment = optional_f64(proto.scale_price_increment); + order.scale_price_adjust_value = optional_f64(proto.scale_price_adjust_value); + order.scale_price_adjust_interval = proto.scale_price_adjust_interval; + order.scale_profit_offset = optional_f64(proto.scale_profit_offset); + order.scale_auto_reset = proto.scale_auto_reset.unwrap_or_default(); + order.scale_init_position = proto.scale_init_position; + order.scale_init_fill_qty = proto.scale_init_fill_qty; + order.scale_random_percent = proto.scale_random_percent.unwrap_or_default(); + order.scale_table = s(&proto.scale_table); + + // hedge orders + order.hedge_type = s(&proto.hedge_type); + order.hedge_param = s(&proto.hedge_param); + + // algo orders + order.algo_strategy = s(&proto.algo_strategy); + order.algo_params = proto + .algo_params + .iter() + .map(|(k, v)| TagValue { + tag: k.clone(), + value: v.clone(), + }) + .collect(); + order.algo_id = s(&proto.algo_id); + + // combo orders + order.smart_combo_routing_params = proto + .smart_combo_routing_params + .iter() + .map(|(k, v)| TagValue { + tag: k.clone(), + value: v.clone(), + }) + .collect(); + + // processing control + order.what_if = proto.what_if.unwrap_or_default(); + order.transmit = proto.transmit.unwrap_or(true); + order.override_percentage_constraints = proto.override_percentage_constraints.unwrap_or_default(); + + // institutional orders + order.open_close = proto.open_close.as_deref().and_then(OrderOpenClose::from); + order.origin = OrderOrigin::from(proto.origin.unwrap_or_default()); + order.short_sale_slot = ShortSaleSlot::from(proto.short_sale_slot.unwrap_or_default()); + order.designated_location = s(&proto.designated_location); + order.exempt_code = proto.exempt_code.unwrap_or(-1); + order.delta_neutral_settling_firm = s(&proto.delta_neutral_settling_firm); + order.delta_neutral_clearing_account = s(&proto.delta_neutral_clearing_account); + order.delta_neutral_clearing_intent = s(&proto.delta_neutral_clearing_intent); + + // SMART routing + order.discretionary_amt = proto.discretionary_amt.unwrap_or_default(); + order.opt_out_smart_routing = proto.opt_out_smart_routing.unwrap_or_default(); + + // BOX orders + order.starting_price = optional_f64(proto.starting_price); + order.stock_ref_price = optional_f64(proto.stock_ref_price); + order.delta = optional_f64(proto.delta); + + // pegged orders + order.stock_range_lower = optional_f64(proto.stock_range_lower); + order.stock_range_upper = optional_f64(proto.stock_range_upper); + + // not held + order.not_held = proto.not_held.unwrap_or_default(); + + // order misc options + order.order_misc_options = proto + .order_misc_options + .iter() + .map(|(k, v)| TagValue { + tag: k.clone(), + value: v.clone(), + }) + .collect(); + + // solicited / randomize + order.solicited = proto.solicited.unwrap_or_default(); + order.randomize_size = proto.randomize_size.unwrap_or_default(); + order.randomize_price = proto.randomize_price.unwrap_or_default(); + + // PEG2BENCH fields + order.reference_contract_id = proto.reference_contract_id.unwrap_or_default(); + order.pegged_change_amount = optional_f64(proto.pegged_change_amount); + order.is_pegged_change_amount_decrease = proto.is_pegged_change_amount_decrease.unwrap_or_default(); + order.reference_change_amount = optional_f64(proto.reference_change_amount); + order.reference_exchange = s(&proto.reference_exchange_id); + order.adjusted_order_type = s(&proto.adjusted_order_type); + order.trigger_price = optional_f64(proto.trigger_price); + order.adjusted_stop_price = optional_f64(proto.adjusted_stop_price); + order.adjusted_stop_limit_price = optional_f64(proto.adjusted_stop_limit_price); + order.adjusted_trailing_amount = optional_f64(proto.adjusted_trailing_amount); + order.adjustable_trailing_unit = proto.adjustable_trailing_unit.unwrap_or_default(); + order.limit_price_offset = optional_f64(proto.lmt_price_offset); + + // conditions + order.conditions = proto.conditions.iter().map(decode_order_condition).collect(); + order.conditions_cancel_order = proto.conditions_cancel_order.unwrap_or_default(); + order.conditions_ignore_rth = proto.conditions_ignore_rth.unwrap_or_default(); + + // models + order.model_code = s(&proto.model_code); + order.ext_operator = s(&proto.ext_operator); + order.soft_dollar_tier = proto.soft_dollar_tier.as_ref().map(decode_soft_dollar_tier).unwrap_or_default(); + + // native cash quantity + order.cash_qty = optional_f64(proto.cash_qty); + + // MIFID2 + order.mifid2_decision_maker = s(&proto.mifid2_decision_maker); + order.mifid2_decision_algo = s(&proto.mifid2_decision_algo); + order.mifid2_execution_trader = s(&proto.mifid2_execution_trader); + order.mifid2_execution_algo = s(&proto.mifid2_execution_algo); + + // additional fields + order.dont_use_auto_price_for_hedge = proto.dont_use_auto_price_for_hedge.unwrap_or_default(); + order.is_oms_container = proto.is_oms_container.unwrap_or_default(); + order.discretionary_up_to_limit_price = proto.discretionary_up_to_limit_price.unwrap_or_default(); + order.auto_cancel_date = s(&proto.auto_cancel_date); + order.filled_quantity = parse_f64(&proto.filled_quantity); + order.ref_futures_con_id = proto.ref_futures_con_id.map(Some).unwrap_or(Some(0)); + order.auto_cancel_parent = proto.auto_cancel_parent.unwrap_or_default(); + order.shareholder = s(&proto.shareholder); + order.imbalance_only = proto.imbalance_only.unwrap_or_default(); + order.route_marketable_to_bbo = proto.route_marketable_to_bbo.unwrap_or_default() != 0; + order.parent_perm_id = proto.parent_perm_id; + order.use_price_mgmt_algo = proto.use_price_mgmt_algo.unwrap_or_default() != 0; + order.duration = proto.duration; + order.post_to_ats = proto.post_to_ats; + order.advanced_error_override = s(&proto.advanced_error_override); + order.manual_order_time = s(&proto.manual_order_time); + order.min_trade_qty = proto.min_trade_qty; + order.min_compete_size = proto.min_compete_size; + order.compete_against_best_offset = optional_f64(proto.compete_against_best_offset); + order.mid_offset_at_whole = optional_f64(proto.mid_offset_at_whole); + order.mid_offset_at_half = optional_f64(proto.mid_offset_at_half); + order.customer_account = s(&proto.customer_account); + order.professional_customer = proto.professional_customer.unwrap_or_default(); + order.bond_accrued_interest = s(&proto.bond_accrued_interest); + order.include_overnight = proto.include_overnight.unwrap_or_default(); + order.manual_order_indicator = proto.manual_order_indicator; + order.submitter = s(&proto.submitter); + + order +} + +fn decode_order_condition(proto: &proto::OrderCondition) -> OrderCondition { + use crate::orders::conditions::*; + + let condition_type = proto.r#type.unwrap_or_default(); + let is_conjunction = proto.is_conjunction_connection.unwrap_or(true); + let is_more = proto.is_more.unwrap_or_default(); + + match condition_type { + 1 => OrderCondition::Price(PriceCondition { + contract_id: proto.con_id.unwrap_or_default(), + exchange: s(&proto.exchange), + price: proto.price.unwrap_or_default(), + is_more, + is_conjunction, + trigger_method: TriggerMethod::from(proto.trigger_method.unwrap_or_default()), + }), + 3 => OrderCondition::Time(TimeCondition { + time: s(&proto.time), + is_more, + is_conjunction, + }), + 4 => OrderCondition::Margin(MarginCondition { + percent: proto.percent.unwrap_or_default(), + is_more, + is_conjunction, + }), + 5 => OrderCondition::Execution(ExecutionCondition { + symbol: s(&proto.symbol), + security_type: s(&proto.sec_type), + exchange: s(&proto.exchange), + is_conjunction, + }), + 6 => OrderCondition::Volume(VolumeCondition { + contract_id: proto.con_id.unwrap_or_default(), + exchange: s(&proto.exchange), + volume: proto.volume.unwrap_or_default(), + is_more, + is_conjunction, + }), + 7 => OrderCondition::PercentChange(PercentChangeCondition { + contract_id: proto.con_id.unwrap_or_default(), + exchange: s(&proto.exchange), + percent: proto.change_percent.unwrap_or_default(), + is_more, + is_conjunction, + }), + _ => OrderCondition::Price(PriceCondition::default()), + } +} + +pub fn decode_order_state(proto: &proto::OrderState) -> OrderState { + OrderState { + status: s(&proto.status), + initial_margin_before: optional_f64(proto.init_margin_before), + maintenance_margin_before: optional_f64(proto.maint_margin_before), + equity_with_loan_before: optional_f64(proto.equity_with_loan_before), + initial_margin_change: optional_f64(proto.init_margin_change), + maintenance_margin_change: optional_f64(proto.maint_margin_change), + equity_with_loan_change: optional_f64(proto.equity_with_loan_change), + initial_margin_after: optional_f64(proto.init_margin_after), + maintenance_margin_after: optional_f64(proto.maint_margin_after), + equity_with_loan_after: optional_f64(proto.equity_with_loan_after), + commission: optional_f64(proto.commission_and_fees), + minimum_commission: optional_f64(proto.min_commission_and_fees), + maximum_commission: optional_f64(proto.max_commission_and_fees), + commission_currency: s(&proto.commission_and_fees_currency), + margin_currency: s(&proto.margin_currency), + initial_margin_before_outside_rth: optional_f64(proto.init_margin_before_outside_rth), + maintenance_margin_before_outside_rth: optional_f64(proto.maint_margin_before_outside_rth), + equity_with_loan_before_outside_rth: optional_f64(proto.equity_with_loan_before_outside_rth), + initial_margin_change_outside_rth: optional_f64(proto.init_margin_change_outside_rth), + maintenance_margin_change_outside_rth: optional_f64(proto.maint_margin_change_outside_rth), + equity_with_loan_change_outside_rth: optional_f64(proto.equity_with_loan_change_outside_rth), + initial_margin_after_outside_rth: optional_f64(proto.init_margin_after_outside_rth), + maintenance_margin_after_outside_rth: optional_f64(proto.maint_margin_after_outside_rth), + equity_with_loan_after_outside_rth: optional_f64(proto.equity_with_loan_after_outside_rth), + suggested_size: optional_string_f64(&proto.suggested_size), + reject_reason: s(&proto.reject_reason), + order_allocations: proto.order_allocations.iter().map(decode_order_allocation).collect(), + warning_text: s(&proto.warning_text), + completed_time: s(&proto.completed_time), + completed_status: s(&proto.completed_status), + } +} + +fn decode_order_allocation(proto: &proto::OrderAllocation) -> OrderAllocation { + OrderAllocation { + account: s(&proto.account), + position: optional_string_f64(&proto.position), + position_desired: optional_string_f64(&proto.position_desired), + position_after: optional_string_f64(&proto.position_after), + desired_alloc_qty: optional_string_f64(&proto.desired_alloc_qty), + allowed_alloc_qty: optional_string_f64(&proto.allowed_alloc_qty), + is_monetary: proto.is_monetary.unwrap_or_default(), + } +} + +pub fn decode_execution(proto: &proto::Execution) -> Execution { + Execution { + order_id: proto.order_id.unwrap_or_default(), + client_id: proto.client_id.unwrap_or_default(), + execution_id: s(&proto.exec_id), + time: s(&proto.time), + account_number: s(&proto.acct_number), + exchange: s(&proto.exchange), + side: s(&proto.side), + shares: parse_f64(&proto.shares), + price: proto.price.unwrap_or_default(), + perm_id: proto.perm_id.unwrap_or_default() as i32, + liquidation: if proto.is_liquidation.unwrap_or_default() { 1 } else { 0 }, + cumulative_quantity: parse_f64(&proto.cum_qty), + average_price: proto.avg_price.unwrap_or_default(), + order_reference: s(&proto.order_ref), + ev_rule: s(&proto.ev_rule), + ev_multiplier: optional_f64(proto.ev_multiplier), + model_code: s(&proto.model_code), + last_liquidity: Liquidity::from(proto.last_liquidity.unwrap_or_default()), + pending_price_revision: proto.is_price_revision_pending.unwrap_or_default(), + submitter: s(&proto.submitter), + } +} + +pub fn decode_contract_details(proto_contract: &proto::Contract, proto_details: &proto::ContractDetails) -> ContractDetails { + let contract = decode_contract(proto_contract); + + ContractDetails { + contract, + market_name: s(&proto_details.market_name), + min_tick: proto_details.min_tick.as_deref().and_then(|s| s.parse().ok()).unwrap_or_default(), + order_types: proto_details + .order_types + .as_deref() + .map(|s| s.split(',').map(|t| t.trim().to_string()).collect()) + .unwrap_or_default(), + valid_exchanges: proto_details + .valid_exchanges + .as_deref() + .map(|s| s.split(',').map(|t| t.trim().to_string()).collect()) + .unwrap_or_default(), + price_magnifier: proto_details.price_magnifier.unwrap_or_default(), + under_contract_id: proto_details.under_con_id.unwrap_or_default(), + long_name: s(&proto_details.long_name), + contract_month: s(&proto_details.contract_month), + industry: s(&proto_details.industry), + category: s(&proto_details.category), + subcategory: s(&proto_details.subcategory), + time_zone_id: s(&proto_details.time_zone_id), + trading_hours: proto_details + .trading_hours + .as_deref() + .map(|s| s.split(';').map(|t| t.to_string()).collect()) + .unwrap_or_default(), + liquid_hours: proto_details + .liquid_hours + .as_deref() + .map(|s| s.split(';').map(|t| t.to_string()).collect()) + .unwrap_or_default(), + ev_rule: s(&proto_details.ev_rule), + ev_multiplier: proto_details.ev_multiplier.unwrap_or_default(), + agg_group: proto_details.agg_group.unwrap_or_default(), + sec_id_list: proto_details + .sec_id_list + .iter() + .map(|(k, v)| TagValue { + tag: k.clone(), + value: v.clone(), + }) + .collect(), + under_symbol: s(&proto_details.under_symbol), + under_security_type: s(&proto_details.under_sec_type), + market_rule_ids: proto_details + .market_rule_ids + .as_deref() + .map(|s| s.split(',').map(|t| t.to_string()).collect()) + .unwrap_or_default(), + real_expiration_date: s(&proto_details.real_expiration_date), + stock_type: s(&proto_details.stock_type), + min_size: parse_f64(&proto_details.min_size), + size_increment: parse_f64(&proto_details.size_increment), + suggested_size_increment: parse_f64(&proto_details.suggested_size_increment), + // fund fields + fund_name: s(&proto_details.fund_name), + fund_family: s(&proto_details.fund_family), + fund_type: s(&proto_details.fund_type), + fund_front_load: s(&proto_details.fund_front_load), + fund_back_load: s(&proto_details.fund_back_load), + fund_back_load_time_interval: s(&proto_details.fund_back_load_time_interval), + fund_management_fee: s(&proto_details.fund_management_fee), + fund_closed: proto_details.fund_closed.unwrap_or_default(), + fund_closed_for_new_investors: proto_details.fund_closed_for_new_investors.unwrap_or_default(), + fund_closed_for_new_money: proto_details.fund_closed_for_new_money.unwrap_or_default(), + fund_notify_amount: s(&proto_details.fund_notify_amount), + fund_minimum_initial_purchase: s(&proto_details.fund_minimum_initial_purchase), + fund_subsequent_minimum_purchase: s(&proto_details.fund_minimum_subsequent_purchase), + fund_blue_sky_states: s(&proto_details.fund_blue_sky_states), + fund_blue_sky_territories: s(&proto_details.fund_blue_sky_territories), + fund_distribution_policy_indicator: FundDistributionPolicyIndicator::from( + proto_details.fund_distribution_policy_indicator.as_deref().unwrap_or(""), + ), + fund_asset_type: FundAssetType::from(proto_details.fund_asset_type.as_deref().unwrap_or("")), + // bond fields + cusip: s(&proto_details.cusip), + ratings: s(&proto_details.ratings), + desc_append: s(&proto_details.desc_append), + bond_type: s(&proto_details.bond_type), + coupon_type: s(&proto_details.coupon_type), + callable: proto_details.callable.unwrap_or_default(), + putable: proto_details.puttable.unwrap_or_default(), + coupon: proto_details.coupon.unwrap_or_default(), + convertible: proto_details.convertible.unwrap_or_default(), + maturity: String::new(), + issue_date: s(&proto_details.issue_date), + next_option_date: s(&proto_details.next_option_date), + next_option_type: s(&proto_details.next_option_type), + next_option_partial: proto_details.next_option_partial.unwrap_or_default(), + notes: s(&proto_details.bond_notes), + // ineligibility reasons + ineligibility_reasons: proto_details + .ineligibility_reason_list + .iter() + .map(|r| IneligibilityReason { + id: s(&r.id), + description: s(&r.description), + }) + .collect(), + // defaults for fields not in protobuf + last_trade_time: String::new(), + } +} + +pub fn decode_error_message(bytes: &[u8]) -> Result<(i32, i32, String, String), Error> { + let p = proto::ErrorMessage::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(( + p.id.unwrap_or_default(), + p.error_code.unwrap_or_default(), + s(&p.error_msg), + s(&p.advanced_order_reject_json), + )) +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index ea2edb3f..83d8488b 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,2 +1,5 @@ #![allow(missing_docs, clippy::all)] include!("protobuf.rs"); + +#[allow(clippy::all)] +pub mod decoders; diff --git a/src/scanner/common/decoders.rs b/src/scanner/common/decoders.rs index 8efdc5a0..7e8d80fb 100644 --- a/src/scanner/common/decoders.rs +++ b/src/scanner/common/decoders.rs @@ -58,3 +58,28 @@ pub(in crate::scanner) fn decode_scanner_data(mut message: ResponseMessage) -> R Ok(matches) } + +#[allow(dead_code)] +pub(crate) fn decode_scanner_data_proto(bytes: &[u8]) -> Result, Error> { + use prost::Message; + let p = crate::proto::ScannerData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + + let mut results = Vec::with_capacity(p.scanner_data_element.len()); + for elem in &p.scanner_data_element { + let contract = elem.contract.as_ref().map(crate::proto::decoders::decode_contract).unwrap_or_default(); + + let mut contract_details = crate::contracts::ContractDetails { + contract, + ..Default::default() + }; + contract_details.market_name = elem.market_name.clone().unwrap_or_default(); + + results.push(ScannerData { + rank: elem.rank.unwrap_or_default(), + contract_details, + leg: elem.combo_key.clone().unwrap_or_default(), + }); + } + + Ok(results) +} diff --git a/src/wsh/common/decoders.rs b/src/wsh/common/decoders.rs index ce9f0255..544e3153 100644 --- a/src/wsh/common/decoders.rs +++ b/src/wsh/common/decoders.rs @@ -22,6 +22,24 @@ pub(in crate::wsh) fn decode_wsh_event_data(mut message: ResponseMessage) -> Res }) } +#[allow(dead_code)] +pub(crate) fn decode_wsh_metadata_proto(bytes: &[u8]) -> Result { + use prost::Message; + let p = crate::proto::WshMetaData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(WshMetadata { + data_json: p.data_json.unwrap_or_default(), + }) +} + +#[allow(dead_code)] +pub(crate) fn decode_wsh_event_data_proto(bytes: &[u8]) -> Result { + use prost::Message; + let p = crate::proto::WshEventData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + Ok(WshEventData { + data_json: p.data_json.unwrap_or_default(), + }) +} + /// Helper function to decode event data messages with error handling pub(in crate::wsh) fn decode_event_data_message(message: ResponseMessage) -> Result { match message.message_type() { @@ -30,3 +48,23 @@ pub(in crate::wsh) fn decode_event_data_message(message: ResponseMessage) -> Res _ => Err(Error::UnexpectedResponse(message)), } } + +#[cfg(test)] +mod tests { + use super::*; + use prost::Message; + + #[test] + fn test_decode_wsh_metadata_proto() { + let proto_msg = crate::proto::WshMetaData { + req_id: Some(1), + data_json: Some(r#"{"key":"value"}"#.into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_wsh_metadata_proto(&bytes).unwrap(); + assert_eq!(result.data_json, r#"{"key":"value"}"#); + } +} From ca843365a108b641100b7e46455e825efa7944f4 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Fri, 10 Apr 2026 22:00:08 -1000 Subject: [PATCH 2/9] change perm_id from i32 to i64, standardize proto decoder imports and error messages --- src/contracts/common/decoders/proto.rs | 4 +++- src/display_groups/common/decoders.rs | 2 +- src/market_data/realtime/common/decoders.rs | 23 ++++++++------------- src/news/common/decoders.rs | 4 +--- src/orders/common/decoders.rs | 11 +++++----- src/orders/mod.rs | 6 +++--- src/proto/decoders.rs | 4 ++-- src/scanner/common/decoders.rs | 3 ++- src/wsh/common/decoders.rs | 4 ++-- 9 files changed, 28 insertions(+), 33 deletions(-) diff --git a/src/contracts/common/decoders/proto.rs b/src/contracts/common/decoders/proto.rs index d59d60f4..fadd72cd 100644 --- a/src/contracts/common/decoders/proto.rs +++ b/src/contracts/common/decoders/proto.rs @@ -1,9 +1,11 @@ +use prost::Message; + use crate::contracts::ContractDetails; use crate::Error; #[allow(dead_code)] pub(crate) fn decode_contract_data_proto(bytes: &[u8]) -> Result { - let p: crate::proto::ContractData = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p: crate::proto::ContractData = Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; let default_contract = crate::proto::Contract::default(); let default_details = crate::proto::ContractDetails::default(); let proto_contract = p.contract.as_ref().unwrap_or(&default_contract); diff --git a/src/display_groups/common/decoders.rs b/src/display_groups/common/decoders.rs index c56b242b..7166e731 100644 --- a/src/display_groups/common/decoders.rs +++ b/src/display_groups/common/decoders.rs @@ -1,6 +1,7 @@ //! Decoders for display group messages. use log::warn; +use prost::Message; use crate::messages::{IncomingMessages, ResponseMessage}; use crate::Error; @@ -27,7 +28,6 @@ pub(crate) fn decode_display_group_updated(message: &mut ResponseMessage) -> Res #[allow(dead_code)] pub(crate) fn decode_display_group_updated_proto(bytes: &[u8]) -> Result { - use prost::Message; let p = crate::proto::DisplayGroupUpdated::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; Ok(DisplayGroupUpdate::new(p.contract_info.unwrap_or_default())) } diff --git a/src/market_data/realtime/common/decoders.rs b/src/market_data/realtime/common/decoders.rs index 653023e6..0f8c1643 100644 --- a/src/market_data/realtime/common/decoders.rs +++ b/src/market_data/realtime/common/decoders.rs @@ -1,3 +1,5 @@ +use prost::Message; + use crate::contracts::decode_option_computation; use crate::contracts::OptionComputation; use crate::subscriptions::DecoderContext; @@ -244,8 +246,7 @@ pub(crate) fn decode_tick_request_parameters(message: &mut ResponseMessage) -> R #[allow(dead_code)] pub(crate) fn decode_tick_price_proto(bytes: &[u8]) -> Result { - use prost::Message; - let msg = crate::proto::TickPrice::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + let msg = crate::proto::TickPrice::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; let tick_type = TickType::from(msg.tick_type.unwrap_or_default()); let price = msg.price.unwrap_or_default(); @@ -287,8 +288,7 @@ pub(crate) fn decode_tick_price_proto(bytes: &[u8]) -> Result #[allow(dead_code)] pub(crate) fn decode_tick_size_proto(bytes: &[u8]) -> Result { - use prost::Message; - let msg = crate::proto::TickSize::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + let msg = crate::proto::TickSize::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; Ok(TickSize { tick_type: TickType::from(msg.tick_type.unwrap_or_default()), @@ -298,8 +298,7 @@ pub(crate) fn decode_tick_size_proto(bytes: &[u8]) -> Result { #[allow(dead_code)] pub(crate) fn decode_tick_string_proto(bytes: &[u8]) -> Result { - use prost::Message; - let msg = crate::proto::TickString::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + let msg = crate::proto::TickString::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; Ok(TickString { tick_type: TickType::from(msg.tick_type.unwrap_or_default()), @@ -309,8 +308,7 @@ pub(crate) fn decode_tick_string_proto(bytes: &[u8]) -> Result Result { - use prost::Message; - let msg = crate::proto::TickGeneric::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + let msg = crate::proto::TickGeneric::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; Ok(TickGeneric { tick_type: TickType::from(msg.tick_type.unwrap_or_default()), @@ -320,8 +318,7 @@ pub(crate) fn decode_tick_generic_proto(bytes: &[u8]) -> Result Result { - use prost::Message; - let msg = crate::proto::TickOptionComputation::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + let msg = crate::proto::TickOptionComputation::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; fn optional(val: Option) -> Option { val.filter(|&v| v != f64::MAX) @@ -343,8 +340,7 @@ pub(crate) fn decode_tick_option_computation_proto(bytes: &[u8]) -> Result Result { - use prost::Message; - let msg = crate::proto::MarketDepth::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + let msg = crate::proto::MarketDepth::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; let data = msg.market_depth_data.ok_or_else(|| Error::Simple("missing market_depth_data".into()))?; @@ -359,8 +355,7 @@ pub(crate) fn decode_market_depth_proto(bytes: &[u8]) -> Result Result { - use prost::Message; - let msg = crate::proto::MarketDepthL2::decode(bytes).map_err(|e| Error::Simple(e.to_string()))?; + let msg = crate::proto::MarketDepthL2::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; let data = msg.market_depth_data.ok_or_else(|| Error::Simple("missing market_depth_data".into()))?; diff --git a/src/news/common/decoders.rs b/src/news/common/decoders.rs index 145bda8e..223c4538 100644 --- a/src/news/common/decoders.rs +++ b/src/news/common/decoders.rs @@ -1,5 +1,6 @@ use std::str; +use prost::Message; use time::macros::format_description; use time::{OffsetDateTime, PrimitiveDateTime}; use time_tz::{timezones, PrimitiveDateTimeExt, Tz}; @@ -99,7 +100,6 @@ fn parse_unix_timestamp(time: &str) -> Result { #[allow(dead_code)] pub(crate) fn decode_news_bulletin_proto(bytes: &[u8]) -> Result { - use prost::Message; let p = crate::proto::NewsBulletin::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; Ok(NewsBulletin { message_id: p.news_msg_id.unwrap_or_default(), @@ -111,7 +111,6 @@ pub(crate) fn decode_news_bulletin_proto(bytes: &[u8]) -> Result Result { - use prost::Message; let p = crate::proto::NewsArticle::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; Ok(NewsArticleBody { article_type: ArticleType::from(p.article_type.unwrap_or_default()), @@ -121,7 +120,6 @@ pub(crate) fn decode_news_article_proto(bytes: &[u8]) -> Result Result { - use prost::Message; let p = crate::proto::HistoricalNews::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; let time = p diff --git a/src/orders/common/decoders.rs b/src/orders/common/decoders.rs index b0586e73..c9807483 100644 --- a/src/orders/common/decoders.rs +++ b/src/orders/common/decoders.rs @@ -127,7 +127,7 @@ impl OrderDecoder { } fn read_perm_id(&mut self) -> Result<(), Error> { - self.order.perm_id = self.message.next_int()?; + self.order.perm_id = self.message.next_long()?; Ok(()) } @@ -858,7 +858,7 @@ pub(crate) fn decode_order_status(server_version: i32, message: &mut ResponseMes filled: message.next_double()?, remaining: message.next_double()?, average_fill_price: message.next_double()?, - perm_id: message.next_int()?, + perm_id: message.next_long()?, parent_id: message.next_int()?, last_fill_price: message.next_double()?, client_id: message.next_int()?, @@ -905,7 +905,7 @@ pub(crate) fn decode_execution_data(server_version: i32, message: &mut ResponseM execution.side = message.next_string()?; execution.shares = message.next_double()?; execution.price = message.next_double()?; - execution.perm_id = message.next_int()?; + execution.perm_id = message.next_long()?; execution.client_id = message.next_int()?; execution.liquidation = message.next_int()?; execution.cumulative_quantity = message.next_double()?; @@ -1132,8 +1132,7 @@ fn decode_percent_change_condition(message: &mut ResponseMessage, is_conjunction #[allow(dead_code)] pub(crate) fn decode_open_order_proto(bytes: &[u8]) -> Result { - let p = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; - let p: crate::proto::OpenOrder = p; + let p: crate::proto::OpenOrder = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; let contract = p.contract.as_ref().map(crate::proto::decoders::decode_contract).unwrap_or_default(); let order = p.order.as_ref().map(crate::proto::decoders::decode_order).unwrap_or_default(); let order_state = p.order_state.as_ref().map(crate::proto::decoders::decode_order_state).unwrap_or_default(); @@ -1156,7 +1155,7 @@ pub(crate) fn decode_order_status_proto(bytes: &[u8]) -> Result Order { order.client_id = proto.client_id.unwrap_or_default(); order.order_id = proto.order_id.unwrap_or_default(); - order.perm_id = proto.perm_id.unwrap_or_default() as i32; + order.perm_id = proto.perm_id.unwrap_or_default(); order.parent_id = proto.parent_id.unwrap_or_default(); order.action = Action::from(proto.action.as_deref().unwrap_or("BUY")); @@ -409,7 +409,7 @@ pub fn decode_execution(proto: &proto::Execution) -> Execution { side: s(&proto.side), shares: parse_f64(&proto.shares), price: proto.price.unwrap_or_default(), - perm_id: proto.perm_id.unwrap_or_default() as i32, + perm_id: proto.perm_id.unwrap_or_default(), liquidation: if proto.is_liquidation.unwrap_or_default() { 1 } else { 0 }, cumulative_quantity: parse_f64(&proto.cum_qty), average_price: proto.avg_price.unwrap_or_default(), diff --git a/src/scanner/common/decoders.rs b/src/scanner/common/decoders.rs index 7e8d80fb..a691bcdd 100644 --- a/src/scanner/common/decoders.rs +++ b/src/scanner/common/decoders.rs @@ -1,3 +1,5 @@ +use prost::Message; + use crate::contracts::{Currency, Exchange, SecurityType, Symbol}; use crate::messages::{IncomingMessages, ResponseMessage}; use crate::Error; @@ -61,7 +63,6 @@ pub(in crate::scanner) fn decode_scanner_data(mut message: ResponseMessage) -> R #[allow(dead_code)] pub(crate) fn decode_scanner_data_proto(bytes: &[u8]) -> Result, Error> { - use prost::Message; let p = crate::proto::ScannerData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; let mut results = Vec::with_capacity(p.scanner_data_element.len()); diff --git a/src/wsh/common/decoders.rs b/src/wsh/common/decoders.rs index 544e3153..847676b7 100644 --- a/src/wsh/common/decoders.rs +++ b/src/wsh/common/decoders.rs @@ -1,5 +1,7 @@ //! Decoders for Wall Street Horizon messages +use prost::Message; + use crate::messages::{IncomingMessages, ResponseMessage}; use crate::wsh::{WshEventData, WshMetadata}; use crate::Error; @@ -24,7 +26,6 @@ pub(in crate::wsh) fn decode_wsh_event_data(mut message: ResponseMessage) -> Res #[allow(dead_code)] pub(crate) fn decode_wsh_metadata_proto(bytes: &[u8]) -> Result { - use prost::Message; let p = crate::proto::WshMetaData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; Ok(WshMetadata { data_json: p.data_json.unwrap_or_default(), @@ -33,7 +34,6 @@ pub(crate) fn decode_wsh_metadata_proto(bytes: &[u8]) -> Result Result { - use prost::Message; let p = crate::proto::WshEventData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; Ok(WshEventData { data_json: p.data_json.unwrap_or_default(), From 21619085f6f7bff44502c09da19a092b4d4208a8 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Fri, 10 Apr 2026 22:04:47 -1000 Subject: [PATCH 3/9] add Error::ProtobufDecode variant, remove map_err boilerplate --- src/accounts/common/decoders.rs | 16 ++++++++-------- src/contracts/common/decoders/proto.rs | 2 +- src/display_groups/common/decoders.rs | 2 +- src/errors.rs | 13 +++++++++++++ src/market_data/historical/common/decoders.rs | 12 ++++++------ src/market_data/realtime/common/decoders.rs | 14 +++++++------- src/news/common/decoders.rs | 6 +++--- src/orders/common/decoders.rs | 10 +++++----- src/proto/decoders.rs | 2 +- src/scanner/common/decoders.rs | 2 +- src/wsh/common/decoders.rs | 4 ++-- 11 files changed, 48 insertions(+), 35 deletions(-) diff --git a/src/accounts/common/decoders.rs b/src/accounts/common/decoders.rs index f509c939..f451fff3 100644 --- a/src/accounts/common/decoders.rs +++ b/src/accounts/common/decoders.rs @@ -260,7 +260,7 @@ pub(crate) fn decode_account_multi_value(message: &mut ResponseMessage) -> Resul #[allow(dead_code)] pub(crate) fn decode_position_proto(bytes: &[u8]) -> Result { - let p = proto::Position::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = proto::Position::decode(bytes)?; let contract = p.contract.as_ref().map(proto::decoders::decode_contract).unwrap_or_default(); Ok(Position { account: p.account.unwrap_or_default(), @@ -272,7 +272,7 @@ pub(crate) fn decode_position_proto(bytes: &[u8]) -> Result { #[allow(dead_code)] pub(crate) fn decode_account_value_proto(bytes: &[u8]) -> Result { - let p = proto::AccountValue::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = proto::AccountValue::decode(bytes)?; Ok(AccountValue { key: p.key.unwrap_or_default(), value: p.value.unwrap_or_default(), @@ -283,7 +283,7 @@ pub(crate) fn decode_account_value_proto(bytes: &[u8]) -> Result Result { - let p = proto::PortfolioValue::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = proto::PortfolioValue::decode(bytes)?; let contract = p.contract.as_ref().map(proto::decoders::decode_contract).unwrap_or_default(); Ok(AccountPortfolioValue { contract, @@ -299,7 +299,7 @@ pub(crate) fn decode_account_portfolio_value_proto(bytes: &[u8]) -> Result Result { - let p = proto::PnL::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = proto::PnL::decode(bytes)?; Ok(PnL { daily_pnl: p.daily_pn_l.unwrap_or_default(), unrealized_pnl: p.unrealized_pn_l.filter(|&v| v != f64::MAX), @@ -309,7 +309,7 @@ pub(crate) fn decode_pnl_proto(bytes: &[u8]) -> Result { #[allow(dead_code)] pub(crate) fn decode_pnl_single_proto(bytes: &[u8]) -> Result { - let p = proto::PnLSingle::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = proto::PnLSingle::decode(bytes)?; Ok(PnLSingle { position: p.position.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default(), daily_pnl: p.daily_pn_l.unwrap_or_default(), @@ -321,7 +321,7 @@ pub(crate) fn decode_pnl_single_proto(bytes: &[u8]) -> Result #[allow(dead_code)] pub(crate) fn decode_account_summary_proto(bytes: &[u8]) -> Result { - let p = proto::AccountSummary::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = proto::AccountSummary::decode(bytes)?; Ok(AccountSummary { account: p.account.unwrap_or_default(), tag: p.tag.unwrap_or_default(), @@ -332,7 +332,7 @@ pub(crate) fn decode_account_summary_proto(bytes: &[u8]) -> Result Result { - let p = proto::PositionMulti::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = proto::PositionMulti::decode(bytes)?; let contract = p.contract.as_ref().map(proto::decoders::decode_contract).unwrap_or_default(); Ok(PositionMulti { account: p.account.unwrap_or_default(), @@ -345,7 +345,7 @@ pub(crate) fn decode_position_multi_proto(bytes: &[u8]) -> Result Result { - let p = proto::AccountUpdateMulti::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = proto::AccountUpdateMulti::decode(bytes)?; Ok(AccountMultiValue { account: p.account.unwrap_or_default(), model_code: p.model_code.unwrap_or_default(), diff --git a/src/contracts/common/decoders/proto.rs b/src/contracts/common/decoders/proto.rs index fadd72cd..29f4b152 100644 --- a/src/contracts/common/decoders/proto.rs +++ b/src/contracts/common/decoders/proto.rs @@ -5,7 +5,7 @@ use crate::Error; #[allow(dead_code)] pub(crate) fn decode_contract_data_proto(bytes: &[u8]) -> Result { - let p: crate::proto::ContractData = Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p: crate::proto::ContractData = Message::decode(bytes)?; let default_contract = crate::proto::Contract::default(); let default_details = crate::proto::ContractDetails::default(); let proto_contract = p.contract.as_ref().unwrap_or(&default_contract); diff --git a/src/display_groups/common/decoders.rs b/src/display_groups/common/decoders.rs index 7166e731..a33cb6a6 100644 --- a/src/display_groups/common/decoders.rs +++ b/src/display_groups/common/decoders.rs @@ -28,7 +28,7 @@ pub(crate) fn decode_display_group_updated(message: &mut ResponseMessage) -> Res #[allow(dead_code)] pub(crate) fn decode_display_group_updated_proto(bytes: &[u8]) -> Result { - let p = crate::proto::DisplayGroupUpdated::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = crate::proto::DisplayGroupUpdated::decode(bytes)?; Ok(DisplayGroupUpdate::new(p.contract_info.unwrap_or_default())) } diff --git a/src/errors.rs b/src/errors.rs index a859c136..db1a7fa2 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -101,6 +101,10 @@ pub enum Error { /// Wraps errors parsing historical data parameters. #[error("HistoricalParseError: {0}")] HistoricalParseError(HistoricalParseError), + + /// Failed to decode a protobuf message. + #[error("protobuf decode error: {0}")] + ProtobufDecode(#[from] prost::DecodeError), } impl From for Error { @@ -230,6 +234,15 @@ mod tests { assert!(matches!(error, Error::Poison(_))); } + #[test] + fn test_from_protobuf_decode_error() { + let bad_bytes: &[u8] = &[0xff, 0xff]; + let decode_err = prost::Message::decode(bad_bytes).map(|_: crate::proto::TickPrice| ()).unwrap_err(); + let error: Error = decode_err.into(); + assert!(matches!(error, Error::ProtobufDecode(_))); + assert!(error.to_string().contains("protobuf decode error")); + } + #[test] fn test_non_exhaustive() { fn assert_non_exhaustive() {} diff --git a/src/market_data/historical/common/decoders.rs b/src/market_data/historical/common/decoders.rs index 0396c4df..a7d5ba3b 100644 --- a/src/market_data/historical/common/decoders.rs +++ b/src/market_data/historical/common/decoders.rs @@ -358,7 +358,7 @@ fn ts(secs: i64) -> OffsetDateTime { #[allow(dead_code)] pub(crate) fn decode_historical_data_proto(bytes: &[u8]) -> Result, Error> { - let msg = proto::HistoricalData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = proto::HistoricalData::decode(bytes)?; let bars = msg .historical_data_bars @@ -388,13 +388,13 @@ pub(crate) fn decode_historical_data_proto(bytes: &[u8]) -> Result, Err #[allow(dead_code)] pub(crate) fn decode_head_timestamp_proto(bytes: &[u8]) -> Result { - let msg = proto::HeadTimestamp::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = proto::HeadTimestamp::decode(bytes)?; Ok(msg.head_timestamp.unwrap_or_default()) } #[allow(dead_code)] pub(crate) fn decode_real_time_bar_proto(bytes: &[u8]) -> Result { - let msg = proto::RealTimeBarTick::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = proto::RealTimeBarTick::decode(bytes)?; Ok(crate::market_data::realtime::Bar { date: OffsetDateTime::from_unix_timestamp(msg.time.unwrap_or_default()).unwrap_or(OffsetDateTime::UNIX_EPOCH), @@ -410,7 +410,7 @@ pub(crate) fn decode_real_time_bar_proto(bytes: &[u8]) -> Result Result<(Vec, bool), Error> { - let msg = proto::HistoricalTicks::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = proto::HistoricalTicks::decode(bytes)?; let ticks = msg .historical_ticks @@ -427,7 +427,7 @@ pub(crate) fn decode_historical_ticks_proto(bytes: &[u8]) -> Result<(Vec Result<(Vec, bool), Error> { - let msg = proto::HistoricalTicksLast::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = proto::HistoricalTicksLast::decode(bytes)?; let ticks = msg .historical_ticks_last @@ -453,7 +453,7 @@ pub(crate) fn decode_historical_ticks_last_proto(bytes: &[u8]) -> Result<(Vec Result<(Vec, bool), Error> { - let msg = proto::HistoricalTicksBidAsk::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = proto::HistoricalTicksBidAsk::decode(bytes)?; let ticks = msg .historical_ticks_bid_ask diff --git a/src/market_data/realtime/common/decoders.rs b/src/market_data/realtime/common/decoders.rs index 0f8c1643..f62fe0b5 100644 --- a/src/market_data/realtime/common/decoders.rs +++ b/src/market_data/realtime/common/decoders.rs @@ -246,7 +246,7 @@ pub(crate) fn decode_tick_request_parameters(message: &mut ResponseMessage) -> R #[allow(dead_code)] pub(crate) fn decode_tick_price_proto(bytes: &[u8]) -> Result { - let msg = crate::proto::TickPrice::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = crate::proto::TickPrice::decode(bytes)?; let tick_type = TickType::from(msg.tick_type.unwrap_or_default()); let price = msg.price.unwrap_or_default(); @@ -288,7 +288,7 @@ pub(crate) fn decode_tick_price_proto(bytes: &[u8]) -> Result #[allow(dead_code)] pub(crate) fn decode_tick_size_proto(bytes: &[u8]) -> Result { - let msg = crate::proto::TickSize::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = crate::proto::TickSize::decode(bytes)?; Ok(TickSize { tick_type: TickType::from(msg.tick_type.unwrap_or_default()), @@ -298,7 +298,7 @@ pub(crate) fn decode_tick_size_proto(bytes: &[u8]) -> Result { #[allow(dead_code)] pub(crate) fn decode_tick_string_proto(bytes: &[u8]) -> Result { - let msg = crate::proto::TickString::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = crate::proto::TickString::decode(bytes)?; Ok(TickString { tick_type: TickType::from(msg.tick_type.unwrap_or_default()), @@ -308,7 +308,7 @@ pub(crate) fn decode_tick_string_proto(bytes: &[u8]) -> Result Result { - let msg = crate::proto::TickGeneric::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = crate::proto::TickGeneric::decode(bytes)?; Ok(TickGeneric { tick_type: TickType::from(msg.tick_type.unwrap_or_default()), @@ -318,7 +318,7 @@ pub(crate) fn decode_tick_generic_proto(bytes: &[u8]) -> Result Result { - let msg = crate::proto::TickOptionComputation::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = crate::proto::TickOptionComputation::decode(bytes)?; fn optional(val: Option) -> Option { val.filter(|&v| v != f64::MAX) @@ -340,7 +340,7 @@ pub(crate) fn decode_tick_option_computation_proto(bytes: &[u8]) -> Result Result { - let msg = crate::proto::MarketDepth::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = crate::proto::MarketDepth::decode(bytes)?; let data = msg.market_depth_data.ok_or_else(|| Error::Simple("missing market_depth_data".into()))?; @@ -355,7 +355,7 @@ pub(crate) fn decode_market_depth_proto(bytes: &[u8]) -> Result Result { - let msg = crate::proto::MarketDepthL2::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let msg = crate::proto::MarketDepthL2::decode(bytes)?; let data = msg.market_depth_data.ok_or_else(|| Error::Simple("missing market_depth_data".into()))?; diff --git a/src/news/common/decoders.rs b/src/news/common/decoders.rs index 223c4538..d4d92312 100644 --- a/src/news/common/decoders.rs +++ b/src/news/common/decoders.rs @@ -100,7 +100,7 @@ fn parse_unix_timestamp(time: &str) -> Result { #[allow(dead_code)] pub(crate) fn decode_news_bulletin_proto(bytes: &[u8]) -> Result { - let p = crate::proto::NewsBulletin::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = crate::proto::NewsBulletin::decode(bytes)?; Ok(NewsBulletin { message_id: p.news_msg_id.unwrap_or_default(), message_type: p.news_msg_type.unwrap_or_default(), @@ -111,7 +111,7 @@ pub(crate) fn decode_news_bulletin_proto(bytes: &[u8]) -> Result Result { - let p = crate::proto::NewsArticle::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = crate::proto::NewsArticle::decode(bytes)?; Ok(NewsArticleBody { article_type: ArticleType::from(p.article_type.unwrap_or_default()), article_text: p.article_text.unwrap_or_default(), @@ -120,7 +120,7 @@ pub(crate) fn decode_news_article_proto(bytes: &[u8]) -> Result Result { - let p = crate::proto::HistoricalNews::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = crate::proto::HistoricalNews::decode(bytes)?; let time = p .time diff --git a/src/orders/common/decoders.rs b/src/orders/common/decoders.rs index c9807483..16494479 100644 --- a/src/orders/common/decoders.rs +++ b/src/orders/common/decoders.rs @@ -1132,7 +1132,7 @@ fn decode_percent_change_condition(message: &mut ResponseMessage, is_conjunction #[allow(dead_code)] pub(crate) fn decode_open_order_proto(bytes: &[u8]) -> Result { - let p: crate::proto::OpenOrder = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p: crate::proto::OpenOrder = prost::Message::decode(bytes)?; let contract = p.contract.as_ref().map(crate::proto::decoders::decode_contract).unwrap_or_default(); let order = p.order.as_ref().map(crate::proto::decoders::decode_order).unwrap_or_default(); let order_state = p.order_state.as_ref().map(crate::proto::decoders::decode_order_state).unwrap_or_default(); @@ -1147,7 +1147,7 @@ pub(crate) fn decode_open_order_proto(bytes: &[u8]) -> Result #[allow(dead_code)] pub(crate) fn decode_order_status_proto(bytes: &[u8]) -> Result { - let p: crate::proto::OrderStatus = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p: crate::proto::OrderStatus = prost::Message::decode(bytes)?; Ok(OrderStatus { order_id: p.order_id.unwrap_or_default(), @@ -1166,7 +1166,7 @@ pub(crate) fn decode_order_status_proto(bytes: &[u8]) -> Result Result { - let p: crate::proto::ExecutionDetails = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p: crate::proto::ExecutionDetails = prost::Message::decode(bytes)?; Ok(ExecutionData { request_id: p.req_id.unwrap_or_default(), @@ -1177,7 +1177,7 @@ pub(crate) fn decode_execution_data_proto(bytes: &[u8]) -> Result Result { - let p: crate::proto::CompletedOrder = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p: crate::proto::CompletedOrder = prost::Message::decode(bytes)?; let contract = p.contract.as_ref().map(crate::proto::decoders::decode_contract).unwrap_or_default(); let order = p.order.as_ref().map(crate::proto::decoders::decode_order).unwrap_or_default(); let order_state = p.order_state.as_ref().map(crate::proto::decoders::decode_order_state).unwrap_or_default(); @@ -1192,7 +1192,7 @@ pub(crate) fn decode_completed_order_proto(bytes: &[u8]) -> Result Result { - let p: crate::proto::CommissionAndFeesReport = prost::Message::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p: crate::proto::CommissionAndFeesReport = prost::Message::decode(bytes)?; Ok(CommissionReport { execution_id: p.exec_id.unwrap_or_default(), diff --git a/src/proto/decoders.rs b/src/proto/decoders.rs index afede1f9..997c5a4c 100644 --- a/src/proto/decoders.rs +++ b/src/proto/decoders.rs @@ -532,7 +532,7 @@ pub fn decode_contract_details(proto_contract: &proto::Contract, proto_details: } pub fn decode_error_message(bytes: &[u8]) -> Result<(i32, i32, String, String), Error> { - let p = proto::ErrorMessage::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = proto::ErrorMessage::decode(bytes)?; Ok(( p.id.unwrap_or_default(), p.error_code.unwrap_or_default(), diff --git a/src/scanner/common/decoders.rs b/src/scanner/common/decoders.rs index a691bcdd..f4bbe93f 100644 --- a/src/scanner/common/decoders.rs +++ b/src/scanner/common/decoders.rs @@ -63,7 +63,7 @@ pub(in crate::scanner) fn decode_scanner_data(mut message: ResponseMessage) -> R #[allow(dead_code)] pub(crate) fn decode_scanner_data_proto(bytes: &[u8]) -> Result, Error> { - let p = crate::proto::ScannerData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = crate::proto::ScannerData::decode(bytes)?; let mut results = Vec::with_capacity(p.scanner_data_element.len()); for elem in &p.scanner_data_element { diff --git a/src/wsh/common/decoders.rs b/src/wsh/common/decoders.rs index 847676b7..dd6e5868 100644 --- a/src/wsh/common/decoders.rs +++ b/src/wsh/common/decoders.rs @@ -26,7 +26,7 @@ pub(in crate::wsh) fn decode_wsh_event_data(mut message: ResponseMessage) -> Res #[allow(dead_code)] pub(crate) fn decode_wsh_metadata_proto(bytes: &[u8]) -> Result { - let p = crate::proto::WshMetaData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = crate::proto::WshMetaData::decode(bytes)?; Ok(WshMetadata { data_json: p.data_json.unwrap_or_default(), }) @@ -34,7 +34,7 @@ pub(crate) fn decode_wsh_metadata_proto(bytes: &[u8]) -> Result Result { - let p = crate::proto::WshEventData::decode(bytes).map_err(|e| Error::Simple(format!("protobuf decode error: {e}")))?; + let p = crate::proto::WshEventData::decode(bytes)?; Ok(WshEventData { data_json: p.data_json.unwrap_or_default(), }) From 6ee990993f0df91a9c44594376ce236e9e6249cc Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Fri, 10 Apr 2026 22:06:31 -1000 Subject: [PATCH 4/9] document protobuf decoder module structure in CLAUDE.md --- CLAUDE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index b336a500..92820bb0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -45,7 +45,7 @@ Changes to both branches should be made via pull requests. 1. **Be explicit about feature coverage**: Default async, sync-only, and combined builds must compile when touched 2. **Test each configuration**: Run tests for default, sync-only, and `--all-features` -3. **Follow module structure**: Client methods live as `impl Client` blocks in domain modules (e.g., `accounts/sync.rs`), not in `client/sync.rs` or `client/async.rs`. Use `common/` for shared logic between sync/async +3. **Follow module structure**: Client methods live as `impl Client` blocks in domain modules (e.g., `accounts/sync.rs`), not in `client/sync.rs` or `client/async.rs`. Use `common/` for shared logic between sync/async. Protobuf decoders live in each domain's `common/decoders.rs`; shared proto→domain converters live in `proto/decoders.rs` 4. **Minimal comments**: Keep comments concise, avoid stating the obvious 5. **Run quality checks**: Before committing, run `cargo fmt`, `cargo clippy --all-targets -- -D warnings`, `cargo clippy --all-targets --features sync -- -D warnings`, and `cargo clippy --all-features` 6. **Fluent conditional orders**: Use helper functions (`price()`, `time()`, `margin()`, etc.) and method chaining (`.condition()`, `.and_condition()`, `.or_condition()`) for building conditional orders. See [docs/order-types.md](docs/order-types.md#conditional-orders-with-conditions) and [docs/api-patterns.md](docs/api-patterns.md#conditional-order-builder-pattern) for details From 69005a662f7fc24ba9303c71db89d3ecc66f167b Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Fri, 10 Apr 2026 22:14:16 -1000 Subject: [PATCH 5/9] deduplicate proto decoder helpers, extract tag_values converter --- src/accounts/common/decoders.rs | 13 ++-- src/market_data/historical/common/decoders.rs | 16 +---- src/market_data/realtime/common/decoders.rs | 21 +++---- src/proto/decoders.rs | 61 ++++++++----------- 4 files changed, 42 insertions(+), 69 deletions(-) diff --git a/src/accounts/common/decoders.rs b/src/accounts/common/decoders.rs index f451fff3..db906714 100644 --- a/src/accounts/common/decoders.rs +++ b/src/accounts/common/decoders.rs @@ -4,6 +4,7 @@ use prost::Message; use crate::contracts::{Contract, Currency, Exchange, SecurityType, Symbol}; use crate::messages::ResponseMessage; +use crate::proto::decoders::parse_f64 as parse_str_f64; use crate::{proto, server_versions, Error}; use super::super::{ @@ -265,7 +266,7 @@ pub(crate) fn decode_position_proto(bytes: &[u8]) -> Result { Ok(Position { account: p.account.unwrap_or_default(), contract, - position: p.position.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default(), + position: parse_str_f64(&p.position), average_cost: p.avg_cost.unwrap_or_default(), }) } @@ -287,7 +288,7 @@ pub(crate) fn decode_account_portfolio_value_proto(bytes: &[u8]) -> Result().ok()).unwrap_or_default(), + position: parse_str_f64(&p.position), market_price: p.market_price.unwrap_or_default(), market_value: p.market_value.unwrap_or_default(), average_cost: p.average_cost.unwrap_or_default(), @@ -302,8 +303,8 @@ pub(crate) fn decode_pnl_proto(bytes: &[u8]) -> Result { let p = proto::PnL::decode(bytes)?; Ok(PnL { daily_pnl: p.daily_pn_l.unwrap_or_default(), - unrealized_pnl: p.unrealized_pn_l.filter(|&v| v != f64::MAX), - realized_pnl: p.realized_pn_l.filter(|&v| v != f64::MAX), + unrealized_pnl: proto::decoders::optional_f64(p.unrealized_pn_l), + realized_pnl: proto::decoders::optional_f64(p.realized_pn_l), }) } @@ -311,7 +312,7 @@ pub(crate) fn decode_pnl_proto(bytes: &[u8]) -> Result { pub(crate) fn decode_pnl_single_proto(bytes: &[u8]) -> Result { let p = proto::PnLSingle::decode(bytes)?; Ok(PnLSingle { - position: p.position.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default(), + position: parse_str_f64(&p.position), daily_pnl: p.daily_pn_l.unwrap_or_default(), unrealized_pnl: p.unrealized_pn_l.unwrap_or_default(), realized_pnl: p.realized_pn_l.unwrap_or_default(), @@ -337,7 +338,7 @@ pub(crate) fn decode_position_multi_proto(bytes: &[u8]) -> Result().ok()).unwrap_or_default(), + position: parse_str_f64(&p.position), average_cost: p.avg_cost.unwrap_or_default(), model_code: p.model_code.unwrap_or_default(), }) diff --git a/src/market_data/historical/common/decoders.rs b/src/market_data/historical/common/decoders.rs index a7d5ba3b..4d621a77 100644 --- a/src/market_data/historical/common/decoders.rs +++ b/src/market_data/historical/common/decoders.rs @@ -340,21 +340,7 @@ fn parse_bar_date(text: &str, time_zone: &Tz) -> Result { use prost::Message; use crate::proto; - -#[allow(dead_code)] -fn parse_str_f64(opt: &Option) -> f64 { - opt.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default() -} - -#[allow(dead_code)] -fn parse_str_i32(opt: &Option) -> i32 { - opt.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default() -} - -#[allow(dead_code)] -fn ts(secs: i64) -> OffsetDateTime { - OffsetDateTime::from_unix_timestamp(secs).unwrap_or(OffsetDateTime::UNIX_EPOCH) -} +use crate::proto::decoders::{parse_f64 as parse_str_f64, parse_i32 as parse_str_i32, ts}; #[allow(dead_code)] pub(crate) fn decode_historical_data_proto(bytes: &[u8]) -> Result, Error> { diff --git a/src/market_data/realtime/common/decoders.rs b/src/market_data/realtime/common/decoders.rs index f62fe0b5..fa941230 100644 --- a/src/market_data/realtime/common/decoders.rs +++ b/src/market_data/realtime/common/decoders.rs @@ -2,6 +2,7 @@ use prost::Message; use crate::contracts::decode_option_computation; use crate::contracts::OptionComputation; +use crate::proto::decoders::optional_f64; use crate::subscriptions::DecoderContext; use crate::Error; use crate::{messages::ResponseMessage, server_versions}; @@ -320,21 +321,17 @@ pub(crate) fn decode_tick_generic_proto(bytes: &[u8]) -> Result Result { let msg = crate::proto::TickOptionComputation::decode(bytes)?; - fn optional(val: Option) -> Option { - val.filter(|&v| v != f64::MAX) - } - Ok(OptionComputation { field: TickType::from(msg.tick_type.unwrap_or_default()), tick_attribute: msg.tick_attrib, - implied_volatility: optional(msg.implied_vol), - delta: optional(msg.delta), - option_price: optional(msg.opt_price), - present_value_dividend: optional(msg.pv_dividend), - gamma: optional(msg.gamma), - vega: optional(msg.vega), - theta: optional(msg.theta), - underlying_price: optional(msg.und_price), + implied_volatility: optional_f64(msg.implied_vol), + delta: optional_f64(msg.delta), + option_price: optional_f64(msg.opt_price), + present_value_dividend: optional_f64(msg.pv_dividend), + gamma: optional_f64(msg.gamma), + vega: optional_f64(msg.vega), + theta: optional_f64(msg.theta), + underlying_price: optional_f64(msg.und_price), }) } diff --git a/src/proto/decoders.rs b/src/proto/decoders.rs index 997c5a4c..7a6bd3e7 100644 --- a/src/proto/decoders.rs +++ b/src/proto/decoders.rs @@ -14,24 +14,41 @@ use crate::Error; // === Helper functions === -fn s(opt: &Option) -> String { +pub(crate) fn s(opt: &Option) -> String { opt.clone().unwrap_or_default() } -fn parse_f64(opt: &Option) -> f64 { +pub(crate) fn parse_f64(opt: &Option) -> f64 { opt.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default() } -fn optional_f64(val: Option) -> Option { +pub(crate) fn parse_i32(opt: &Option) -> i32 { + opt.as_deref().and_then(|s| s.parse::().ok()).unwrap_or_default() +} + +pub(crate) fn optional_f64(val: Option) -> Option { val.filter(|&v| v != f64::MAX) } -fn optional_string_f64(opt: &Option) -> Option { +pub(crate) fn optional_string_f64(opt: &Option) -> Option { opt.as_deref() .and_then(|s| s.parse::().ok()) .and_then(|v| if v == f64::MAX { None } else { Some(v) }) } +pub(crate) fn ts(secs: i64) -> time::OffsetDateTime { + time::OffsetDateTime::from_unix_timestamp(secs).unwrap_or(time::OffsetDateTime::UNIX_EPOCH) +} + +pub(crate) fn tag_values(map: &std::collections::HashMap) -> Vec { + map.iter() + .map(|(k, v)| TagValue { + tag: k.clone(), + value: v.clone(), + }) + .collect() +} + // === Shared converters === pub fn decode_contract(proto: &proto::Contract) -> Contract { @@ -170,25 +187,11 @@ pub fn decode_order(proto: &proto::Order) -> Order { // algo orders order.algo_strategy = s(&proto.algo_strategy); - order.algo_params = proto - .algo_params - .iter() - .map(|(k, v)| TagValue { - tag: k.clone(), - value: v.clone(), - }) - .collect(); + order.algo_params = tag_values(&proto.algo_params); order.algo_id = s(&proto.algo_id); // combo orders - order.smart_combo_routing_params = proto - .smart_combo_routing_params - .iter() - .map(|(k, v)| TagValue { - tag: k.clone(), - value: v.clone(), - }) - .collect(); + order.smart_combo_routing_params = tag_values(&proto.smart_combo_routing_params); // processing control order.what_if = proto.what_if.unwrap_or_default(); @@ -222,14 +225,7 @@ pub fn decode_order(proto: &proto::Order) -> Order { order.not_held = proto.not_held.unwrap_or_default(); // order misc options - order.order_misc_options = proto - .order_misc_options - .iter() - .map(|(k, v)| TagValue { - tag: k.clone(), - value: v.clone(), - }) - .collect(); + order.order_misc_options = tag_values(&proto.order_misc_options); // solicited / randomize order.solicited = proto.solicited.unwrap_or_default(); @@ -461,14 +457,7 @@ pub fn decode_contract_details(proto_contract: &proto::Contract, proto_details: ev_rule: s(&proto_details.ev_rule), ev_multiplier: proto_details.ev_multiplier.unwrap_or_default(), agg_group: proto_details.agg_group.unwrap_or_default(), - sec_id_list: proto_details - .sec_id_list - .iter() - .map(|(k, v)| TagValue { - tag: k.clone(), - value: v.clone(), - }) - .collect(), + sec_id_list: tag_values(&proto_details.sec_id_list), under_symbol: s(&proto_details.under_symbol), under_security_type: s(&proto_details.under_sec_type), market_rule_ids: proto_details From be0bad9be14f010021b9b3d27c57487ba05c046f Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Fri, 10 Apr 2026 22:41:15 -1000 Subject: [PATCH 6/9] use shared helpers optional_f64/parse_f64 in order proto decoders --- src/orders/common/decoders.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/orders/common/decoders.rs b/src/orders/common/decoders.rs index 16494479..c731bbc6 100644 --- a/src/orders/common/decoders.rs +++ b/src/orders/common/decoders.rs @@ -1152,8 +1152,8 @@ pub(crate) fn decode_order_status_proto(bytes: &[u8]) -> Result Result Date: Fri, 10 Apr 2026 22:52:37 -1000 Subject: [PATCH 7/9] add tests for all remaining proto decoders (20 new tests) --- src/accounts/common/decoders.rs | 148 ++++++++++++++++++ src/display_groups/common/decoders.rs | 16 ++ src/market_data/historical/common/decoders.rs | 113 +++++++++++++ src/market_data/realtime/common/decoders.rs | 106 +++++++++++++ src/news/common/decoders.rs | 40 +++++ src/orders/common/decoders.rs | 74 +++++++++ src/scanner/common/decoders.rs | 53 +++++++ src/wsh/common/decoders.rs | 14 ++ 8 files changed, 564 insertions(+) diff --git a/src/accounts/common/decoders.rs b/src/accounts/common/decoders.rs index db906714..7e4c0c06 100644 --- a/src/accounts/common/decoders.rs +++ b/src/accounts/common/decoders.rs @@ -1277,4 +1277,152 @@ mod tests { assert_eq!(result.unrealized_pnl, Some(500.0)); assert_eq!(result.realized_pnl, None); // f64::MAX filtered out } + + #[test] + fn test_decode_account_value_proto() { + use prost::Message; + + let proto_msg = crate::proto::AccountValue { + key: Some("NetLiquidation".into()), + value: Some("100000".into()), + currency: Some("USD".into()), + account_name: Some("DU1234".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = super::decode_account_value_proto(&bytes).unwrap(); + assert_eq!(result.key, "NetLiquidation"); + assert_eq!(result.value, "100000"); + assert_eq!(result.currency, "USD"); + assert_eq!(result.account, Some("DU1234".into())); + } + + #[test] + fn test_decode_account_portfolio_value_proto() { + use prost::Message; + + let proto_msg = crate::proto::PortfolioValue { + contract: Some(crate::proto::Contract { + con_id: Some(265598), + symbol: Some("AAPL".into()), + ..Default::default() + }), + position: Some("100".into()), + market_price: Some(150.0), + market_value: Some(15000.0), + average_cost: Some(145.0), + unrealized_pnl: Some(500.0), + realized_pnl: Some(0.0), + account_name: Some("DU1234".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = super::decode_account_portfolio_value_proto(&bytes).unwrap(); + assert_eq!(result.contract.contract_id, 265598); + assert_eq!(result.position, 100.0); + assert_eq!(result.market_price, 150.0); + assert_eq!(result.account, Some("DU1234".into())); + } + + #[test] + fn test_decode_pnl_single_proto() { + use prost::Message; + + let proto_msg = crate::proto::PnLSingle { + req_id: Some(1), + position: Some("500".into()), + daily_pn_l: Some(1000.0), + unrealized_pn_l: Some(2000.0), + realized_pn_l: Some(500.0), + value: Some(75000.0), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = super::decode_pnl_single_proto(&bytes).unwrap(); + assert_eq!(result.position, 500.0); + assert_eq!(result.daily_pnl, 1000.0); + assert_eq!(result.unrealized_pnl, 2000.0); + assert_eq!(result.realized_pnl, 500.0); + assert_eq!(result.value, 75000.0); + } + + #[test] + fn test_decode_account_summary_proto() { + use prost::Message; + + let proto_msg = crate::proto::AccountSummary { + req_id: Some(1), + account: Some("DU1234".into()), + tag: Some("NetLiquidation".into()), + value: Some("100000".into()), + currency: Some("USD".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = super::decode_account_summary_proto(&bytes).unwrap(); + assert_eq!(result.account, "DU1234"); + assert_eq!(result.tag, "NetLiquidation"); + assert_eq!(result.value, "100000"); + assert_eq!(result.currency, "USD"); + } + + #[test] + fn test_decode_position_multi_proto() { + use prost::Message; + + let proto_msg = crate::proto::PositionMulti { + req_id: Some(1), + account: Some("DU1234".into()), + contract: Some(crate::proto::Contract { + con_id: Some(265598), + symbol: Some("AAPL".into()), + ..Default::default() + }), + position: Some("50".into()), + avg_cost: Some(148.5), + model_code: Some("Tech".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = super::decode_position_multi_proto(&bytes).unwrap(); + assert_eq!(result.account, "DU1234"); + assert_eq!(result.contract.contract_id, 265598); + assert_eq!(result.position, 50.0); + assert_eq!(result.average_cost, 148.5); + assert_eq!(result.model_code, "Tech"); + } + + #[test] + fn test_decode_account_multi_value_proto() { + use prost::Message; + + let proto_msg = crate::proto::AccountUpdateMulti { + req_id: Some(1), + account: Some("DU1234".into()), + model_code: Some("Tech".into()), + key: Some("NetLiquidation".into()), + value: Some("100000".into()), + currency: Some("USD".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = super::decode_account_multi_value_proto(&bytes).unwrap(); + assert_eq!(result.account, "DU1234"); + assert_eq!(result.model_code, "Tech"); + assert_eq!(result.key, "NetLiquidation"); + assert_eq!(result.value, "100000"); + assert_eq!(result.currency, "USD"); + } } diff --git a/src/display_groups/common/decoders.rs b/src/display_groups/common/decoders.rs index a33cb6a6..aa3887ee 100644 --- a/src/display_groups/common/decoders.rs +++ b/src/display_groups/common/decoders.rs @@ -72,4 +72,20 @@ mod tests { let err_msg = format!("{:?}", result.unwrap_err()); assert!(err_msg.contains("unexpected message type")); } + + #[test] + fn test_decode_display_group_updated_proto() { + use prost::Message; + + let proto_msg = crate::proto::DisplayGroupUpdated { + req_id: Some(1), + contract_info: Some("265598@SMART".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_display_group_updated_proto(&bytes).unwrap(); + assert_eq!(result.contract_info, "265598@SMART"); + } } diff --git a/src/market_data/historical/common/decoders.rs b/src/market_data/historical/common/decoders.rs index 4d621a77..11f5527a 100644 --- a/src/market_data/historical/common/decoders.rs +++ b/src/market_data/historical/common/decoders.rs @@ -809,4 +809,117 @@ mod tests { assert_eq!(ticks[1].size, 50); assert_eq!(ticks[1].exchange, "FINRA"); } + + #[test] + fn test_decode_head_timestamp_proto() { + use prost::Message; + + let proto_msg = crate::proto::HeadTimestamp { + req_id: Some(1), + head_timestamp: Some("1609459200".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_head_timestamp_proto(&bytes).unwrap(); + assert_eq!(result, "1609459200"); + } + + #[test] + fn test_decode_real_time_bar_proto() { + use prost::Message; + + let proto_msg = crate::proto::RealTimeBarTick { + req_id: Some(1), + time: Some(1681133400), + open: Some(185.5), + high: Some(186.0), + low: Some(185.0), + close: Some(185.75), + volume: Some("1000".into()), + wap: Some("185.625".into()), + count: Some(150), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_real_time_bar_proto(&bytes).unwrap(); + assert_eq!(result.date, datetime!(2023-04-10 13:30:00 UTC)); + assert_eq!(result.open, 185.5); + assert_eq!(result.high, 186.0); + assert_eq!(result.low, 185.0); + assert_eq!(result.close, 185.75); + assert_eq!(result.volume, 1000.0); + assert_eq!(result.wap, 185.625); + assert_eq!(result.count, 150); + } + + #[test] + fn test_decode_historical_ticks_proto() { + use prost::Message; + + let proto_msg = crate::proto::HistoricalTicks { + req_id: Some(1), + historical_ticks: vec![ + crate::proto::HistoricalTick { + time: Some(1681133400), + price: Some(150.0), + size: Some("100".into()), + }, + crate::proto::HistoricalTick { + time: Some(1681133401), + price: Some(150.5), + size: Some("200".into()), + }, + ], + is_done: Some(false), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let (ticks, done) = decode_historical_ticks_proto(&bytes).unwrap(); + assert_eq!(ticks.len(), 2); + assert!(!done); + assert_eq!(ticks[0].timestamp, datetime!(2023-04-10 13:30:00 UTC)); + assert_eq!(ticks[0].price, 150.0); + assert_eq!(ticks[0].size, 100); + } + + #[test] + fn test_decode_historical_ticks_bid_ask_proto() { + use prost::Message; + + let proto_msg = crate::proto::HistoricalTicksBidAsk { + req_id: Some(1), + historical_ticks_bid_ask: vec![crate::proto::HistoricalTickBidAsk { + time: Some(1681133400), + tick_attrib_bid_ask: Some(crate::proto::TickAttribBidAsk { + bid_past_low: Some(true), + ask_past_high: Some(false), + }), + price_bid: Some(149.0), + price_ask: Some(151.0), + size_bid: Some("100".into()), + size_ask: Some("200".into()), + }], + is_done: Some(true), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let (ticks, done) = decode_historical_ticks_bid_ask_proto(&bytes).unwrap(); + assert_eq!(ticks.len(), 1); + assert!(done); + assert_eq!(ticks[0].timestamp, datetime!(2023-04-10 13:30:00 UTC)); + assert!(ticks[0].tick_attribute_bid_ask.bid_past_low); + assert!(!ticks[0].tick_attribute_bid_ask.ask_past_high); + assert_eq!(ticks[0].price_bid, 149.0); + assert_eq!(ticks[0].price_ask, 151.0); + assert_eq!(ticks[0].size_bid, 100); + assert_eq!(ticks[0].size_ask, 200); + } } diff --git a/src/market_data/realtime/common/decoders.rs b/src/market_data/realtime/common/decoders.rs index fa941230..a988339f 100644 --- a/src/market_data/realtime/common/decoders.rs +++ b/src/market_data/realtime/common/decoders.rs @@ -949,5 +949,111 @@ mod tests { assert_eq!(result.price, 149.50); assert_eq!(result.size, 200.0); } + + #[test] + fn test_decode_tick_string_proto() { + let proto_msg = crate::proto::TickString { + req_id: Some(1), + tick_type: Some(45), // LastTimestamp + value: Some("1681133400".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_tick_string_proto(&bytes).unwrap(); + assert_eq!(result.tick_type, TickType::LastTimestamp); + assert_eq!(result.value, "1681133400"); + } + + #[test] + fn test_decode_tick_generic_proto() { + let proto_msg = crate::proto::TickGeneric { + req_id: Some(1), + tick_type: Some(49), // Halted + value: Some(0.0), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_tick_generic_proto(&bytes).unwrap(); + assert_eq!(result.tick_type, TickType::Halted); + assert_eq!(result.value, 0.0); + } + + #[test] + fn test_decode_tick_option_computation_proto() { + let proto_msg = crate::proto::TickOptionComputation { + req_id: Some(1), + tick_type: Some(13), // ModelOption + tick_attrib: Some(1), + implied_vol: Some(0.25), + delta: Some(0.5), + opt_price: Some(5.0), + pv_dividend: Some(0.1), + gamma: Some(0.03), + vega: Some(0.15), + theta: Some(-0.05), + und_price: Some(150.0), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_tick_option_computation_proto(&bytes).unwrap(); + assert_eq!(result.field, TickType::ModelOption); + assert_eq!(result.tick_attribute, Some(1)); + assert_eq!(result.implied_volatility, Some(0.25)); + assert_eq!(result.delta, Some(0.5)); + assert_eq!(result.option_price, Some(5.0)); + assert_eq!(result.present_value_dividend, Some(0.1)); + assert_eq!(result.gamma, Some(0.03)); + assert_eq!(result.vega, Some(0.15)); + assert_eq!(result.theta, Some(-0.05)); + assert_eq!(result.underlying_price, Some(150.0)); + + // f64::MAX should become None + let proto_msg2 = crate::proto::TickOptionComputation { + req_id: Some(1), + tick_type: Some(13), + implied_vol: Some(f64::MAX), + ..Default::default() + }; + + let mut bytes2 = Vec::new(); + proto_msg2.encode(&mut bytes2).unwrap(); + + let result2 = decode_tick_option_computation_proto(&bytes2).unwrap(); + assert_eq!(result2.implied_volatility, None); + } + + #[test] + fn test_decode_market_depth_l2_proto() { + let proto_msg = crate::proto::MarketDepthL2 { + req_id: Some(1), + market_depth_data: Some(crate::proto::MarketDepthData { + position: Some(2), + operation: Some(1), + side: Some(0), + price: Some(151.25), + size: Some("300".into()), + market_maker: Some("GSCO".into()), + is_smart_depth: Some(true), + }), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_market_depth_l2_proto(&bytes).unwrap(); + assert_eq!(result.position, 2); + assert_eq!(result.operation, 1); + assert_eq!(result.side, 0); + assert_eq!(result.price, 151.25); + assert_eq!(result.size, 300.0); + assert_eq!(result.market_maker, "GSCO"); + assert!(result.smart_depth); + } } } diff --git a/src/news/common/decoders.rs b/src/news/common/decoders.rs index d4d92312..aa4b3ff7 100644 --- a/src/news/common/decoders.rs +++ b/src/news/common/decoders.rs @@ -183,4 +183,44 @@ mod tests { assert_eq!(result.message, "Market closed early"); assert_eq!(result.exchange, "NYSE"); } + + #[test] + fn test_decode_news_article_proto() { + use prost::Message; + + let proto_msg = crate::proto::NewsArticle { + req_id: Some(1), + article_type: Some(0), + article_text: Some("Full article text here".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_news_article_proto(&bytes).unwrap(); + assert_eq!(result.article_type, ArticleType::Text); + assert_eq!(result.article_text, "Full article text here"); + } + + #[test] + fn test_decode_historical_news_proto() { + use prost::Message; + + let proto_msg = crate::proto::HistoricalNews { + req_id: Some(1), + time: Some("2023-04-10 13:30:00.000".into()), + provider_code: Some("BRFG".into()), + article_id: Some("BRFG$12345".into()), + headline: Some("Market Update".into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_historical_news_proto(&bytes).unwrap(); + assert_eq!(result.provider_code, "BRFG"); + assert_eq!(result.article_id, "BRFG$12345"); + assert_eq!(result.headline, "Market Update"); + assert_ne!(result.time, OffsetDateTime::UNIX_EPOCH); + } } diff --git a/src/orders/common/decoders.rs b/src/orders/common/decoders.rs index c731bbc6..9bc4e736 100644 --- a/src/orders/common/decoders.rs +++ b/src/orders/common/decoders.rs @@ -2494,4 +2494,78 @@ mod tests { assert_eq!(result.yields, None); // f64::MAX filtered out assert_eq!(result.yield_redemption_date, "20260101"); } + + #[test] + fn test_decode_execution_data_proto() { + use prost::Message; + + let proto_msg = crate::proto::ExecutionDetails { + req_id: Some(42), + contract: Some(crate::proto::Contract { + con_id: Some(265598), + symbol: Some("AAPL".into()), + sec_type: Some("STK".into()), + ..Default::default() + }), + execution: Some(crate::proto::Execution { + order_id: Some(100), + exec_id: Some("exec001".into()), + time: Some("20260101 12:00:00".into()), + acct_number: Some("DU1234".into()), + side: Some("BOT".into()), + shares: Some("50".into()), + price: Some(152.5), + perm_id: Some(99999), + ..Default::default() + }), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_execution_data_proto(&bytes).unwrap(); + assert_eq!(result.request_id, 42); + assert_eq!(result.contract.contract_id, 265598); + assert_eq!(result.execution.execution_id, "exec001"); + assert_eq!(result.execution.shares, 50.0); + assert_eq!(result.execution.price, 152.5); + assert_eq!(result.execution.perm_id, 99999); + } + + #[test] + fn test_decode_completed_order_proto() { + use prost::Message; + + let proto_msg = crate::proto::CompletedOrder { + contract: Some(crate::proto::Contract { + con_id: Some(265598), + symbol: Some("AAPL".into()), + sec_type: Some("STK".into()), + ..Default::default() + }), + order: Some(crate::proto::Order { + order_id: Some(200), + action: Some("SELL".into()), + total_quantity: Some("200".into()), + order_type: Some("MKT".into()), + ..Default::default() + }), + order_state: Some(crate::proto::OrderState { + status: Some("Filled".into()), + completed_time: Some("20260101 12:00:00".into()), + completed_status: Some("Filled".into()), + ..Default::default() + }), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_completed_order_proto(&bytes).unwrap(); + assert_eq!(result.order_id, 200); + assert_eq!(result.contract.contract_id, 265598); + assert_eq!(result.contract.symbol, Symbol::from("AAPL")); + assert_eq!(result.order.action, Action::Sell); + assert_eq!(result.order_state.completed_time, "20260101 12:00:00"); + } } diff --git a/src/scanner/common/decoders.rs b/src/scanner/common/decoders.rs index f4bbe93f..740dbeec 100644 --- a/src/scanner/common/decoders.rs +++ b/src/scanner/common/decoders.rs @@ -84,3 +84,56 @@ pub(crate) fn decode_scanner_data_proto(bytes: &[u8]) -> Result Ok(results) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_decode_scanner_data_proto() { + use prost::Message; + + let proto_msg = crate::proto::ScannerData { + req_id: Some(1), + scanner_data_element: vec![ + crate::proto::ScannerDataElement { + rank: Some(0), + contract: Some(crate::proto::Contract { + con_id: Some(265598), + symbol: Some("AAPL".into()), + sec_type: Some("STK".into()), + ..Default::default() + }), + market_name: Some("NMS".into()), + distance: Some("1.5".into()), + benchmark: Some("".into()), + projection: Some("".into()), + combo_key: Some("".into()), + }, + crate::proto::ScannerDataElement { + rank: Some(1), + contract: Some(crate::proto::Contract { + con_id: Some(76792991), + symbol: Some("TSLA".into()), + sec_type: Some("STK".into()), + ..Default::default() + }), + market_name: Some("NMS".into()), + distance: None, + benchmark: None, + projection: None, + combo_key: None, + }, + ], + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let results = decode_scanner_data_proto(&bytes).unwrap(); + assert_eq!(results.len(), 2); + assert_eq!(results[0].rank, 0); + assert_eq!(results[0].contract_details.contract.contract_id, 265598); + assert_eq!(results[0].contract_details.market_name, "NMS"); + } +} diff --git a/src/wsh/common/decoders.rs b/src/wsh/common/decoders.rs index dd6e5868..e1079beb 100644 --- a/src/wsh/common/decoders.rs +++ b/src/wsh/common/decoders.rs @@ -67,4 +67,18 @@ mod tests { let result = decode_wsh_metadata_proto(&bytes).unwrap(); assert_eq!(result.data_json, r#"{"key":"value"}"#); } + + #[test] + fn test_decode_wsh_event_data_proto() { + let proto_msg = crate::proto::WshEventData { + req_id: Some(1), + data_json: Some(r#"{"event":"earnings"}"#.into()), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_wsh_event_data_proto(&bytes).unwrap(); + assert_eq!(result.data_json, r#"{"event":"earnings"}"#); + } } From 4578414a7086697f965403a3bc0b0504ae54b792 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Fri, 10 Apr 2026 22:53:51 -1000 Subject: [PATCH 8/9] add test coverage requirement to CLAUDE.md --- CLAUDE.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CLAUDE.md b/CLAUDE.md index 92820bb0..394ec6f4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -53,6 +53,7 @@ Changes to both branches should be made via pull requests. 8. **Single responsibility**: One responsibility per function/module; split orchestration from business logic 9. **Composition**: Single responsibility per struct; use builders for complex construction; max 3 params per function (use builder if 4+) 10. **Never use `block_on` in async code**: Do not use `futures::executor::block_on()` inside async contexts — it blocks tokio worker threads and risks deadlocks. Use atomics (`AtomicI32`, etc.) for lock-free access to rarely-written values, or make the function `async` and `.await` the lock +11. **Every new function needs a test**: Before opening a PR, verify every new `pub`/`pub(crate)` function has a corresponding unit test. Review test coverage as a final step — missing tests should block the PR See [docs/code-style.md](docs/code-style.md#design-principles) for detailed design guidelines. From 81049dffd63def087b48a968f791df89d5084272 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Fri, 10 Apr 2026 23:13:06 -1000 Subject: [PATCH 9/9] collapse contracts decoders/ directory back to single decoders.rs --- .../common/{decoders/text.rs => decoders.rs} | 61 +++++++++++++++++++ src/contracts/common/decoders/mod.rs | 4 -- src/contracts/common/decoders/proto.rs | 61 ------------------- 3 files changed, 61 insertions(+), 65 deletions(-) rename src/contracts/common/{decoders/text.rs => decoders.rs} (93%) delete mode 100644 src/contracts/common/decoders/mod.rs delete mode 100644 src/contracts/common/decoders/proto.rs diff --git a/src/contracts/common/decoders/text.rs b/src/contracts/common/decoders.rs similarity index 93% rename from src/contracts/common/decoders/text.rs rename to src/contracts/common/decoders.rs index 322f1c3b..7e3f304e 100644 --- a/src/contracts/common/decoders/text.rs +++ b/src/contracts/common/decoders.rs @@ -749,3 +749,64 @@ mod tests { assert!(cd.ineligibility_reasons.is_empty()); } } + +// === Protobuf decoders === + +use prost::Message; + +#[allow(dead_code)] +pub(crate) fn decode_contract_data_proto(bytes: &[u8]) -> Result { + let p: crate::proto::ContractData = Message::decode(bytes)?; + let default_contract = crate::proto::Contract::default(); + let default_details = crate::proto::ContractDetails::default(); + let proto_contract = p.contract.as_ref().unwrap_or(&default_contract); + let proto_details = p.contract_details.as_ref().unwrap_or(&default_details); + Ok(crate::proto::decoders::decode_contract_details(proto_contract, proto_details)) +} + +#[cfg(test)] +mod proto_tests { + use super::*; + use prost::Message; + + #[test] + fn test_decode_contract_data_proto() { + let proto_msg = crate::proto::ContractData { + req_id: Some(1), + contract: Some(crate::proto::Contract { + con_id: Some(265598), + symbol: Some("AAPL".into()), + sec_type: Some("STK".into()), + exchange: Some("SMART".into()), + currency: Some("USD".into()), + local_symbol: Some("AAPL".into()), + trading_class: Some("NMS".into()), + ..Default::default() + }), + contract_details: Some(crate::proto::ContractDetails { + market_name: Some("NMS".into()), + min_tick: Some("0.01".into()), + long_name: Some("APPLE INC".into()), + industry: Some("Technology".into()), + category: Some("Computers".into()), + subcategory: Some("Consumer Electronics".into()), + ..Default::default() + }), + }; + + let mut bytes = Vec::new(); + proto_msg.encode(&mut bytes).unwrap(); + + let result = decode_contract_data_proto(&bytes).unwrap(); + assert_eq!(result.contract.contract_id, 265598); + assert_eq!(result.contract.symbol.to_string(), "AAPL"); + assert_eq!(result.contract.currency.to_string(), "USD"); + assert_eq!(result.contract.local_symbol, "AAPL"); + assert_eq!(result.market_name, "NMS"); + assert_eq!(result.min_tick, 0.01); + assert_eq!(result.long_name, "APPLE INC"); + assert_eq!(result.industry, "Technology"); + assert_eq!(result.category, "Computers"); + assert_eq!(result.subcategory, "Consumer Electronics"); + } +} diff --git a/src/contracts/common/decoders/mod.rs b/src/contracts/common/decoders/mod.rs deleted file mode 100644 index c8159840..00000000 --- a/src/contracts/common/decoders/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub(crate) mod proto; -mod text; - -pub(crate) use text::*; diff --git a/src/contracts/common/decoders/proto.rs b/src/contracts/common/decoders/proto.rs deleted file mode 100644 index 29f4b152..00000000 --- a/src/contracts/common/decoders/proto.rs +++ /dev/null @@ -1,61 +0,0 @@ -use prost::Message; - -use crate::contracts::ContractDetails; -use crate::Error; - -#[allow(dead_code)] -pub(crate) fn decode_contract_data_proto(bytes: &[u8]) -> Result { - let p: crate::proto::ContractData = Message::decode(bytes)?; - let default_contract = crate::proto::Contract::default(); - let default_details = crate::proto::ContractDetails::default(); - let proto_contract = p.contract.as_ref().unwrap_or(&default_contract); - let proto_details = p.contract_details.as_ref().unwrap_or(&default_details); - Ok(crate::proto::decoders::decode_contract_details(proto_contract, proto_details)) -} - -#[cfg(test)] -mod tests { - use super::*; - use prost::Message; - - #[test] - fn test_decode_contract_data_proto() { - let proto_msg = crate::proto::ContractData { - req_id: Some(1), - contract: Some(crate::proto::Contract { - con_id: Some(265598), - symbol: Some("AAPL".into()), - sec_type: Some("STK".into()), - exchange: Some("SMART".into()), - currency: Some("USD".into()), - local_symbol: Some("AAPL".into()), - trading_class: Some("NMS".into()), - ..Default::default() - }), - contract_details: Some(crate::proto::ContractDetails { - market_name: Some("NMS".into()), - min_tick: Some("0.01".into()), - long_name: Some("APPLE INC".into()), - industry: Some("Technology".into()), - category: Some("Computers".into()), - subcategory: Some("Consumer Electronics".into()), - ..Default::default() - }), - }; - - let mut bytes = Vec::new(); - proto_msg.encode(&mut bytes).unwrap(); - - let result = decode_contract_data_proto(&bytes).unwrap(); - assert_eq!(result.contract.contract_id, 265598); - assert_eq!(result.contract.symbol.to_string(), "AAPL"); - assert_eq!(result.contract.currency.to_string(), "USD"); - assert_eq!(result.contract.local_symbol, "AAPL"); - assert_eq!(result.market_name, "NMS"); - assert_eq!(result.min_tick, 0.01); - assert_eq!(result.long_name, "APPLE INC"); - assert_eq!(result.industry, "Technology"); - assert_eq!(result.category, "Computers"); - assert_eq!(result.subcategory, "Consumer Electronics"); - } -}