Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ fn maybe_apply_hotlink_ice_from_server(resp: &WsResponse) {
.filter(|v| !v.is_empty())
{
env::set_var("SYFTBOX_HOTLINK_TURN_PASS", pass);
crate::logging::info("hotlink TURN pass from server: [set]".to_string());
crate::logging::info("hotlink TURN pass from server: [set]");
}
}
}
Expand Down
97 changes: 60 additions & 37 deletions rust/src/hotlink_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ const HOTLINK_WEBRTC_BUFFERED_HIGH_MAX: usize = 16 * 1024 * 1024;
const HOTLINK_WEBRTC_BACKPRESSURE_WAIT_MS_DEFAULT: u64 = 1500;
const HOTLINK_WEBRTC_BACKPRESSURE_WAIT_MS_MAX: u64 = 10_000;
const HOTLINK_WEBRTC_BACKPRESSURE_POLL_MS: u64 = 5;
const HOTLINK_WEBRTC_ERR_OUTBOUND_TOO_LARGE: &str = "outbound packet larger than maximum message size";
const HOTLINK_WEBRTC_ERR_OUTBOUND_TOO_LARGE: &str =
"outbound packet larger than maximum message size";
const HOTLINK_TELEMETRY_FLUSH_MS: u64 = 1000;
const HOTLINK_BENCH_STRICT_ENV: &str = "SYFTBOX_HOTLINK_BENCH_STRICT";

Expand Down Expand Up @@ -187,7 +188,10 @@ impl HotlinkManager {
if !active_ok {
writers.insert(key.to_string(), entry);
if hotlink_debug_enabled() {
crate::logging::info(format!("hotlink tcp writer promoted standby key={}", key));
crate::logging::info(format!(
"hotlink tcp writer promoted standby key={}",
key
));
}
return true;
}
Expand Down Expand Up @@ -405,7 +409,11 @@ impl HotlinkManager {
let short_id = &id[..8.min(id.len())];
let peer = Self::peer_from_path(&s.path);
let channel = Self::channel_from_path(&s.path);
let wrtc = s.webrtc.as_ref().map(|w| Self::webrtc_state_str(w)).unwrap_or("none");
let wrtc = s
.webrtc
.as_ref()
.map(|w| Self::webrtc_state_str(w))
.unwrap_or("none");
inbound_list.push(json!({
"sid": short_id,
"peer": peer,
Expand All @@ -428,7 +436,11 @@ impl HotlinkManager {
} else {
"pending"
};
let wrtc = o.webrtc.as_ref().map(|w| Self::webrtc_state_str(w)).unwrap_or("none");
let wrtc = o
.webrtc
.as_ref()
.map(|w| Self::webrtc_state_str(w))
.unwrap_or("none");
outbound_list.push(json!({
"sid": short_id,
"peer": peer,
Expand Down Expand Up @@ -458,21 +470,32 @@ impl HotlinkManager {

let in_count = sessions.as_ref().map(|s| s.len()).unwrap_or(0);
let out_count = outbound.as_ref().map(|o| o.len()).unwrap_or(0);
let out_accepted = outbound.as_ref().map(|o| o.values().filter(|v| v.accepted).count()).unwrap_or(0);
let out_accepted = outbound
.as_ref()
.map(|o| o.values().filter(|v| v.accepted).count())
.unwrap_or(0);
let out_pending = out_count - out_accepted;

// Count WebRTC connected sessions (both directions)
let mut wrtc_connected = 0u64;
if let Ok(ref sess) = sessions {
for s in sess.values() {
if s.webrtc.as_ref().map(|w| w.ready_flag.load(Ordering::Relaxed)).unwrap_or(false) {
if s.webrtc
.as_ref()
.map(|w| w.ready_flag.load(Ordering::Relaxed))
.unwrap_or(false)
{
wrtc_connected += 1;
}
}
}
if let Ok(ref out) = outbound {
for o in out.values() {
if o.webrtc.as_ref().map(|w| w.ready_flag.load(Ordering::Relaxed)).unwrap_or(false) {
if o.webrtc
.as_ref()
.map(|w| w.ready_flag.load(Ordering::Relaxed))
.unwrap_or(false)
{
wrtc_connected += 1;
}
}
Expand Down Expand Up @@ -721,11 +744,14 @@ impl HotlinkManager {
marker_path.display()
));
}
proxies.insert(channel_key, TcpProxyInfo {
port: info.port,
from_email: info.from_email.clone(),
to_email: info.to_email.clone(),
});
proxies.insert(
channel_key,
TcpProxyInfo {
port: info.port,
from_email: info.from_email.clone(),
to_email: info.to_email.clone(),
},
);
let manager_clone = manager.clone();
tokio::spawn(async move {
manager_clone.run_tcp_proxy(marker_path).await;
Expand Down Expand Up @@ -826,8 +852,7 @@ impl HotlinkManager {
// Use directional outbound path owned by local user so that the
// server write-ACL check always passes. Each party sends on its own
// namespace (`local_email/_mpc/local_pid_to_peer_pid/...`).
let peer_inbound_key =
peer_inbound_tcp_key(&rel_marker, &info, local_email.as_deref());
let peer_inbound_key = peer_inbound_tcp_key(&rel_marker, &info, local_email.as_deref());
let outbound_key = local_outbound_tcp_key(&rel_marker, &info, local_email.as_deref())
.unwrap_or_else(|| channel_key.clone());
let port = info.port;
Expand Down Expand Up @@ -1081,7 +1106,8 @@ impl HotlinkManager {
if is_tcp_proxy_path(&path) {
crate::logging::info(format!(
"hotlink sending accept for tcp proxy: session={} path={}",
&session_id[..8], path
&session_id[..8],
path
));
if let Err(err) = self.send_accept(&session_id).await {
crate::logging::error(format!(
Expand Down Expand Up @@ -1243,12 +1269,10 @@ impl HotlinkManager {
None => frame.path.clone(),
};
let mut reorder = self.tcp_reorder.lock().await;
let buf = reorder
.entry(reorder_key)
.or_insert_with(|| TcpReorderBuf {
next_seq: 1,
pending: BTreeMap::new(),
});
let buf = reorder.entry(reorder_key).or_insert_with(|| TcpReorderBuf {
next_seq: 1,
pending: BTreeMap::new(),
});
buf.pending.insert(frame.seq, frame.payload);
let mut ready = Vec::new();
while let Some(data) = buf.pending.remove(&buf.next_seq) {
Expand Down Expand Up @@ -2053,7 +2077,9 @@ impl HotlinkManager {
if let Some(id) = existing_session.clone() {
let adopted = {
let out = self.outbound.read().await;
out.get(&id).map(|e| e.adopted_from_inbound).unwrap_or(false)
out.get(&id)
.map(|e| e.adopted_from_inbound)
.unwrap_or(false)
};
if adopted {
self.remove_outbound(&id).await;
Expand Down Expand Up @@ -2204,7 +2230,8 @@ impl HotlinkManager {
let id = Uuid::new_v4().to_string();
crate::logging::info(format!(
"hotlink session new: session={} path={}",
&id[..8], rel_path
&id[..8],
rel_path
));
let outbound = HotlinkOutbound {
id: id.clone(),
Expand Down Expand Up @@ -2303,10 +2330,7 @@ impl HotlinkManager {
}
};

if hotlink_debug_enabled()
&& is_tcp_proxy_path(&rel_path)
&& (seq <= 3 || seq % 500 == 0)
{
if hotlink_debug_enabled() && is_tcp_proxy_path(&rel_path) && (seq <= 3 || seq % 500 == 0) {
crate::logging::info(format!(
"hotlink send data: path={} session={} seq={} bytes={}",
rel_path,
Expand All @@ -2333,11 +2357,7 @@ impl HotlinkManager {
.await
{
Ok(Some(())) => {
self.record_tx(
payload_len,
send_started.elapsed().as_millis() as u64,
true,
);
self.record_tx(payload_len, send_started.elapsed().as_millis() as u64, true);
return Ok(());
}
Ok(None) => {
Expand Down Expand Up @@ -2370,9 +2390,7 @@ impl HotlinkManager {
return Err(anyhow::anyhow!(err));
}
if hotlink_debug_enabled() {
crate::logging::info(format!(
"hotlink webrtc send err: {e:?}"
));
crate::logging::info(format!("hotlink webrtc send err: {e:?}"));
}
if !p2p_only {
self.mark_ws_fallback(&session_id, &rel_path).await;
Expand Down Expand Up @@ -2765,6 +2783,7 @@ impl HotlinkManager {
Ok(())
}

#[allow(dead_code)]
async fn send_close(&self, session_id: &str, reason: &str) -> Result<()> {
let id = Uuid::new_v4().to_string();
if hotlink_debug_enabled() {
Expand Down Expand Up @@ -2811,7 +2830,12 @@ fn hotlink_tcp_proxy_chunk_size() -> usize {
std::env::var("SYFTBOX_HOTLINK_TCP_PROXY_CHUNK_SIZE")
.ok()
.and_then(|v| v.trim().parse::<usize>().ok())
.map(|v| v.clamp(HOTLINK_TCP_PROXY_CHUNK_SIZE_MIN, HOTLINK_TCP_PROXY_CHUNK_SIZE_MAX))
.map(|v| {
v.clamp(
HOTLINK_TCP_PROXY_CHUNK_SIZE_MIN,
HOTLINK_TCP_PROXY_CHUNK_SIZE_MAX,
)
})
.unwrap_or(HOTLINK_TCP_PROXY_CHUNK_SIZE_DEFAULT)
}

Expand Down Expand Up @@ -3103,7 +3127,6 @@ fn ice_servers() -> Vec<RTCIceServer> {
} else {
String::new()
},
..Default::default()
}
})
.collect()
Expand Down
2 changes: 2 additions & 0 deletions rust/src/wsproto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub struct HotlinkOpen {
pub session_id: String,
pub path: String,
pub from: Option<String>,
#[allow(dead_code)]
pub to: Option<String>,
}

Expand Down Expand Up @@ -121,6 +122,7 @@ pub struct HotlinkClose {
pub struct HotlinkSignal {
pub session_id: String,
pub kind: String,
#[allow(dead_code)]
pub addrs: Vec<String>,
#[allow(dead_code)]
pub token: String,
Expand Down