From 442160d70417badf57d3a89c2e8e437f9c69c287 Mon Sep 17 00:00:00 2001 From: lc Date: Wed, 12 Nov 2025 19:40:51 +0800 Subject: [PATCH 01/24] add webrtc stream --- Cargo.toml | 5 + examples/webrtc.rs | 243 ++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/socket_client.rs | 6 + src/stream.rs | 13 ++- src/webrtc.rs | 269 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 536 insertions(+), 1 deletion(-) create mode 100644 examples/webrtc.rs create mode 100644 src/webrtc.rs diff --git a/Cargo.toml b/Cargo.toml index b5a70f203..e952e86d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ rustls-pki-types = "1.11" rustls-native-certs = "0.8" webpki-roots = "1.0.4" async-recursion = "1.1" +webrtc = "0.14.0" [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] mac_address = "1.1" @@ -70,6 +71,10 @@ machine-uid = { git = "https://github.com/rustdesk-org/machine-uid" } [build-dependencies] protobuf-codegen = { version = "3.7" } +[dev-dependencies] +clap = "4.5.51" +webrtc-signal = "0.1.1" + [target.'cfg(target_os = "windows")'.dependencies] winapi = { version = "0.3", features = [ "winuser", diff --git a/examples/webrtc.rs b/examples/webrtc.rs new file mode 100644 index 000000000..abd7a0b4d --- /dev/null +++ b/examples/webrtc.rs @@ -0,0 +1,243 @@ +use std::io::Write; +use std::sync::Arc; + +use bytes::{Bytes, BytesMut}; + +use clap::{Arg, Command}; +use anyhow::Result; +use tokio::time::Duration; + +use webrtc::api::APIBuilder; +use webrtc::api::setting_engine::SettingEngine; +use webrtc::data_channel::RTCDataChannel; +use webrtc::ice_transport::ice_server::RTCIceServer; +use webrtc::peer_connection::configuration::RTCConfiguration; +use webrtc::peer_connection::math_rand_alpha; +use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; +use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; + +use webrtc_signal::{self as signal}; + +// example from https://github.com/webrtc-rs/webrtc/tree/master/examples/examples/data-channels +#[tokio::main] +async fn main() -> Result<()> { + let mut app = Command::new("data-channels") + .version("0.1.0") + .author("Rain Liu ") + .about("An example of Data-Channels.") + .arg( + Arg::new("FULLHELP") + .help("Prints more detailed help information") + .long("fullhelp"), + ) + .arg( + Arg::new("debug") + .long("debug") + .short('d') + .help("Prints debug log information"), + ); + + let matches = app.clone().get_matches(); + + if matches.contains_id("FULLHELP") { + app.print_long_help().unwrap(); + std::process::exit(0); + } + + let debug = matches.contains_id("debug"); + if debug { + env_logger::Builder::new() + .format(|buf, record| { + writeln!( + buf, + "{}:{} [{}] {} - {}", + record.file().unwrap_or("unknown"), + record.line().unwrap_or(0), + record.level(), + chrono::Local::now().format("%H:%M:%S.%6f"), + record.args() + ) + }) + .filter(None, log::LevelFilter::Trace) + .init(); + } + + // Everything below is the WebRTC-rs API! Thanks for using it ❤️. + // Create a SettingEngine and enable Detach + let mut s = SettingEngine::default(); + s.detach_data_channels(); + + // Create the API object + let api = APIBuilder::new() + .with_setting_engine(s) + .build(); + + // Prepare the configuration + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: vec!["stun:stun.l.google.com:19302".to_owned()], + ..Default::default() + }], + ..Default::default() + }; + + // Create a new RTCPeerConnection + let peer_connection = Arc::new(api.new_peer_connection(config).await?); + + let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1); + + let bootstrap = peer_connection.create_data_channel("bootstrap", None).await?; + let bootstrap_clone = Arc::clone(&bootstrap); + bootstrap.on_open(Box::new(move || { + println!("Data channel bootstrap open."); + Box::pin(async move { + let _raw = match bootstrap_clone.detach().await { + Ok(raw) => raw, + Err(err) => { + println!("data channel detach got err: {err}"); + return; + } + }; + }) + })); + + // Set the handler for Peer connection state + // This will notify you when the peer has connected/disconnected + peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + println!("Peer Connection State has changed: {s}"); + + if s == RTCPeerConnectionState::Failed { + // Wait until PeerConnection has had no network activity for 30 seconds or another failure. + // It may be reconnected using an ICE Restart. + // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. + // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. + println!("Peer Connection has gone to failed exiting"); + let _ = done_tx.try_send(()); + } + + Box::pin(async {}) + })); + + + // Register data channel creation handling + peer_connection.on_data_channel(Box::new(move |d: Arc| { + let d_label = d.label().to_owned(); + let d_id = d.id(); + println!("New DataChannel {d_label} {d_id}"); + + // Register channel opening handling + Box::pin(async move { + let d2 = Arc::clone(&d); + let d3 = Arc::clone(&d); + let d_label2 = d_label.clone(); + let d_id2 = d_id; + d.on_open(Box::new(move || { + println!("Data channel '{d_label2}'-'{d_id2}' open."); + + Box::pin(async move { + tokio::spawn(async move { + let _ = read_loop(d2).await; + }); + + // Handle writing to the data channel + tokio::spawn(async move { + let _ = write_loop(d3).await; + }); + }) + })); + }) + })); + + // Wait for the offer to be pasted + println!("Wait for the offer to be pasted"); + let line = signal::must_read_stdin()?; + let desc_data = signal::decode(line.as_str())?; + let offer = serde_json::from_str::(&desc_data)?; + + // Set the remote SessionDescription + peer_connection.set_remote_description(offer).await?; + + // Create an answer + let answer = peer_connection.create_answer(None).await?; + + // Create channel that is blocked until ICE Gathering is complete + let mut gather_complete = peer_connection.gathering_complete_promise().await; + + // Sets the LocalDescription, and starts our UDP listeners + peer_connection.set_local_description(answer).await?; + + // Block until ICE Gathering is complete, disabling trickle ICE + // we do this because we only can exchange one signaling message + // in a production application you should exchange ICE Candidates via OnICECandidate + let _ = gather_complete.recv().await; + + // Output the answer in base64 so we can paste it in browser + if let Some(local_desc) = peer_connection.local_description().await { + let json_str = serde_json::to_string(&local_desc)?; + println!("{json_str}"); + let b64 = signal::encode(&json_str); + println!("--------------------- Copy the below base64 to browser --------------------"); + println!("{b64}"); + } else { + println!("generate local_description failed!"); + } + + println!("Press ctrl-c to stop"); + tokio::select! { + _ = done_rx.recv() => { + println!("received done signal!"); + } + _ = tokio::signal::ctrl_c() => { + println!(); + } + }; + + peer_connection.close().await?; + + Ok(()) +} + +// read_loop shows how to read from the datachannel directly +async fn read_loop(dc: Arc) -> Result<()> { + let mut buffer = BytesMut::zeroed(4096); + loop { + let d = dc.detach().await?; + println!("RTCDatachannel detach ok"); + let n = match d.read(&mut buffer).await { + Ok(n) => n, + Err(err) => { + println!("Datachannel closed; Exit the read_loop: {err}"); + return Ok(()); + } + }; + + if n == 0 { + println!("Datachannel read 0 byte; Exit the read_loop"); + return Ok(()); + } + println!( + "Message from DataChannel: {}", + String::from_utf8(buffer[..n].to_vec())? + ); + } +} + +// write_loop shows how to write to the datachannel directly +async fn write_loop(d: Arc) -> Result<()> { + let mut result = Result::::Ok(0); + while result.is_ok() { + let timeout = tokio::time::sleep(Duration::from_secs(5)); + tokio::pin!(timeout); + + tokio::select! { + _ = timeout.as_mut() =>{ + let message = math_rand_alpha(15); + println!("Sending '{message}'"); + result = d.send(&Bytes::from(message)).await.map_err(Into::into); + } + }; + } + println!("Datachannel write not ok; Exit the write_loop"); + + Ok(()) +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 851e4b10b..15042904d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,6 +59,7 @@ pub mod fingerprint; pub use flexi_logger; pub mod stream; pub mod websocket; +pub mod webrtc; #[cfg(any(target_os = "android", target_os = "ios"))] pub use rustls_platform_verifier; pub use stream::Stream; diff --git a/src/socket_client.rs b/src/socket_client.rs index f0e5b056a..c6db932f5 100644 --- a/src/socket_client.rs +++ b/src/socket_client.rs @@ -3,6 +3,7 @@ use crate::{ tcp::FramedStream, udp::FramedSocket, websocket::{self, check_ws, is_ws_endpoint}, + webrtc::{self, is_webrtc_endpoint}, ResultType, Stream, }; use anyhow::Context; @@ -129,6 +130,11 @@ pub async fn connect_tcp< target: T, ms_timeout: u64, ) -> ResultType { + if is_webrtc_endpoint(&target.to_string()) { + return Ok(Stream::WebRTC( + webrtc::WebRTCStream::new(&target.to_string(), ms_timeout).await?, + )); + } let target_str = check_ws(&target.to_string()); if is_ws_endpoint(&target_str) { return Ok(Stream::WebSocket( diff --git a/src/stream.rs b/src/stream.rs index 987d9be13..16db2c5e1 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,10 +1,11 @@ -use crate::{config, tcp, websocket, ResultType}; +use crate::{config, tcp, websocket, webrtc, ResultType}; use sodiumoxide::crypto::secretbox::Key; use std::net::SocketAddr; use tokio::net::TcpStream; // support Websocket and tcp. pub enum Stream { + WebRTC(webrtc::WebRTCStream), WebSocket(websocket::WsFramedStream), Tcp(tcp::FramedStream), } @@ -13,6 +14,7 @@ impl Stream { #[inline] pub fn set_send_timeout(&mut self, ms: u64) { match self { + Stream::WebRTC(s) => s.set_send_timeout(ms), Stream::WebSocket(s) => s.set_send_timeout(ms), Stream::Tcp(s) => s.set_send_timeout(ms), } @@ -21,6 +23,7 @@ impl Stream { #[inline] pub fn set_raw(&mut self) { match self { + Stream::WebRTC(s) => s.set_raw(), Stream::WebSocket(s) => s.set_raw(), Stream::Tcp(s) => s.set_raw(), } @@ -29,6 +32,7 @@ impl Stream { #[inline] pub async fn send_bytes(&mut self, bytes: bytes::Bytes) -> ResultType<()> { match self { + Stream::WebRTC(s) => s.send_bytes(bytes).await, Stream::WebSocket(s) => s.send_bytes(bytes).await, Stream::Tcp(s) => s.send_bytes(bytes).await, } @@ -37,6 +41,7 @@ impl Stream { #[inline] pub async fn send_raw(&mut self, bytes: Vec) -> ResultType<()> { match self { + Stream::WebRTC(s) => s.send_raw(bytes).await, Stream::WebSocket(s) => s.send_raw(bytes).await, Stream::Tcp(s) => s.send_raw(bytes).await, } @@ -45,6 +50,7 @@ impl Stream { #[inline] pub fn set_key(&mut self, key: Key) { match self { + Stream::WebRTC(s) => s.set_key(key), Stream::WebSocket(s) => s.set_key(key), Stream::Tcp(s) => s.set_key(key), } @@ -53,6 +59,7 @@ impl Stream { #[inline] pub fn is_secured(&self) -> bool { match self { + Stream::WebRTC(s) => s.is_secured(), Stream::WebSocket(s) => s.is_secured(), Stream::Tcp(s) => s.is_secured(), } @@ -64,6 +71,7 @@ impl Stream { timeout: u64, ) -> Option> { match self { + Stream::WebRTC(s) => s.next_timeout(timeout).await, Stream::WebSocket(s) => s.next_timeout(timeout).await, Stream::Tcp(s) => s.next_timeout(timeout).await, } @@ -87,6 +95,7 @@ impl Stream { #[inline] pub async fn send(&mut self, msg: &impl protobuf::Message) -> ResultType<()> { match self { + Self::WebRTC(s) => s.send(msg).await, Self::WebSocket(ws) => ws.send(msg).await, Self::Tcp(tcp) => tcp.send(msg).await, } @@ -96,6 +105,7 @@ impl Stream { #[inline] pub async fn next(&mut self) -> Option> { match self { + Self::WebRTC(s) => s.next().await, Self::WebSocket(ws) => ws.next().await, Self::Tcp(tcp) => tcp.next().await, } @@ -104,6 +114,7 @@ impl Stream { #[inline] pub fn local_addr(&self) -> SocketAddr { match self { + Self::WebRTC(s) => s.local_addr(), Self::WebSocket(ws) => ws.local_addr(), Self::Tcp(tcp) => tcp.local_addr(), } diff --git a/src/webrtc.rs b/src/webrtc.rs new file mode 100644 index 000000000..5b61eeead --- /dev/null +++ b/src/webrtc.rs @@ -0,0 +1,269 @@ +use std::sync::{Arc}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::io::{Error, ErrorKind}; +use std::time::Duration; +use std::collections::HashMap; + +use webrtc::api::APIBuilder; +use webrtc::api::setting_engine::SettingEngine; +use webrtc::data_channel::RTCDataChannel; +use webrtc::data_channel::data_channel_state::RTCDataChannelState; +use webrtc::ice_transport::ice_server::RTCIceServer; +use webrtc::peer_connection::RTCPeerConnection; +use webrtc::peer_connection::configuration::RTCConfiguration; +use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; +use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; + +use crate::{ + protobuf::Message, + sodiumoxide::crypto::secretbox::Key, + ResultType, +}; +use bytes::{Bytes, BytesMut}; +use tokio::{time::timeout}; +use tokio::sync::Notify; +use tokio::sync::Mutex; +use base64::Engine; +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; + +pub struct WebRTCStream { + pc: Arc, + stream: Arc, + notify: Arc, + send_timeout: u64, +} + +/// message size limit for Chromium +const DATA_CHANNEL_BUFFER_SIZE: u16 = u16::MAX; + +lazy_static::lazy_static! { + static ref SESSIONS: Arc::>> = Default::default(); +} + +impl Clone for WebRTCStream { + fn clone(&self) -> Self { + WebRTCStream { + pc: self.pc.clone(), + stream: self.stream.clone(), + notify: self.notify.clone(), + send_timeout: self.send_timeout, + } + } +} + +impl WebRTCStream { + + pub fn get_remote_offer(endpoint: &str) -> Option { + // Ensure the endpoint starts with the "webrtc://" prefix + if !endpoint.starts_with("webrtc://") { + return None; + } + + // Extract the Base64-encoded SDP part + let encoded_sdp = &endpoint["webrtc://".len()..]; + + // Decode the Base64 string + let decoded_bytes = BASE64_STANDARD.decode(encoded_sdp).ok()?; + let decoded_sdp = String::from_utf8(decoded_bytes).ok()?; + + Some(decoded_sdp) + } + + pub async fn new>( + webrtc_endpoint: T, + ms_timeout: u64, + ) -> ResultType { + log::debug!("Start webrtc with endpoint: {}", webrtc_endpoint.as_ref()); + let remote_offer: String = match Self::get_remote_offer(webrtc_endpoint.as_ref()) { + Some(offer) => offer, + None => { + return Err(Error::new( + ErrorKind::InvalidInput, + "Invalid WebRTC endpoint format", + ).into()); + } + }; + + let key = remote_offer.to_string(); + let mut lock = SESSIONS.lock().await; + let contains = lock.contains_key(&key); + if contains { + log::debug!("Start webrtc with cached peer"); + return Ok(lock.get(&key).unwrap().clone()); + } + + log::debug!("Start webrtc with offer: {}", remote_offer); + // Create a SettingEngine and enable Detach + let mut s = SettingEngine::default(); + s.detach_data_channels(); + + // Create the API object + let api = APIBuilder::new() + .with_setting_engine(s) + .build(); + + // Prepare the configuration + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: vec!["stun:stun.cloudflare.com:3478".to_owned()], + ..Default::default() + }], + ..Default::default() + }; + + let notify = Arc::new(Notify::new()); + let notify_tx = notify.clone(); + // Create a new RTCPeerConnection + let peer_connection = Arc::new(api.new_peer_connection(config).await?); + let bootstrap = peer_connection.create_data_channel("bootstrap", None).await?; + bootstrap.on_open(Box::new(move || { + log::debug!("Data channel bootstrap open."); + notify_tx.notify_waiters(); + Box::pin(async {}) + })); + + // This will notify you when the peer has connected/disconnected + let notify_tx2 = notify.clone(); + peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + log::debug!("Peer Connection State has changed: {}", s); + if s == RTCPeerConnectionState::Disconnected { + notify_tx2.notify_waiters(); + } + + // TODO clear SESSIONS entry? + Box::pin(async {}) + })); + + let offer = serde_json::from_str::(&remote_offer)?; + // Set the remote SessionDescription + peer_connection.set_remote_description(offer).await?; + // Create an answer + let answer = peer_connection.create_answer(None).await?; + // Create channel that is blocked until ICE Gathering is complete + let mut gather_complete = peer_connection.gathering_complete_promise().await; + // Sets the LocalDescription, and starts our UDP listeners + peer_connection.set_local_description(answer).await?; + let _ = gather_complete.recv().await; + + let ds = WebRTCStream { + pc: peer_connection, + stream: bootstrap, + notify: notify, + send_timeout: ms_timeout, + }; + + // log the answer + match ds.get_local_endpoint().await { + Some(local_endpoint) => log::debug!("WebRTC local endpoint: {}", local_endpoint), + None => log::debug!("WebRTC local endpoint: "), + } + + lock.insert(key, ds.clone()); + Ok(ds) + } + + #[inline] + pub async fn get_local_endpoint(&self) -> Option { + if let Some(local_desc) = self.pc.local_description().await { + let sdp = serde_json::to_string(&local_desc).ok()?; + Some(format!("webrtc://{}", BASE64_STANDARD.encode(sdp))) + } else { + None + } + } + + #[inline] + pub fn set_raw(&mut self) { + // not-supported + } + + #[inline] + pub fn local_addr(&self) -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0) + } + + #[inline] + pub fn set_send_timeout(&mut self, ms: u64) { + self.send_timeout = ms; + } + + #[inline] + pub fn set_key(&mut self, _key: Key) { + // not-supported + } + + #[inline] + pub fn is_secured(&self) -> bool { + true + } + + #[inline] + pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> { + self.send_raw(msg.write_to_bytes()?).await + } + + #[inline] + pub async fn send_raw(&mut self, msg: Vec) -> ResultType<()> { + self.send_bytes(Bytes::from(msg)).await + } + + pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> { + // wait for connected or disconnected + self.notify.notified().await; + self.stream.send(&bytes).await?; + Ok(()) + } + + #[inline] + pub async fn next(&mut self) -> Option> { + // wait for connected or disconnected + self.notify.notified().await; + if self.stream.ready_state() != RTCDataChannelState::Open { + return Some(Err(Error::new( + ErrorKind::Other, + "data channel is closed", + ))); + } + + // TODO reuse buffer? + let mut buffer = BytesMut::zeroed(DATA_CHANNEL_BUFFER_SIZE as usize); + let dc = self.stream.detach().await.ok()?; + let n = match dc.read(&mut buffer).await { + Ok(n) => n, + Err(err) => { + return Some(Err(Error::new( + ErrorKind::Other, + format!("data channel read error: {}", err), + ))); + } + }; + if n == 0 { + return Some(Err(Error::new( + ErrorKind::Other, + "data channel read exited with 0 bytes", + ))); + } + buffer.truncate(n); + Some(Ok(buffer)) + } + + #[inline] + pub async fn next_timeout(&mut self, ms: u64) -> Option> { + match timeout(Duration::from_millis(ms), self.next()).await { + Ok(res) => res, + Err(_) => None, + } + } +} + +pub fn is_webrtc_endpoint(endpoint: &str) -> bool { + // use sdp base64 json string as endpoint, or prefix webrtc: + endpoint.starts_with("webrtc://") +} + +#[cfg(test)] +mod tests { + #[test] + fn test_dc() { + } +} From 8ae4651bc7466c192443f29e7e05029819c36708 Mon Sep 17 00:00:00 2001 From: lc Date: Thu, 13 Nov 2025 16:53:04 +0800 Subject: [PATCH 02/24] make webrtc-rs optional feature --- Cargo.toml | 8 +- examples/webrtc.rs | 218 ++++++++++--------------------------------- src/lib.rs | 5 + src/socket_client.rs | 4 +- src/webrtc.rs | 153 ++++++++++++++++++------------ src/webrtc_dummy.rs | 67 +++++++++++++ 6 files changed, 223 insertions(+), 232 deletions(-) create mode 100644 src/webrtc_dummy.rs diff --git a/Cargo.toml b/Cargo.toml index e952e86d2..72f09192b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,10 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["webrtc"] +webrtc = ["dep:webrtc"] + [dependencies] # new flexi_logger failed on rustc 1.75 flexi_logger = { version = "0.27", features = ["async"] } @@ -61,7 +65,7 @@ rustls-pki-types = "1.11" rustls-native-certs = "0.8" webpki-roots = "1.0.4" async-recursion = "1.1" -webrtc = "0.14.0" +webrtc = { version = "0.14.0", optional = true } [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] mac_address = "1.1" @@ -73,7 +77,7 @@ protobuf-codegen = { version = "3.7" } [dev-dependencies] clap = "4.5.51" -webrtc-signal = "0.1.1" +webrtc = "0.14.0" [target.'cfg(target_os = "windows")'.dependencies] winapi = { version = "0.3", features = [ diff --git a/examples/webrtc.rs b/examples/webrtc.rs index abd7a0b4d..5a5e909d7 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -1,51 +1,37 @@ -use std::io::Write; -use std::sync::Arc; +extern crate hbb_common; -use bytes::{Bytes, BytesMut}; +use std::io::Write; +use bytes::Bytes; use clap::{Arg, Command}; use anyhow::Result; use tokio::time::Duration; -use webrtc::api::APIBuilder; -use webrtc::api::setting_engine::SettingEngine; -use webrtc::data_channel::RTCDataChannel; -use webrtc::ice_transport::ice_server::RTCIceServer; -use webrtc::peer_connection::configuration::RTCConfiguration; use webrtc::peer_connection::math_rand_alpha; -use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; -use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; -use webrtc_signal::{self as signal}; - -// example from https://github.com/webrtc-rs/webrtc/tree/master/examples/examples/data-channels #[tokio::main] async fn main() -> Result<()> { - let mut app = Command::new("data-channels") - .version("0.1.0") - .author("Rain Liu ") - .about("An example of Data-Channels.") - .arg( - Arg::new("FULLHELP") - .help("Prints more detailed help information") - .long("fullhelp"), - ) + let app = Command::new("webrtc-stream") + .about("An example of webrtc stream using hbb_common and webrtc-rs") .arg( Arg::new("debug") .long("debug") .short('d') + .action(clap::ArgAction::SetTrue) .help("Prints debug log information"), + ) + .arg( + Arg::new("offer") + .long("offer") + .short('o') + .help("set offer from other endpoint"), ); let matches = app.clone().get_matches(); - if matches.contains_id("FULLHELP") { - app.print_long_help().unwrap(); - std::process::exit(0); - } - let debug = matches.contains_id("debug"); if debug { + println!("Debug log enabled"); env_logger::Builder::new() .format(|buf, record| { writeln!( @@ -58,173 +44,67 @@ async fn main() -> Result<()> { record.args() ) }) - .filter(None, log::LevelFilter::Trace) + .filter(None, log::LevelFilter::Debug) .init(); } - // Everything below is the WebRTC-rs API! Thanks for using it ❤️. - // Create a SettingEngine and enable Detach - let mut s = SettingEngine::default(); - s.detach_data_channels(); - - // Create the API object - let api = APIBuilder::new() - .with_setting_engine(s) - .build(); - - // Prepare the configuration - let config = RTCConfiguration { - ice_servers: vec![RTCIceServer { - urls: vec!["stun:stun.l.google.com:19302".to_owned()], - ..Default::default() - }], - ..Default::default() + let remote_endpoint = if let Some(endpoint) = matches.get_one::("offer") { + endpoint.to_string() + } else { + "".to_string() }; - // Create a new RTCPeerConnection - let peer_connection = Arc::new(api.new_peer_connection(config).await?); + let webrtc_stream = hbb_common::webrtc::WebRTCStream::new(&remote_endpoint, 30000).await?; + // Print the offer to be sent to the other peer + webrtc_stream.get_local_endpoint().await; - let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1); - - let bootstrap = peer_connection.create_data_channel("bootstrap", None).await?; - let bootstrap_clone = Arc::clone(&bootstrap); - bootstrap.on_open(Box::new(move || { - println!("Data channel bootstrap open."); - Box::pin(async move { - let _raw = match bootstrap_clone.detach().await { - Ok(raw) => raw, - Err(err) => { - println!("data channel detach got err: {err}"); - return; - } - }; - }) - })); - - // Set the handler for Peer connection state - // This will notify you when the peer has connected/disconnected - peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { - println!("Peer Connection State has changed: {s}"); - - if s == RTCPeerConnectionState::Failed { - // Wait until PeerConnection has had no network activity for 30 seconds or another failure. - // It may be reconnected using an ICE Restart. - // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. - // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. - println!("Peer Connection has gone to failed exiting"); - let _ = done_tx.try_send(()); - } - - Box::pin(async {}) - })); - - - // Register data channel creation handling - peer_connection.on_data_channel(Box::new(move |d: Arc| { - let d_label = d.label().to_owned(); - let d_id = d.id(); - println!("New DataChannel {d_label} {d_id}"); - - // Register channel opening handling - Box::pin(async move { - let d2 = Arc::clone(&d); - let d3 = Arc::clone(&d); - let d_label2 = d_label.clone(); - let d_id2 = d_id; - d.on_open(Box::new(move || { - println!("Data channel '{d_label2}'-'{d_id2}' open."); - - Box::pin(async move { - tokio::spawn(async move { - let _ = read_loop(d2).await; - }); - - // Handle writing to the data channel - tokio::spawn(async move { - let _ = write_loop(d3).await; - }); - }) - })); - }) - })); - - // Wait for the offer to be pasted - println!("Wait for the offer to be pasted"); - let line = signal::must_read_stdin()?; - let desc_data = signal::decode(line.as_str())?; - let offer = serde_json::from_str::(&desc_data)?; - - // Set the remote SessionDescription - peer_connection.set_remote_description(offer).await?; - - // Create an answer - let answer = peer_connection.create_answer(None).await?; - - // Create channel that is blocked until ICE Gathering is complete - let mut gather_complete = peer_connection.gathering_complete_promise().await; - - // Sets the LocalDescription, and starts our UDP listeners - peer_connection.set_local_description(answer).await?; - - // Block until ICE Gathering is complete, disabling trickle ICE - // we do this because we only can exchange one signaling message - // in a production application you should exchange ICE Candidates via OnICECandidate - let _ = gather_complete.recv().await; - - // Output the answer in base64 so we can paste it in browser - if let Some(local_desc) = peer_connection.local_description().await { - let json_str = serde_json::to_string(&local_desc)?; - println!("{json_str}"); - let b64 = signal::encode(&json_str); - println!("--------------------- Copy the below base64 to browser --------------------"); - println!("{b64}"); - } else { - println!("generate local_description failed!"); + if remote_endpoint.is_empty() { + // Wait for the answer to be pasted + println!("Wait for the answer to be pasted"); + // readline blocking + let line = std::io::stdin() + .lines() + .next() + .ok_or_else(|| anyhow::anyhow!("No input received"))??; + webrtc_stream.set_remote_endpoint(&line).await?; } + let s1 = hbb_common::Stream::WebRTC(webrtc_stream.clone()); + tokio::spawn(async move { + let _ = read_loop(s1).await; + }); + + let s2 = hbb_common::Stream::WebRTC(webrtc_stream.clone()); + tokio::spawn(async move { + let _ = write_loop(s2).await; + }); + println!("Press ctrl-c to stop"); tokio::select! { - _ = done_rx.recv() => { - println!("received done signal!"); - } _ = tokio::signal::ctrl_c() => { println!(); } }; - peer_connection.close().await?; - Ok(()) } // read_loop shows how to read from the datachannel directly -async fn read_loop(dc: Arc) -> Result<()> { - let mut buffer = BytesMut::zeroed(4096); +async fn read_loop(mut stream: hbb_common::Stream) -> Result<()> { loop { - let d = dc.detach().await?; - println!("RTCDatachannel detach ok"); - let n = match d.read(&mut buffer).await { - Ok(n) => n, - Err(err) => { - println!("Datachannel closed; Exit the read_loop: {err}"); - return Ok(()); - } - }; - - if n == 0 { - println!("Datachannel read 0 byte; Exit the read_loop"); + let Some(res) = stream.next().await else { + println!("Datachannel closed; Exit the read_loop"); return Ok(()); - } - println!( - "Message from DataChannel: {}", - String::from_utf8(buffer[..n].to_vec())? + }; + println!("Message from DataChannel: {}", + String::from_utf8(res.unwrap().to_vec())? ); } } // write_loop shows how to write to the datachannel directly -async fn write_loop(d: Arc) -> Result<()> { - let mut result = Result::::Ok(0); +async fn write_loop(mut stream: hbb_common::Stream) -> Result<()> { + let mut result = Result::<()>::Ok(()); while result.is_ok() { let timeout = tokio::time::sleep(Duration::from_secs(5)); tokio::pin!(timeout); @@ -233,7 +113,7 @@ async fn write_loop(d: Arc) -> Result<()> { _ = timeout.as_mut() =>{ let message = math_rand_alpha(15); println!("Sending '{message}'"); - result = d.send(&Bytes::from(message)).await.map_err(Into::into); + result = stream.send_bytes(Bytes::from(message)).await; } }; } diff --git a/src/lib.rs b/src/lib.rs index 15042904d..5d3e60007 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,12 @@ pub mod fingerprint; pub use flexi_logger; pub mod stream; pub mod websocket; +#[cfg(feature = "webrtc")] pub mod webrtc; +#[cfg(not(feature = "webrtc"))] +pub mod webrtc_dummy; +#[cfg(not(feature = "webrtc"))] +pub use webrtc_dummy as webrtc; #[cfg(any(target_os = "android", target_os = "ios"))] pub use rustls_platform_verifier; pub use stream::Stream; diff --git a/src/socket_client.rs b/src/socket_client.rs index c6db932f5..1f568ff3f 100644 --- a/src/socket_client.rs +++ b/src/socket_client.rs @@ -3,9 +3,10 @@ use crate::{ tcp::FramedStream, udp::FramedSocket, websocket::{self, check_ws, is_ws_endpoint}, - webrtc::{self, is_webrtc_endpoint}, ResultType, Stream, }; +#[cfg(feature = "webrtc")] +use crate::webrtc::{self, is_webrtc_endpoint}; use anyhow::Context; use std::{net::SocketAddr, sync::Arc}; use tokio::net::{ToSocketAddrs, UdpSocket}; @@ -130,6 +131,7 @@ pub async fn connect_tcp< target: T, ms_timeout: u64, ) -> ResultType { + #[cfg(feature = "webrtc")] if is_webrtc_endpoint(&target.to_string()) { return Ok(Stream::WebRTC( webrtc::WebRTCStream::new(&target.to_string(), ms_timeout).await?, diff --git a/src/webrtc.rs b/src/webrtc.rs index 5b61eeead..b276e775e 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc}; +use std::sync::Arc; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::io::{Error, ErrorKind}; use std::time::Duration; @@ -13,23 +13,25 @@ use webrtc::peer_connection::RTCPeerConnection; use webrtc::peer_connection::configuration::RTCConfiguration; use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; +use webrtc::ice::mdns::MulticastDnsMode; use crate::{ protobuf::Message, sodiumoxide::crypto::secretbox::Key, ResultType, }; -use bytes::{Bytes, BytesMut}; -use tokio::{time::timeout}; -use tokio::sync::Notify; -use tokio::sync::Mutex; + use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; +use bytes::{Bytes, BytesMut}; +use tokio::time::timeout; +use tokio::sync::watch; +use tokio::sync::Mutex; pub struct WebRTCStream { pc: Arc, stream: Arc, - notify: Arc, + state_notify: watch::Receiver, send_timeout: u64, } @@ -45,7 +47,7 @@ impl Clone for WebRTCStream { WebRTCStream { pc: self.pc.clone(), stream: self.stream.clone(), - notify: self.notify.clone(), + state_notify: self.state_notify.clone(), send_timeout: self.send_timeout, } } @@ -53,38 +55,40 @@ impl Clone for WebRTCStream { impl WebRTCStream { - pub fn get_remote_offer(endpoint: &str) -> Option { + pub fn get_remote_offer(endpoint: &str) -> ResultType { // Ensure the endpoint starts with the "webrtc://" prefix if !endpoint.starts_with("webrtc://") { - return None; + return Err(Error::new(ErrorKind::InvalidInput, "Invalid WebRTC endpoint format").into()); } // Extract the Base64-encoded SDP part let encoded_sdp = &endpoint["webrtc://".len()..]; - // Decode the Base64 string - let decoded_bytes = BASE64_STANDARD.decode(encoded_sdp).ok()?; - let decoded_sdp = String::from_utf8(decoded_bytes).ok()?; - - Some(decoded_sdp) + let decoded_bytes = BASE64_STANDARD.decode(encoded_sdp).map_err(|_| + Error::new(ErrorKind::InvalidInput, "Failed to decode Base64 SDP") + )?; + Ok(String::from_utf8(decoded_bytes).map_err(|_| { + Error::new(ErrorKind::InvalidInput, "Failed to convert decoded bytes to UTF-8") + })?) } - pub async fn new>( - webrtc_endpoint: T, + pub fn sdp_to_endpoint(sdp: &str) -> String { + let encoded_sdp = BASE64_STANDARD.encode(sdp); + format!("webrtc://{}", encoded_sdp) + } + + pub async fn new( + remote_endpoint: &str, ms_timeout: u64, ) -> ResultType { - log::debug!("Start webrtc with endpoint: {}", webrtc_endpoint.as_ref()); - let remote_offer: String = match Self::get_remote_offer(webrtc_endpoint.as_ref()) { - Some(offer) => offer, - None => { - return Err(Error::new( - ErrorKind::InvalidInput, - "Invalid WebRTC endpoint format", - ).into()); - } + log::debug!("New webrtc stream with endpoint: {}", remote_endpoint); + let remote_offer = if remote_endpoint.is_empty() { + "".into() + } else { + Self::get_remote_offer(remote_endpoint)? }; - let key = remote_offer.to_string(); + let mut key = remote_offer.clone(); let mut lock = SESSIONS.lock().await; let contains = lock.contains_key(&key); if contains { @@ -92,10 +96,10 @@ impl WebRTCStream { return Ok(lock.get(&key).unwrap().clone()); } - log::debug!("Start webrtc with offer: {}", remote_offer); // Create a SettingEngine and enable Detach let mut s = SettingEngine::default(); s.detach_data_channels(); + s.set_ice_multicast_dns_mode(MulticastDnsMode::Disabled); // Create the API object let api = APIBuilder::new() @@ -111,67 +115,96 @@ impl WebRTCStream { ..Default::default() }; - let notify = Arc::new(Notify::new()); - let notify_tx = notify.clone(); + let (notify_tx, notify_rx) = watch::channel(false); + let on_open_notify = notify_tx.clone(); // Create a new RTCPeerConnection let peer_connection = Arc::new(api.new_peer_connection(config).await?); - let bootstrap = peer_connection.create_data_channel("bootstrap", None).await?; - bootstrap.on_open(Box::new(move || { + let data_channel = peer_connection.create_data_channel("bootstrap", None).await?; + data_channel.on_open(Box::new(move || { log::debug!("Data channel bootstrap open."); - notify_tx.notify_waiters(); + let _ = on_open_notify.send(true); Box::pin(async {}) })); // This will notify you when the peer has connected/disconnected - let notify_tx2 = notify.clone(); + let on_connection_notify = notify_tx.clone(); peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { log::debug!("Peer Connection State has changed: {}", s); if s == RTCPeerConnectionState::Disconnected { - notify_tx2.notify_waiters(); + let _ = on_connection_notify.send(true); } // TODO clear SESSIONS entry? Box::pin(async {}) })); - let offer = serde_json::from_str::(&remote_offer)?; - // Set the remote SessionDescription - peer_connection.set_remote_description(offer).await?; - // Create an answer - let answer = peer_connection.create_answer(None).await?; - // Create channel that is blocked until ICE Gathering is complete - let mut gather_complete = peer_connection.gathering_complete_promise().await; - // Sets the LocalDescription, and starts our UDP listeners - peer_connection.set_local_description(answer).await?; - let _ = gather_complete.recv().await; + // Register data channel creation handling + let on_open_notify2 = notify_tx.clone(); + peer_connection.on_data_channel(Box::new(move |dc: Arc| { + let d_label = dc.label().to_owned(); + log::debug!("Remote data channel {}", d_label); + let notify = on_open_notify2.clone(); + Box::pin(async move { + dc.on_open(Box::new(move || { + let _ = notify.send(true); + Box::pin(async {}) + })); + }) + })); - let ds = WebRTCStream { + if remote_offer.is_empty() { + let sdp = peer_connection.create_offer(None).await?; + let mut gather_complete = peer_connection.gathering_complete_promise().await; + peer_connection.set_local_description(sdp.clone()).await?; + let _ = gather_complete.recv().await; + + let final_sdp = peer_connection.local_description().await.ok_or_else(|| { + Error::new(ErrorKind::Other, "Failed to get local description after gathering") + })?; + key = serde_json::to_string(&final_sdp).unwrap_or_default(); + log::debug!("Start webrtc with local: {}", key); + } else { + let sdp = serde_json::from_str::(&remote_offer)?; + peer_connection.set_remote_description(sdp).await?; + let answer = peer_connection.create_answer(None).await?; + let mut gather_complete = peer_connection.gathering_complete_promise().await; + peer_connection.set_local_description(answer).await?; + let _ = gather_complete.recv().await; + log::debug!("Start webrtc with remote: {}", remote_offer); + } + + let webrtc_stream = WebRTCStream { pc: peer_connection, - stream: bootstrap, - notify: notify, + stream: data_channel, + state_notify: notify_rx, send_timeout: ms_timeout, }; - // log the answer - match ds.get_local_endpoint().await { - Some(local_endpoint) => log::debug!("WebRTC local endpoint: {}", local_endpoint), - None => log::debug!("WebRTC local endpoint: "), - } - - lock.insert(key, ds.clone()); - Ok(ds) + lock.insert(key, webrtc_stream.clone()); + Ok(webrtc_stream) } #[inline] pub async fn get_local_endpoint(&self) -> Option { if let Some(local_desc) = self.pc.local_description().await { - let sdp = serde_json::to_string(&local_desc).ok()?; - Some(format!("webrtc://{}", BASE64_STANDARD.encode(sdp))) + let sdp = serde_json::to_string(&local_desc).unwrap_or_default(); + let endpoint = Self::sdp_to_endpoint(&sdp); + log::debug!("WebRTC get local endpoint: {}", endpoint); + Some(endpoint) } else { None } } + #[inline] + pub async fn set_remote_endpoint(&self, endpoint: &str) -> ResultType<()> { + let offer = Self::get_remote_offer(endpoint)?; + log::debug!("WebRTC set remote sdp: {}", offer); + let sdp = serde_json::from_str::(&offer)?; + self.pc.set_remote_description(sdp).await?; + Ok(()) + } + #[inline] pub fn set_raw(&mut self) { // not-supported @@ -208,8 +241,7 @@ impl WebRTCStream { } pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> { - // wait for connected or disconnected - self.notify.notified().await; + let _ = self.state_notify.changed().await; self.stream.send(&bytes).await?; Ok(()) } @@ -217,7 +249,7 @@ impl WebRTCStream { #[inline] pub async fn next(&mut self) -> Option> { // wait for connected or disconnected - self.notify.notified().await; + let _ = self.state_notify.changed().await; if self.stream.ready_state() != RTCDataChannelState::Open { return Some(Err(Error::new( ErrorKind::Other, @@ -243,6 +275,7 @@ impl WebRTCStream { "data channel read exited with 0 bytes", ))); } + log::debug!("WebRTCStream read {} bytes", n); buffer.truncate(n); Some(Ok(buffer)) } diff --git a/src/webrtc_dummy.rs b/src/webrtc_dummy.rs new file mode 100644 index 000000000..a6f9344ba --- /dev/null +++ b/src/webrtc_dummy.rs @@ -0,0 +1,67 @@ +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::io::Error; + +use bytes::{Bytes, BytesMut}; + +use crate::{ + protobuf::Message, + sodiumoxide::crypto::secretbox::Key, + ResultType, +}; + +pub struct WebRTCStream { + // mock struct +} + +impl WebRTCStream { + + #[inline] + pub fn set_raw(&mut self) { + } + + #[inline] + pub fn local_addr(&self) -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0) + } + + #[inline] + pub fn set_send_timeout(&mut self, _ms: u64) { + } + + #[inline] + pub fn set_key(&mut self, _key: Key) { + } + + #[inline] + pub fn is_secured(&self) -> bool { + false + } + + #[inline] + pub async fn send(&mut self, _msg: &impl Message) -> ResultType<()> { + Ok(()) + } + + #[inline] + pub async fn send_raw(&mut self, _msg: Vec) -> ResultType<()> { + Ok(()) + } + + pub async fn send_bytes(&mut self, _bytes: Bytes) -> ResultType<()> { + Ok(()) + } + + #[inline] + pub async fn next(&mut self) -> Option> { + None + } + + #[inline] + pub async fn next_timeout(&mut self, _ms: u64) -> Option> { + None + } +} + +pub fn is_webrtc_endpoint(_endpoint: &str) -> bool { + false +} From 4cea3a77699d9d314f18dbcba791b33f765c5dae Mon Sep 17 00:00:00 2001 From: lc Date: Thu, 13 Nov 2025 20:59:32 +0800 Subject: [PATCH 03/24] better example support local dc stream fix read/write issue clear sessions after close --- examples/webrtc.rs | 28 ++++++--- src/webrtc.rs | 139 +++++++++++++++++++++++++++++---------------- 2 files changed, 108 insertions(+), 59 deletions(-) diff --git a/examples/webrtc.rs b/examples/webrtc.rs index 5a5e909d7..317a7f502 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -44,7 +44,7 @@ async fn main() -> Result<()> { record.args() ) }) - .filter(None, log::LevelFilter::Debug) + .filter(Some("hbb_common"), log::LevelFilter::Debug) .init(); } @@ -56,17 +56,23 @@ async fn main() -> Result<()> { let webrtc_stream = hbb_common::webrtc::WebRTCStream::new(&remote_endpoint, 30000).await?; // Print the offer to be sent to the other peer - webrtc_stream.get_local_endpoint().await; + let local_endpoint = webrtc_stream.get_local_endpoint().await?; if remote_endpoint.is_empty() { + println!(); // Wait for the answer to be pasted - println!("Wait for the answer to be pasted"); + println!( + "Start new terminal run: \n{} \ncopy remote endpoint and paste here", + format!("cargo r --example webrtc -- --offer {}", local_endpoint) + ); // readline blocking let line = std::io::stdin() .lines() .next() .ok_or_else(|| anyhow::anyhow!("No input received"))??; webrtc_stream.set_remote_endpoint(&line).await?; + } else { + println!("Copy local endpoint and paste to the other peer: \n{}", local_endpoint); } let s1 = hbb_common::Stream::WebRTC(webrtc_stream.clone()); @@ -93,16 +99,20 @@ async fn main() -> Result<()> { async fn read_loop(mut stream: hbb_common::Stream) -> Result<()> { loop { let Some(res) = stream.next().await else { - println!("Datachannel closed; Exit the read_loop"); + println!("WebRTC stream closed; Exit the read_loop"); return Ok(()); }; - println!("Message from DataChannel: {}", + if res.is_err() { + println!("WebRTC stream read error: {}; Exit the read_loop", res.err().unwrap()); + return Ok(()); + } + println!("Message from stream: {}", String::from_utf8(res.unwrap().to_vec())? ); } } -// write_loop shows how to write to the datachannel directly +// write_loop shows how to write to the webrtc stream directly async fn write_loop(mut stream: hbb_common::Stream) -> Result<()> { let mut result = Result::<()>::Ok(()); while result.is_ok() { @@ -112,12 +122,12 @@ async fn write_loop(mut stream: hbb_common::Stream) -> Result<()> { tokio::select! { _ = timeout.as_mut() =>{ let message = math_rand_alpha(15); - println!("Sending '{message}'"); - result = stream.send_bytes(Bytes::from(message)).await; + result = stream.send_bytes(Bytes::from(message.clone())).await; + println!("Sent '{message}' {}", result.is_ok()); } }; } - println!("Datachannel write not ok; Exit the write_loop"); + println!("WebRTC stream write failed; Exit the write_loop"); Ok(()) } \ No newline at end of file diff --git a/src/webrtc.rs b/src/webrtc.rs index b276e775e..3a3ed011b 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -7,7 +7,6 @@ use std::collections::HashMap; use webrtc::api::APIBuilder; use webrtc::api::setting_engine::SettingEngine; use webrtc::data_channel::RTCDataChannel; -use webrtc::data_channel::data_channel_state::RTCDataChannelState; use webrtc::ice_transport::ice_server::RTCIceServer; use webrtc::peer_connection::RTCPeerConnection; use webrtc::peer_connection::configuration::RTCConfiguration; @@ -30,7 +29,7 @@ use tokio::sync::Mutex; pub struct WebRTCStream { pc: Arc, - stream: Arc, + stream: Arc>>, state_notify: watch::Receiver, send_timeout: u64, } @@ -77,11 +76,24 @@ impl WebRTCStream { format!("webrtc://{}", encoded_sdp) } + async fn get_key_for_peer(pc: &Arc) -> String { + if let Some(local_desc) = pc.local_description().await { + if local_desc.sdp_type != webrtc::peer_connection::sdp::sdp_type::RTCSdpType::Offer { + let Some(remote_desc) = pc.remote_description().await else { + return "".into(); + }; + return serde_json::to_string(&remote_desc).unwrap_or_default(); + } + return serde_json::to_string(&local_desc).unwrap_or_default(); + } + "".into() + } + pub async fn new( remote_endpoint: &str, ms_timeout: u64, ) -> ResultType { - log::debug!("New webrtc stream with endpoint: {}", remote_endpoint); + log::debug!("New webrtc stream to endpoint: {}", remote_endpoint); let remote_offer = if remote_endpoint.is_empty() { "".into() } else { @@ -116,35 +128,61 @@ impl WebRTCStream { }; let (notify_tx, notify_rx) = watch::channel(false); - let on_open_notify = notify_tx.clone(); + let dc_open_notify = notify_tx.clone(); // Create a new RTCPeerConnection - let peer_connection = Arc::new(api.new_peer_connection(config).await?); - let data_channel = peer_connection.create_data_channel("bootstrap", None).await?; - data_channel.on_open(Box::new(move || { - log::debug!("Data channel bootstrap open."); - let _ = on_open_notify.send(true); + let pc = Arc::new(api.new_peer_connection(config).await?); + let bootstrap_dc = if remote_offer.is_empty() { + // Create a data channel with label "bootstrap" + pc.create_data_channel("bootstrap", None).await? + } else { + // Wait for the data channel to be created by the remote peer + // Here we create a dummy data channel to satisfy the type system + Arc::new(RTCDataChannel::default()) + }; + bootstrap_dc.on_open(Box::new(move || { + log::debug!("Local data channel bootstrap open."); + let _ = dc_open_notify.send(true); Box::pin(async {}) })); + let stream = Arc::new(Mutex::new(bootstrap_dc.clone())); + // This will notify you when the peer has connected/disconnected let on_connection_notify = notify_tx.clone(); - peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { - log::debug!("Peer Connection State has changed: {}", s); - if s == RTCPeerConnectionState::Disconnected { - let _ = on_connection_notify.send(true); - } - - // TODO clear SESSIONS entry? - Box::pin(async {}) + let stream_for_close = stream.clone(); + let pc_for_close = pc.clone(); + pc.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + let stream_for_close2 = stream_for_close.clone(); + let on_connection_notify2 = on_connection_notify.clone(); + let pc_for_close2 = pc_for_close.clone(); + Box::pin(async move { + log::debug!("Peer connection state : {}", s); + if s == RTCPeerConnectionState::Disconnected { + let _ = on_connection_notify2.send(true); + log::debug!("WebRTC session closing due to disconnected"); + let _ = stream_for_close2.lock().await.close().await; + log::debug!("WebRTC session stream closed"); + } else if s == RTCPeerConnectionState::Failed || s == RTCPeerConnectionState::Closed { + let mut lock = SESSIONS.lock().await; + let key = WebRTCStream::get_key_for_peer(&pc_for_close2).await; + log::debug!("WebRTC session removing key from cache: {}", key); + lock.remove(&key); + } + }) })); // Register data channel creation handling - let on_open_notify2 = notify_tx.clone(); - peer_connection.on_data_channel(Box::new(move |dc: Arc| { + let remote_dc_open_notify = notify_tx.clone(); + let stream_for_dc = stream.clone(); + pc.on_data_channel(Box::new(move |dc: Arc| { let d_label = dc.label().to_owned(); - log::debug!("Remote data channel {}", d_label); - let notify = on_open_notify2.clone(); + let notify = remote_dc_open_notify.clone(); + let stream_for_dc_clone = stream_for_dc.clone(); + log::debug!("Remote data channel {} ready", d_label); Box::pin(async move { + let mut stream_lock = stream_for_dc_clone.lock().await; + *stream_lock = dc.clone(); + drop(stream_lock); dc.on_open(Box::new(move || { let _ = notify.send(true); Box::pin(async {}) @@ -152,30 +190,28 @@ impl WebRTCStream { }) })); + // process offer/answer if remote_offer.is_empty() { - let sdp = peer_connection.create_offer(None).await?; - let mut gather_complete = peer_connection.gathering_complete_promise().await; - peer_connection.set_local_description(sdp.clone()).await?; + let sdp = pc.create_offer(None).await?; + let mut gather_complete = pc.gathering_complete_promise().await; + pc.set_local_description(sdp.clone()).await?; let _ = gather_complete.recv().await; - let final_sdp = peer_connection.local_description().await.ok_or_else(|| { - Error::new(ErrorKind::Other, "Failed to get local description after gathering") - })?; - key = serde_json::to_string(&final_sdp).unwrap_or_default(); + key = Self::get_key_for_peer(&pc).await; log::debug!("Start webrtc with local: {}", key); } else { let sdp = serde_json::from_str::(&remote_offer)?; - peer_connection.set_remote_description(sdp).await?; - let answer = peer_connection.create_answer(None).await?; - let mut gather_complete = peer_connection.gathering_complete_promise().await; - peer_connection.set_local_description(answer).await?; + pc.set_remote_description(sdp).await?; + let answer = pc.create_answer(None).await?; + let mut gather_complete = pc.gathering_complete_promise().await; + pc.set_local_description(answer).await?; let _ = gather_complete.recv().await; log::debug!("Start webrtc with remote: {}", remote_offer); } let webrtc_stream = WebRTCStream { - pc: peer_connection, - stream: data_channel, + pc, + stream, state_notify: notify_rx, send_timeout: ms_timeout, }; @@ -185,14 +221,13 @@ impl WebRTCStream { } #[inline] - pub async fn get_local_endpoint(&self) -> Option { + pub async fn get_local_endpoint(&self) -> ResultType { if let Some(local_desc) = self.pc.local_description().await { let sdp = serde_json::to_string(&local_desc).unwrap_or_default(); let endpoint = Self::sdp_to_endpoint(&sdp); - log::debug!("WebRTC get local endpoint: {}", endpoint); - Some(endpoint) + Ok(endpoint) } else { - None + Err(anyhow::anyhow!("Local description is not set")) } } @@ -240,29 +275,33 @@ impl WebRTCStream { self.send_bytes(Bytes::from(msg)).await } - pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> { + #[inline] + async fn wait_for_connect_result(&mut self) { + if *self.state_notify.borrow() { + return; + } let _ = self.state_notify.changed().await; - self.stream.send(&bytes).await?; + } + + pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> { + self.wait_for_connect_result().await; + let stream = self.stream.lock().await.clone(); + stream.send(&bytes).await?; Ok(()) } #[inline] pub async fn next(&mut self) -> Option> { - // wait for connected or disconnected - let _ = self.state_notify.changed().await; - if self.stream.ready_state() != RTCDataChannelState::Open { - return Some(Err(Error::new( - ErrorKind::Other, - "data channel is closed", - ))); - } + self.wait_for_connect_result().await; + let stream = self.stream.lock().await.clone(); // TODO reuse buffer? let mut buffer = BytesMut::zeroed(DATA_CHANNEL_BUFFER_SIZE as usize); - let dc = self.stream.detach().await.ok()?; + let dc = stream.detach().await.ok()?; let n = match dc.read(&mut buffer).await { Ok(n) => n, Err(err) => { + self.pc.close().await.ok(); return Some(Err(Error::new( ErrorKind::Other, format!("data channel read error: {}", err), @@ -270,12 +309,12 @@ impl WebRTCStream { } }; if n == 0 { + self.pc.close().await.ok(); return Some(Err(Error::new( ErrorKind::Other, "data channel read exited with 0 bytes", ))); } - log::debug!("WebRTCStream read {} bytes", n); buffer.truncate(n); Some(Ok(buffer)) } From 3e1e58747af2d272d01c64a1c2a11e828de1faf6 Mon Sep 17 00:00:00 2001 From: RustDesk <71636191+rustdesk@users.noreply.github.com> Date: Thu, 13 Nov 2025 22:15:04 +0800 Subject: [PATCH 04/24] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 3332b243f..7e74c7d20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["webrtc"] +default = [] webrtc = ["dep:webrtc"] [dependencies] From f5f78c84d552f35f86410b2271944e74e44fe612 Mon Sep 17 00:00:00 2001 From: lc Date: Thu, 13 Nov 2025 23:06:40 +0800 Subject: [PATCH 05/24] remove unwraps --- examples/webrtc.rs | 14 ++++++++------ src/webrtc.rs | 14 +++++++++++++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/examples/webrtc.rs b/examples/webrtc.rs index 317a7f502..3b9162c75 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -102,13 +102,15 @@ async fn read_loop(mut stream: hbb_common::Stream) -> Result<()> { println!("WebRTC stream closed; Exit the read_loop"); return Ok(()); }; - if res.is_err() { - println!("WebRTC stream read error: {}; Exit the read_loop", res.err().unwrap()); - return Ok(()); + match res { + Err(e) => { + println!("WebRTC stream read error: {}; Exit the read_loop", e); + return Ok(()); + } + Ok(data) => { + println!("Message from stream: {}", String::from_utf8(data.to_vec())?); + } } - println!("Message from stream: {}", - String::from_utf8(res.unwrap().to_vec())? - ); } } diff --git a/src/webrtc.rs b/src/webrtc.rs index 3a3ed011b..e80597075 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -105,7 +105,19 @@ impl WebRTCStream { let contains = lock.contains_key(&key); if contains { log::debug!("Start webrtc with cached peer"); - return Ok(lock.get(&key).unwrap().clone()); + return Ok(lock[&key].clone()); + } + // ...existing code... + Self::get_remote_offer(remote_endpoint)? + }; + + let key = remote_offer.clone(); + let mut lock = SESSIONS.lock().await; + if let Some(cached_stream) = lock.get(&key) { + if !key.is_empty() { + log::debug!("Start webrtc with cached peer"); + return Ok(cached_stream.clone()); + } } // Create a SettingEngine and enable Detach From f9e70f3d46fcf413af76714475a9d79a5003a588 Mon Sep 17 00:00:00 2001 From: lc Date: Thu, 13 Nov 2025 23:11:33 +0800 Subject: [PATCH 06/24] remove typo --- src/webrtc.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/webrtc.rs b/src/webrtc.rs index e80597075..dc1e62167 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -100,17 +100,6 @@ impl WebRTCStream { Self::get_remote_offer(remote_endpoint)? }; - let mut key = remote_offer.clone(); - let mut lock = SESSIONS.lock().await; - let contains = lock.contains_key(&key); - if contains { - log::debug!("Start webrtc with cached peer"); - return Ok(lock[&key].clone()); - } - // ...existing code... - Self::get_remote_offer(remote_endpoint)? - }; - let key = remote_offer.clone(); let mut lock = SESSIONS.lock().await; if let Some(cached_stream) = lock.get(&key) { From c406111c668a8f50f339a319ff7bb7acdcd56bd1 Mon Sep 17 00:00:00 2001 From: lc Date: Thu, 13 Nov 2025 23:22:41 +0800 Subject: [PATCH 07/24] make compiler happy --- src/webrtc_dummy.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/webrtc_dummy.rs b/src/webrtc_dummy.rs index a6f9344ba..7e2e142f8 100644 --- a/src/webrtc_dummy.rs +++ b/src/webrtc_dummy.rs @@ -13,8 +13,32 @@ pub struct WebRTCStream { // mock struct } +impl Clone for WebRTCStream { + fn clone(&self) -> Self { + WebRTCStream { + } + } +} + impl WebRTCStream { + pub async fn new( + _: &str, + _: u64, + ) -> ResultType { + Ok(Self {}) + } + + #[inline] + pub async fn get_local_endpoint(&self) -> ResultType { + Ok(String::new()) + } + + #[inline] + pub async fn set_remote_endpoint(&self, _: &str) -> ResultType<()> { + Ok(()) + } + #[inline] pub fn set_raw(&mut self) { } From 67ad83a2b2924fa891693fbb0d3b94cad4fff22d Mon Sep 17 00:00:00 2001 From: lc Date: Thu, 13 Nov 2025 23:25:53 +0800 Subject: [PATCH 08/24] fix webrtc example when webrtc disabled --- examples/webrtc.rs | 2 +- src/webrtc.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/webrtc.rs b/examples/webrtc.rs index 3b9162c75..b7f3f0665 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -63,7 +63,7 @@ async fn main() -> Result<()> { // Wait for the answer to be pasted println!( "Start new terminal run: \n{} \ncopy remote endpoint and paste here", - format!("cargo r --example webrtc -- --offer {}", local_endpoint) + format!("cargo r --features webrtc --example webrtc -- --offer {}", local_endpoint) ); // readline blocking let line = std::io::stdin() diff --git a/src/webrtc.rs b/src/webrtc.rs index dc1e62167..597acf363 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -100,7 +100,7 @@ impl WebRTCStream { Self::get_remote_offer(remote_endpoint)? }; - let key = remote_offer.clone(); + let mut key = remote_offer.clone(); let mut lock = SESSIONS.lock().await; if let Some(cached_stream) = lock.get(&key) { if !key.is_empty() { From f8d1d4207d6266509026371b4d98f67dca43a02f Mon Sep 17 00:00:00 2001 From: lc Date: Fri, 14 Nov 2025 01:09:08 +0800 Subject: [PATCH 09/24] move out dummy webrtc mod --- examples/webrtc.rs | 22 +++++++--- examples/webrtc_dummy.rs | 46 ++++++++++++++++++++ src/lib.rs | 4 -- src/stream.rs | 15 ++++++- src/webrtc_dummy.rs | 91 ---------------------------------------- 5 files changed, 77 insertions(+), 101 deletions(-) create mode 100644 examples/webrtc_dummy.rs delete mode 100644 src/webrtc_dummy.rs diff --git a/examples/webrtc.rs b/examples/webrtc.rs index b7f3f0665..40d4cdfbd 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -1,5 +1,12 @@ extern crate hbb_common; +#[cfg(feature = "webrtc")] +use hbb_common::webrtc::WebRTCStream; +#[cfg(not(feature = "webrtc"))] +mod webrtc_dummy; +#[cfg(not(feature = "webrtc"))] +use crate::webrtc_dummy::WebRTCStream; + use std::io::Write; use bytes::Bytes; @@ -11,6 +18,11 @@ use webrtc::peer_connection::math_rand_alpha; #[tokio::main] async fn main() -> Result<()> { + #[cfg(not(feature = "webrtc"))] + if true { + println!("The webrtc feature is not enabled. Please enable the webrtc feature to run this example."); + return Ok(()); + } let app = Command::new("webrtc-stream") .about("An example of webrtc stream using hbb_common and webrtc-rs") .arg( @@ -54,7 +66,7 @@ async fn main() -> Result<()> { "".to_string() }; - let webrtc_stream = hbb_common::webrtc::WebRTCStream::new(&remote_endpoint, 30000).await?; + let webrtc_stream = WebRTCStream::new(&remote_endpoint, 30000).await?; // Print the offer to be sent to the other peer let local_endpoint = webrtc_stream.get_local_endpoint().await?; @@ -75,12 +87,12 @@ async fn main() -> Result<()> { println!("Copy local endpoint and paste to the other peer: \n{}", local_endpoint); } - let s1 = hbb_common::Stream::WebRTC(webrtc_stream.clone()); + let s1 = webrtc_stream.clone(); tokio::spawn(async move { let _ = read_loop(s1).await; }); - let s2 = hbb_common::Stream::WebRTC(webrtc_stream.clone()); + let s2 = webrtc_stream.clone(); tokio::spawn(async move { let _ = write_loop(s2).await; }); @@ -96,7 +108,7 @@ async fn main() -> Result<()> { } // read_loop shows how to read from the datachannel directly -async fn read_loop(mut stream: hbb_common::Stream) -> Result<()> { +async fn read_loop(mut stream: WebRTCStream) -> Result<()> { loop { let Some(res) = stream.next().await else { println!("WebRTC stream closed; Exit the read_loop"); @@ -115,7 +127,7 @@ async fn read_loop(mut stream: hbb_common::Stream) -> Result<()> { } // write_loop shows how to write to the webrtc stream directly -async fn write_loop(mut stream: hbb_common::Stream) -> Result<()> { +async fn write_loop(mut stream: WebRTCStream) -> Result<()> { let mut result = Result::<()>::Ok(()); while result.is_ok() { let timeout = tokio::time::sleep(Duration::from_secs(5)); diff --git a/examples/webrtc_dummy.rs b/examples/webrtc_dummy.rs new file mode 100644 index 000000000..cd8d1b0d6 --- /dev/null +++ b/examples/webrtc_dummy.rs @@ -0,0 +1,46 @@ +use std::io::Error; + +use bytes::BytesMut; + +use hbb_common::ResultType; + +pub struct WebRTCStream { + // mock struct +} + +impl Clone for WebRTCStream { + fn clone(&self) -> Self { + WebRTCStream { + } + } +} + +impl WebRTCStream { + + pub async fn new( + _: &str, + _: u64, + ) -> ResultType { + Ok(Self {}) + } + + #[inline] + pub async fn get_local_endpoint(&self) -> ResultType { + Ok(String::new()) + } + + #[inline] + pub async fn set_remote_endpoint(&self, _: &str) -> ResultType<()> { + Ok(()) + } + + #[inline] + pub async fn send_bytes(&mut self, _: bytes::Bytes) -> ResultType<()> { + Ok(()) + } + + #[inline] + pub async fn next(&mut self) -> Option> { + None + } +} diff --git a/src/lib.rs b/src/lib.rs index 5d3e60007..372f4caa7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,10 +61,6 @@ pub mod stream; pub mod websocket; #[cfg(feature = "webrtc")] pub mod webrtc; -#[cfg(not(feature = "webrtc"))] -pub mod webrtc_dummy; -#[cfg(not(feature = "webrtc"))] -pub use webrtc_dummy as webrtc; #[cfg(any(target_os = "android", target_os = "ios"))] pub use rustls_platform_verifier; pub use stream::Stream; diff --git a/src/stream.rs b/src/stream.rs index 16db2c5e1..6a5ac858d 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,10 +1,13 @@ -use crate::{config, tcp, websocket, webrtc, ResultType}; +use crate::{config, tcp, websocket, ResultType}; +#[cfg(feature = "webrtc")] +use crate::webrtc; use sodiumoxide::crypto::secretbox::Key; use std::net::SocketAddr; use tokio::net::TcpStream; // support Websocket and tcp. pub enum Stream { + #[cfg(feature = "webrtc")] WebRTC(webrtc::WebRTCStream), WebSocket(websocket::WsFramedStream), Tcp(tcp::FramedStream), @@ -14,6 +17,7 @@ impl Stream { #[inline] pub fn set_send_timeout(&mut self, ms: u64) { match self { + #[cfg(feature = "webrtc")] Stream::WebRTC(s) => s.set_send_timeout(ms), Stream::WebSocket(s) => s.set_send_timeout(ms), Stream::Tcp(s) => s.set_send_timeout(ms), @@ -23,6 +27,7 @@ impl Stream { #[inline] pub fn set_raw(&mut self) { match self { + #[cfg(feature = "webrtc")] Stream::WebRTC(s) => s.set_raw(), Stream::WebSocket(s) => s.set_raw(), Stream::Tcp(s) => s.set_raw(), @@ -32,6 +37,7 @@ impl Stream { #[inline] pub async fn send_bytes(&mut self, bytes: bytes::Bytes) -> ResultType<()> { match self { + #[cfg(feature = "webrtc")] Stream::WebRTC(s) => s.send_bytes(bytes).await, Stream::WebSocket(s) => s.send_bytes(bytes).await, Stream::Tcp(s) => s.send_bytes(bytes).await, @@ -41,6 +47,7 @@ impl Stream { #[inline] pub async fn send_raw(&mut self, bytes: Vec) -> ResultType<()> { match self { + #[cfg(feature = "webrtc")] Stream::WebRTC(s) => s.send_raw(bytes).await, Stream::WebSocket(s) => s.send_raw(bytes).await, Stream::Tcp(s) => s.send_raw(bytes).await, @@ -50,6 +57,7 @@ impl Stream { #[inline] pub fn set_key(&mut self, key: Key) { match self { + #[cfg(feature = "webrtc")] Stream::WebRTC(s) => s.set_key(key), Stream::WebSocket(s) => s.set_key(key), Stream::Tcp(s) => s.set_key(key), @@ -59,6 +67,7 @@ impl Stream { #[inline] pub fn is_secured(&self) -> bool { match self { + #[cfg(feature = "webrtc")] Stream::WebRTC(s) => s.is_secured(), Stream::WebSocket(s) => s.is_secured(), Stream::Tcp(s) => s.is_secured(), @@ -71,6 +80,7 @@ impl Stream { timeout: u64, ) -> Option> { match self { + #[cfg(feature = "webrtc")] Stream::WebRTC(s) => s.next_timeout(timeout).await, Stream::WebSocket(s) => s.next_timeout(timeout).await, Stream::Tcp(s) => s.next_timeout(timeout).await, @@ -95,6 +105,7 @@ impl Stream { #[inline] pub async fn send(&mut self, msg: &impl protobuf::Message) -> ResultType<()> { match self { + #[cfg(feature = "webrtc")] Self::WebRTC(s) => s.send(msg).await, Self::WebSocket(ws) => ws.send(msg).await, Self::Tcp(tcp) => tcp.send(msg).await, @@ -105,6 +116,7 @@ impl Stream { #[inline] pub async fn next(&mut self) -> Option> { match self { + #[cfg(feature = "webrtc")] Self::WebRTC(s) => s.next().await, Self::WebSocket(ws) => ws.next().await, Self::Tcp(tcp) => tcp.next().await, @@ -114,6 +126,7 @@ impl Stream { #[inline] pub fn local_addr(&self) -> SocketAddr { match self { + #[cfg(feature = "webrtc")] Self::WebRTC(s) => s.local_addr(), Self::WebSocket(ws) => ws.local_addr(), Self::Tcp(tcp) => tcp.local_addr(), diff --git a/src/webrtc_dummy.rs b/src/webrtc_dummy.rs deleted file mode 100644 index 7e2e142f8..000000000 --- a/src/webrtc_dummy.rs +++ /dev/null @@ -1,91 +0,0 @@ -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::io::Error; - -use bytes::{Bytes, BytesMut}; - -use crate::{ - protobuf::Message, - sodiumoxide::crypto::secretbox::Key, - ResultType, -}; - -pub struct WebRTCStream { - // mock struct -} - -impl Clone for WebRTCStream { - fn clone(&self) -> Self { - WebRTCStream { - } - } -} - -impl WebRTCStream { - - pub async fn new( - _: &str, - _: u64, - ) -> ResultType { - Ok(Self {}) - } - - #[inline] - pub async fn get_local_endpoint(&self) -> ResultType { - Ok(String::new()) - } - - #[inline] - pub async fn set_remote_endpoint(&self, _: &str) -> ResultType<()> { - Ok(()) - } - - #[inline] - pub fn set_raw(&mut self) { - } - - #[inline] - pub fn local_addr(&self) -> SocketAddr { - SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0) - } - - #[inline] - pub fn set_send_timeout(&mut self, _ms: u64) { - } - - #[inline] - pub fn set_key(&mut self, _key: Key) { - } - - #[inline] - pub fn is_secured(&self) -> bool { - false - } - - #[inline] - pub async fn send(&mut self, _msg: &impl Message) -> ResultType<()> { - Ok(()) - } - - #[inline] - pub async fn send_raw(&mut self, _msg: Vec) -> ResultType<()> { - Ok(()) - } - - pub async fn send_bytes(&mut self, _bytes: Bytes) -> ResultType<()> { - Ok(()) - } - - #[inline] - pub async fn next(&mut self) -> Option> { - None - } - - #[inline] - pub async fn next_timeout(&mut self, _ms: u64) -> Option> { - None - } -} - -pub fn is_webrtc_endpoint(_endpoint: &str) -> bool { - false -} From 47dc73de1e68248699acfa524766f317be2f91d7 Mon Sep 17 00:00:00 2001 From: RustDesk <71636191+rustdesk@users.noreply.github.com> Date: Fri, 14 Nov 2025 11:46:59 +0800 Subject: [PATCH 10/24] Update src/webrtc.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/webrtc.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/webrtc.rs b/src/webrtc.rs index 597acf363..6046b7ea9 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -259,6 +259,8 @@ impl WebRTCStream { #[inline] pub fn set_key(&mut self, _key: Key) { // not-supported + // WebRTC uses built-in DTLS encryption for secure communication. + // DTLS handles key exchange and encryption automatically, so explicit key management is not required. } #[inline] From 5dcfea1ee4950c1076fffaec7de86ce6c7fc5e8f Mon Sep 17 00:00:00 2001 From: lc Date: Fri, 14 Nov 2025 16:02:51 +0800 Subject: [PATCH 11/24] support send_timeout --- examples/webrtc.rs | 16 +++++-- src/webrtc.rs | 113 ++++++++++++++++++++++++++++----------------- 2 files changed, 81 insertions(+), 48 deletions(-) diff --git a/examples/webrtc.rs b/examples/webrtc.rs index 40d4cdfbd..e9af42383 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -8,10 +8,10 @@ mod webrtc_dummy; use crate::webrtc_dummy::WebRTCStream; use std::io::Write; -use bytes::Bytes; -use clap::{Arg, Command}; use anyhow::Result; +use bytes::Bytes; +use clap::{Arg, Command}; use tokio::time::Duration; use webrtc::peer_connection::math_rand_alpha; @@ -75,7 +75,10 @@ async fn main() -> Result<()> { // Wait for the answer to be pasted println!( "Start new terminal run: \n{} \ncopy remote endpoint and paste here", - format!("cargo r --features webrtc --example webrtc -- --offer {}", local_endpoint) + format!( + "cargo r --features webrtc --example webrtc -- --offer {}", + local_endpoint + ) ); // readline blocking let line = std::io::stdin() @@ -84,7 +87,10 @@ async fn main() -> Result<()> { .ok_or_else(|| anyhow::anyhow!("No input received"))??; webrtc_stream.set_remote_endpoint(&line).await?; } else { - println!("Copy local endpoint and paste to the other peer: \n{}", local_endpoint); + println!( + "Copy local endpoint and paste to the other peer: \n{}", + local_endpoint + ); } let s1 = webrtc_stream.clone(); @@ -144,4 +150,4 @@ async fn write_loop(mut stream: WebRTCStream) -> Result<()> { println!("WebRTC stream write failed; Exit the write_loop"); Ok(()) -} \ No newline at end of file +} diff --git a/src/webrtc.rs b/src/webrtc.rs index 6046b7ea9..8ffb91ba7 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -1,31 +1,29 @@ -use std::sync::Arc; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::io::{Error, ErrorKind}; -use std::time::Duration; use std::collections::HashMap; +use std::io::{Error, ErrorKind}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; +use std::time::Duration; -use webrtc::api::APIBuilder; use webrtc::api::setting_engine::SettingEngine; +use webrtc::api::APIBuilder; use webrtc::data_channel::RTCDataChannel; +use webrtc::ice::mdns::MulticastDnsMode; use webrtc::ice_transport::ice_server::RTCIceServer; -use webrtc::peer_connection::RTCPeerConnection; use webrtc::peer_connection::configuration::RTCConfiguration; use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; -use webrtc::ice::mdns::MulticastDnsMode; +use webrtc::peer_connection::RTCPeerConnection; -use crate::{ - protobuf::Message, - sodiumoxide::crypto::secretbox::Key, - ResultType, -}; - -use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; +use base64::Engine; use bytes::{Bytes, BytesMut}; -use tokio::time::timeout; use tokio::sync::watch; use tokio::sync::Mutex; +use tokio::time::timeout; + +use crate::protobuf::Message; +use crate::sodiumoxide::crypto::secretbox::Key; +use crate::ResultType; pub struct WebRTCStream { pc: Arc, @@ -53,29 +51,36 @@ impl Clone for WebRTCStream { } impl WebRTCStream { - - pub fn get_remote_offer(endpoint: &str) -> ResultType { + #[inline] + fn get_remote_offer(endpoint: &str) -> ResultType { // Ensure the endpoint starts with the "webrtc://" prefix if !endpoint.starts_with("webrtc://") { - return Err(Error::new(ErrorKind::InvalidInput, "Invalid WebRTC endpoint format").into()); + return Err( + Error::new(ErrorKind::InvalidInput, "Invalid WebRTC endpoint format").into(), + ); } // Extract the Base64-encoded SDP part let encoded_sdp = &endpoint["webrtc://".len()..]; // Decode the Base64 string - let decoded_bytes = BASE64_STANDARD.decode(encoded_sdp).map_err(|_| - Error::new(ErrorKind::InvalidInput, "Failed to decode Base64 SDP") - )?; + let decoded_bytes = BASE64_STANDARD + .decode(encoded_sdp) + .map_err(|_| Error::new(ErrorKind::InvalidInput, "Failed to decode Base64 SDP"))?; Ok(String::from_utf8(decoded_bytes).map_err(|_| { - Error::new(ErrorKind::InvalidInput, "Failed to convert decoded bytes to UTF-8") + Error::new( + ErrorKind::InvalidInput, + "Failed to convert decoded bytes to UTF-8", + ) })?) } - pub fn sdp_to_endpoint(sdp: &str) -> String { + #[inline] + fn sdp_to_endpoint(sdp: &str) -> String { let encoded_sdp = BASE64_STANDARD.encode(sdp); format!("webrtc://{}", encoded_sdp) } + #[inline] async fn get_key_for_peer(pc: &Arc) -> String { if let Some(local_desc) = pc.local_description().await { if local_desc.sdp_type != webrtc::peer_connection::sdp::sdp_type::RTCSdpType::Offer { @@ -89,10 +94,7 @@ impl WebRTCStream { "".into() } - pub async fn new( - remote_endpoint: &str, - ms_timeout: u64, - ) -> ResultType { + pub async fn new(remote_endpoint: &str, ms_timeout: u64) -> ResultType { log::debug!("New webrtc stream to endpoint: {}", remote_endpoint); let remote_offer = if remote_endpoint.is_empty() { "".into() @@ -115,9 +117,7 @@ impl WebRTCStream { s.set_ice_multicast_dns_mode(MulticastDnsMode::Disabled); // Create the API object - let api = APIBuilder::new() - .with_setting_engine(s) - .build(); + let api = APIBuilder::new().with_setting_engine(s).build(); // Prepare the configuration let config = RTCConfiguration { @@ -158,16 +158,25 @@ impl WebRTCStream { let pc_for_close2 = pc_for_close.clone(); Box::pin(async move { log::debug!("Peer connection state : {}", s); - if s == RTCPeerConnectionState::Disconnected { - let _ = on_connection_notify2.send(true); - log::debug!("WebRTC session closing due to disconnected"); - let _ = stream_for_close2.lock().await.close().await; - log::debug!("WebRTC session stream closed"); - } else if s == RTCPeerConnectionState::Failed || s == RTCPeerConnectionState::Closed { - let mut lock = SESSIONS.lock().await; - let key = WebRTCStream::get_key_for_peer(&pc_for_close2).await; - log::debug!("WebRTC session removing key from cache: {}", key); - lock.remove(&key); + match s { + RTCPeerConnectionState::Disconnected + | RTCPeerConnectionState::Failed + | RTCPeerConnectionState::Closed => { + let _ = on_connection_notify2.send(true); + log::debug!("WebRTC session closing due to disconnected"); + let _ = stream_for_close2.lock().await.close().await; + log::debug!("WebRTC session stream closed"); + + let mut lock = SESSIONS.lock().await; + let key = WebRTCStream::get_key_for_peer(&pc_for_close2).await; + lock.remove(&key); + log::debug!( + "WebRTC session removed key from cache: {} current len: {}", + key, + lock.len() + ); + } + _ => {} } }) })); @@ -287,7 +296,26 @@ impl WebRTCStream { } pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> { - self.wait_for_connect_result().await; + if self.send_timeout > 0 { + match timeout( + Duration::from_millis(self.send_timeout), + self.wait_for_connect_result(), + ) + .await + { + Ok(_) => {} + Err(_) => { + self.pc.close().await.ok(); + return Err(Error::new( + ErrorKind::TimedOut, + "WebRTC send wait for connect timeout", + ) + .into()); + } + } + } else { + self.wait_for_connect_result().await; + } let stream = self.stream.lock().await.clone(); stream.send(&bytes).await?; Ok(()) @@ -339,6 +367,5 @@ pub fn is_webrtc_endpoint(endpoint: &str) -> bool { #[cfg(test)] mod tests { #[test] - fn test_dc() { - } + fn test_dc() {} } From 6463ba0e5241e3988e645a83c1938bad6e8890fa Mon Sep 17 00:00:00 2001 From: lichon Date: Fri, 14 Nov 2025 16:05:38 +0800 Subject: [PATCH 12/24] Update examples/webrtc_dummy.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- examples/webrtc_dummy.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/webrtc_dummy.rs b/examples/webrtc_dummy.rs index cd8d1b0d6..88ac9beb2 100644 --- a/examples/webrtc_dummy.rs +++ b/examples/webrtc_dummy.rs @@ -4,6 +4,8 @@ use bytes::BytesMut; use hbb_common::ResultType; +/// Dummy implementation of WebRTCStream used when the `webrtc` feature is disabled. +/// This struct allows the code to compile and run without actual WebRTC functionality. pub struct WebRTCStream { // mock struct } From 955e49dc4b8365b7ad27752119e5175f4e1a5f35 Mon Sep 17 00:00:00 2001 From: lc Date: Fri, 14 Nov 2025 20:32:32 +0800 Subject: [PATCH 13/24] use webrtc sdp fingerprint as session key --- examples/webrtc_dummy.rs | 11 +- src/webrtc.rs | 327 ++++++++++++++++++++++++++++++++------- 2 files changed, 274 insertions(+), 64 deletions(-) diff --git a/examples/webrtc_dummy.rs b/examples/webrtc_dummy.rs index 88ac9beb2..78d549034 100644 --- a/examples/webrtc_dummy.rs +++ b/examples/webrtc_dummy.rs @@ -12,17 +12,12 @@ pub struct WebRTCStream { impl Clone for WebRTCStream { fn clone(&self) -> Self { - WebRTCStream { - } + WebRTCStream {} } } impl WebRTCStream { - - pub async fn new( - _: &str, - _: u64, - ) -> ResultType { + pub async fn new(_: &str, _: u64) -> ResultType { Ok(Self {}) } @@ -46,3 +41,5 @@ impl WebRTCStream { None } } + +fn main() {} diff --git a/src/webrtc.rs b/src/webrtc.rs index 8ffb91ba7..7fda0775d 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -81,17 +81,46 @@ impl WebRTCStream { } #[inline] - async fn get_key_for_peer(pc: &Arc) -> String { - if let Some(local_desc) = pc.local_description().await { - if local_desc.sdp_type != webrtc::peer_connection::sdp::sdp_type::RTCSdpType::Offer { - let Some(remote_desc) = pc.remote_description().await else { - return "".into(); - }; - return serde_json::to_string(&remote_desc).unwrap_or_default(); + fn get_key_for_sdp(sdp: &RTCSessionDescription) -> ResultType { + let binding = sdp.unmarshal()?; + let Some(fingerprint) = binding.attribute("fingerprint") else { + // find fingerprint attribute in media descriptions + for media in &binding.media_descriptions { + if media.media_name.media != "application" { + continue; + } + if let Some(fp) = media + .attributes + .iter() + .find(|x| x.key == "fingerprint") + .and_then(|x| x.value.clone()) + { + return Ok(fp); + } } - return serde_json::to_string(&local_desc).unwrap_or_default(); + return Err(anyhow::anyhow!("SDP fingerprint attribute not found")); + }; + Ok(fingerprint.to_string()) + } + + #[inline] + fn get_key_for_sdp_json(sdp_json: &str) -> ResultType { + if sdp_json.is_empty() { + return Ok("".to_string()); } - "".into() + let sdp = serde_json::from_str::(&sdp_json)?; + Self::get_key_for_sdp(&sdp) + } + + #[inline] + async fn get_key_for_peer(pc: &Arc, is_local: bool) -> ResultType { + let Some(desc) = (match is_local { + true => pc.local_description().await, + false => pc.remote_description().await, + }) else { + return Err(anyhow::anyhow!("PeerConnection description is not set")); + }; + Self::get_key_for_sdp(&desc) } pub async fn new(remote_endpoint: &str, ms_timeout: u64) -> ResultType { @@ -102,7 +131,7 @@ impl WebRTCStream { Self::get_remote_offer(remote_endpoint)? }; - let mut key = remote_offer.clone(); + let mut key = Self::get_key_for_sdp_json(&remote_offer)?; let mut lock = SESSIONS.lock().await; if let Some(cached_stream) = lock.get(&key) { if !key.is_empty() { @@ -128,98 +157,110 @@ impl WebRTCStream { ..Default::default() }; + let start_local_offer = remote_offer.is_empty(); let (notify_tx, notify_rx) = watch::channel(false); - let dc_open_notify = notify_tx.clone(); // Create a new RTCPeerConnection let pc = Arc::new(api.new_peer_connection(config).await?); - let bootstrap_dc = if remote_offer.is_empty() { + let bootstrap_dc = if start_local_offer { + let dc_open_notify = notify_tx.clone(); // Create a data channel with label "bootstrap" - pc.create_data_channel("bootstrap", None).await? + let dc = pc.create_data_channel("bootstrap", None).await?; + dc.on_open(Box::new(move || { + log::debug!("Local data channel bootstrap open."); + let _ = dc_open_notify.send(true); + Box::pin(async {}) + })); + dc } else { // Wait for the data channel to be created by the remote peer // Here we create a dummy data channel to satisfy the type system Arc::new(RTCDataChannel::default()) }; - bootstrap_dc.on_open(Box::new(move || { - log::debug!("Local data channel bootstrap open."); - let _ = dc_open_notify.send(true); - Box::pin(async {}) - })); - let stream = Arc::new(Mutex::new(bootstrap_dc.clone())); + let stream = Arc::new(Mutex::new(bootstrap_dc)); + if !start_local_offer { + // Register data channel creation handling + let dc_open_notify = notify_tx.clone(); + let stream_for_dc = stream.clone(); + pc.on_data_channel(Box::new(move |dc: Arc| { + let d_label = dc.label().to_owned(); + let dc_open_notify2 = dc_open_notify.clone(); + let stream_for_dc_clone = stream_for_dc.clone(); + log::debug!("Remote data channel {} ready", d_label); + Box::pin(async move { + let mut stream_lock = stream_for_dc_clone.lock().await; + *stream_lock = dc.clone(); + drop(stream_lock); + dc.on_open(Box::new(move || { + let _ = dc_open_notify2.send(true); + Box::pin(async {}) + })); + }) + })); + } // This will notify you when the peer has connected/disconnected - let on_connection_notify = notify_tx.clone(); let stream_for_close = stream.clone(); let pc_for_close = pc.clone(); pc.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { let stream_for_close2 = stream_for_close.clone(); - let on_connection_notify2 = on_connection_notify.clone(); + let on_connection_notify = notify_tx.clone(); let pc_for_close2 = pc_for_close.clone(); Box::pin(async move { - log::debug!("Peer connection state : {}", s); + log::debug!("WebRTC session peer connection state: {}", s); match s { RTCPeerConnectionState::Disconnected | RTCPeerConnectionState::Failed | RTCPeerConnectionState::Closed => { - let _ = on_connection_notify2.send(true); + let _ = on_connection_notify.send(true); log::debug!("WebRTC session closing due to disconnected"); let _ = stream_for_close2.lock().await.close().await; log::debug!("WebRTC session stream closed"); let mut lock = SESSIONS.lock().await; - let key = WebRTCStream::get_key_for_peer(&pc_for_close2).await; - lock.remove(&key); - log::debug!( - "WebRTC session removed key from cache: {} current len: {}", - key, - lock.len() - ); + match Self::get_key_for_peer(&pc_for_close2, start_local_offer).await.ok() { + Some(k) => { + lock.remove(&k); + log::debug!( + "WebRTC session removed key from cache: {} current len: {}", + k, + lock.len() + ); + } + None => return, + } } _ => {} } }) })); - // Register data channel creation handling - let remote_dc_open_notify = notify_tx.clone(); - let stream_for_dc = stream.clone(); - pc.on_data_channel(Box::new(move |dc: Arc| { - let d_label = dc.label().to_owned(); - let notify = remote_dc_open_notify.clone(); - let stream_for_dc_clone = stream_for_dc.clone(); - log::debug!("Remote data channel {} ready", d_label); - Box::pin(async move { - let mut stream_lock = stream_for_dc_clone.lock().await; - *stream_lock = dc.clone(); - drop(stream_lock); - dc.on_open(Box::new(move || { - let _ = notify.send(true); - Box::pin(async {}) - })); - }) - })); - // process offer/answer - if remote_offer.is_empty() { + if start_local_offer { let sdp = pc.create_offer(None).await?; let mut gather_complete = pc.gathering_complete_promise().await; pc.set_local_description(sdp.clone()).await?; let _ = gather_complete.recv().await; - key = Self::get_key_for_peer(&pc).await; - log::debug!("Start webrtc with local: {}", key); + log::debug!("local offer:\n{}", sdp.sdp); + // get local sdp key + key = Self::get_key_for_sdp(&sdp)?; + log::debug!("Start webrtc with local key: {}", key); } else { let sdp = serde_json::from_str::(&remote_offer)?; - pc.set_remote_description(sdp).await?; + pc.set_remote_description(sdp.clone()).await?; let answer = pc.create_answer(None).await?; let mut gather_complete = pc.gathering_complete_promise().await; pc.set_local_description(answer).await?; let _ = gather_complete.recv().await; - log::debug!("Start webrtc with remote: {}", remote_offer); + + log::debug!("remote offer:\n{}", sdp.sdp); + // get remote sdp key + key = Self::get_key_for_sdp(&sdp)?; + log::debug!("Start webrtc with remote key: {}", key); } - let webrtc_stream = WebRTCStream { + let webrtc_stream = Self { pc, stream, state_notify: notify_rx, @@ -237,7 +278,7 @@ impl WebRTCStream { let endpoint = Self::sdp_to_endpoint(&sdp); Ok(endpoint) } else { - Err(anyhow::anyhow!("Local description is not set")) + Err(anyhow::anyhow!("Local desc is not set")) } } @@ -366,6 +407,178 @@ pub fn is_webrtc_endpoint(endpoint: &str) -> bool { #[cfg(test)] mod tests { + use crate::webrtc::WebRTCStream; + use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; + #[test] - fn test_dc() {} + fn test_webrtc_session_key() { + let mut sdp_str = "".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ) + .unwrap_or_default(), + "" + ); + + sdp_str = "v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=fingerprint:sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=application 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ).unwrap_or_default(), + "sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88" + ); + + sdp_str = "v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=application 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=fingerprint:sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ).unwrap_or_default(), + "sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88" + ); + + sdp_str = "v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=application 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT" + .to_owned(); + assert!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ) + .is_err(), + "can not find fingerprint attribute" + ); + + sdp_str = "v=0 +o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 +s=- +t=0 0 +a=group:BUNDLE 0 +a=extmap-allow-mixed +m=audio 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=fingerprint:sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88 +a=setup:actpass +a=mid:0 +a=sendrecv +a=sctp-port:5000 +a=ice-ufrag:RMWjjpXfpXbDPdMz +a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); + assert!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer(sdp_str).unwrap_or_default() + ) + .is_err(), + "can not find datachannel fingerprint attribute" + ); + + assert!( + WebRTCStream::get_key_for_sdp( + &RTCSessionDescription::offer("".to_owned()).unwrap_or_default() + ) + .is_err(), + "invalid sdp should error" + ); + + assert!( + WebRTCStream::get_key_for_sdp_json("{}").is_err(), + "empty sdp json should error" + ); + + assert!( + WebRTCStream::get_key_for_sdp_json("{ss}").is_err(), + "invalid sdp json should error" + ); + + let endpoint = "webrtc://eyJ0eXBlIjoiYW5zd2VyIiwic2RwIjoidj0wXHJcbm89LSA0MTA1NDk3NTY2NDgyMTQzODEwIDYwMzk1NzQw\ +MCBJTiBJUDQgMC4wLjAuMFxyXG5zPS1cclxudD0wIDBcclxuYT1maW5nZXJwcmludDpzaGEtMjU2IDYxOjYwOjc0OjQwOjI4OkNFOjBCOjBDOjc1OjRCOj\ +EwOjlBOkVFOjc3OkY1OjQ0OjU3Ojg0OjUxOkRCOjA0OjkyOjRBOjEwOjFDOjRFOjVGOjdFOkYxOkIzOjcxOjIyXHJcbmE9Z3JvdXA6QlVORExFIDBcclxu\ +YT1leHRtYXAtYWxsb3ctbWl4ZWRcclxubT1hcHBsaWNhdGlvbiA5IFVEUC9EVExTL1NDVFAgd2VicnRjLWRhdGFjaGFubmVsXHJcbmM9SU4gSVA0IDAuMC\ +4wLjBcclxuYT1zZXR1cDphY3RpdmVcclxuYT1taWQ6MFxyXG5hPXNlbmRyZWN2XHJcbmE9c2N0cC1wb3J0OjUwMDBcclxuYT1pY2UtdWZyYWc6SHlnU1Rr\ +V2RsRlpHRG1XWlxyXG5hPWljZS1wd2Q6SkJneFZWaGZveVhHdHZha1VWcnBQeHVOSVpMU3llS1pcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDEgdWRwID\ +IxMzA3MDY0MzEgMTkyLjE2OC4xLjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDIgdWRwIDIxMzA3MDY0MzEgMTkyLjE2OC4x\ +LjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6MTg2MTA0NTE5MCAxIHVkcCAxNjk0NDk4ODE1IDE0LjIxMi42OC4xMiAyNzAwNCB0eXAgc3JmbH\ +ggcmFkZHIgMC4wLjAuMCBycG9ydCA2NDAwOFxyXG5hPWNhbmRpZGF0ZToxODYxMDQ1MTkwIDIgdWRwIDE2OTQ0OTg4MTUgMTQuMjEyLjY4LjEyIDI3MDA0\ +IHR5cCBzcmZseCByYWRkciAwLjAuMC4wIHJwb3J0IDY0MDA4XHJcbmE9ZW5kLW9mLWNhbmRpZGF0ZXNcclxuIn0=".to_owned(); + assert_eq!( + WebRTCStream::get_key_for_sdp_json( + &WebRTCStream::get_remote_offer(&endpoint).unwrap_or_default() + ).unwrap_or_default(), + "sha-256 61:60:74:40:28:CE:0B:0C:75:4B:10:9A:EE:77:F5:44:57:84:51:DB:04:92:4A:10:1C:4E:5F:7E:F1:B3:71:22" + ); + } + + #[tokio::test] + async fn test_webrtc_new_stream() { + let mut endpoint = "webrtc://sdfsdf".to_owned(); + assert!( + WebRTCStream::new(&endpoint, 10000).await.is_err(), + "invalid webrtc endpoint should error" + ); + + endpoint = "wss://sdfsdf".to_owned(); + assert!( + WebRTCStream::new(&endpoint, 10000).await.is_err(), + "invalid webrtc endpoint should error" + ); + + assert!( + WebRTCStream::new("", 10000).await.is_ok(), + "local webrtc endpoint should ok" + ); + + endpoint = "webrtc://eyJ0eXBlIjoiYW5zd2VyIiwic2RwIjoidj0wXHJcbm89LSA0MTA1NDk3NTY2NDgyMTQzODEwIDYwMzk1NzQw\ +MCBJTiBJUDQgMC4wLjAuMFxyXG5zPS1cclxudD0wIDBcclxuYT1maW5nZXJwcmludDpzaGEtMjU2IDYxOjYwOjc0OjQwOjI4OkNFOjBCOjBDOjc1OjRCOj\ +EwOjlBOkVFOjc3OkY1OjQ0OjU3Ojg0OjUxOkRCOjA0OjkyOjRBOjEwOjFDOjRFOjVGOjdFOkYxOkIzOjcxOjIyXHJcbmE9Z3JvdXA6QlVORExFIDBcclxu\ +YT1leHRtYXAtYWxsb3ctbWl4ZWRcclxubT1hcHBsaWNhdGlvbiA5IFVEUC9EVExTL1NDVFAgd2VicnRjLWRhdGFjaGFubmVsXHJcbmM9SU4gSVA0IDAuMC\ +4wLjBcclxuYT1zZXR1cDphY3RpdmVcclxuYT1taWQ6MFxyXG5hPXNlbmRyZWN2XHJcbmE9c2N0cC1wb3J0OjUwMDBcclxuYT1pY2UtdWZyYWc6SHlnU1Rr\ +V2RsRlpHRG1XWlxyXG5hPWljZS1wd2Q6SkJneFZWaGZveVhHdHZha1VWcnBQeHVOSVpMU3llS1pcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDEgdWRwID\ +IxMzA3MDY0MzEgMTkyLjE2OC4xLjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6OTYzOTg4MzQ4IDIgdWRwIDIxMzA3MDY0MzEgMTkyLjE2OC4x\ +LjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6MTg2MTA0NTE5MCAxIHVkcCAxNjk0NDk4ODE1IDE0LjIxMi42OC4xMiAyNzAwNCB0eXAgc3JmbH\ +ggcmFkZHIgMC4wLjAuMCBycG9ydCA2NDAwOFxyXG5hPWNhbmRpZGF0ZToxODYxMDQ1MTkwIDIgdWRwIDE2OTQ0OTg4MTUgMTQuMjEyLjY4LjEyIDI3MDA0\ +IHR5cCBzcmZseCByYWRkciAwLjAuMC4wIHJwb3J0IDY0MDA4XHJcbmE9ZW5kLW9mLWNhbmRpZGF0ZXNcclxuIn0=".to_owned(); + assert!( + WebRTCStream::new(&endpoint, 10000).await.is_err(), + "connect to an 'answer' webrtc endpoint should error" + ); + } } From 3a919aef545feeaecd79e7cd9b4f40eb1bbafe0c Mon Sep 17 00:00:00 2001 From: lc Date: Sat, 15 Nov 2025 00:27:09 +0800 Subject: [PATCH 14/24] minor change --- examples/webrtc.rs | 5 ++++- examples/webrtc_dummy.rs | 2 +- src/webrtc.rs | 18 +++++++++++------- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/examples/webrtc.rs b/examples/webrtc.rs index e9af42383..80e405a68 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -20,7 +20,10 @@ use webrtc::peer_connection::math_rand_alpha; async fn main() -> Result<()> { #[cfg(not(feature = "webrtc"))] if true { - println!("The webrtc feature is not enabled. Please enable the webrtc feature to run this example."); + println!( + "The webrtc feature is not enabled. \ + Please enable the webrtc feature to run this example." + ); return Ok(()); } let app = Command::new("webrtc-stream") diff --git a/examples/webrtc_dummy.rs b/examples/webrtc_dummy.rs index 78d549034..e40699099 100644 --- a/examples/webrtc_dummy.rs +++ b/examples/webrtc_dummy.rs @@ -42,4 +42,4 @@ impl WebRTCStream { } } -fn main() {} +fn _main() {} diff --git a/src/webrtc.rs b/src/webrtc.rs index 7fda0775d..5c0bc4c9c 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -218,8 +218,8 @@ impl WebRTCStream { log::debug!("WebRTC session stream closed"); let mut lock = SESSIONS.lock().await; - match Self::get_key_for_peer(&pc_for_close2, start_local_offer).await.ok() { - Some(k) => { + match Self::get_key_for_peer(&pc_for_close2, start_local_offer).await { + Ok(k) => { lock.remove(&k); log::debug!( "WebRTC session removed key from cache: {} current len: {}", @@ -227,7 +227,7 @@ impl WebRTCStream { lock.len() ); } - None => return, + Err(_e) => {} } } _ => {} @@ -421,7 +421,8 @@ mod tests { "" ); - sdp_str = "v=0 + sdp_str = "\ +v=0 o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 s=- t=0 0 @@ -443,7 +444,8 @@ a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); "sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88" ); - sdp_str = "v=0 + sdp_str = "\ +v=0 o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 s=- t=0 0 @@ -465,7 +467,8 @@ a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT".to_owned(); "sha-256 97:52:D6:1F:1E:87:6C:DA:B8:21:95:64:A5:85:89:FA:02:71:C7:4D:B3:FD:25:92:40:FB:6B:65:24:3C:79:88" ); - sdp_str = "v=0 + sdp_str = "\ +v=0 o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 s=- t=0 0 @@ -488,7 +491,8 @@ a=ice-pwd:BtIqlWHfwhsJdFiBROeLuEbNmYfHxRfT" "can not find fingerprint attribute" ); - sdp_str = "v=0 + sdp_str = "\ +v=0 o=- 7400546379179479477 208696200 IN IP4 0.0.0.0 s=- t=0 0 From b10a96b7bce37081a65291b455d6617a382ac652 Mon Sep 17 00:00:00 2001 From: lichon Date: Sat, 15 Nov 2025 16:04:59 +0800 Subject: [PATCH 15/24] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- examples/webrtc.rs | 1 + examples/webrtc_dummy.rs | 2 +- src/webrtc.rs | 10 +++++----- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/examples/webrtc.rs b/examples/webrtc.rs index 80e405a68..df4a7225e 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -14,6 +14,7 @@ use bytes::Bytes; use clap::{Arg, Command}; use tokio::time::Duration; +#[cfg(feature = "webrtc")] use webrtc::peer_connection::math_rand_alpha; #[tokio::main] diff --git a/examples/webrtc_dummy.rs b/examples/webrtc_dummy.rs index e40699099..c826d64a7 100644 --- a/examples/webrtc_dummy.rs +++ b/examples/webrtc_dummy.rs @@ -42,4 +42,4 @@ impl WebRTCStream { } } -fn _main() {} + diff --git a/src/webrtc.rs b/src/webrtc.rs index 5c0bc4c9c..f2b0dc2d0 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -32,7 +32,8 @@ pub struct WebRTCStream { send_timeout: u64, } -/// message size limit for Chromium +/// Standard maximum message size for WebRTC data channels (RFC 8831, 65535 bytes). +/// Most browsers, including Chromium, enforce this protocol limit. const DATA_CHANNEL_BUFFER_SIZE: u16 = u16::MAX; lazy_static::lazy_static! { @@ -222,9 +223,8 @@ impl WebRTCStream { Ok(k) => { lock.remove(&k); log::debug!( - "WebRTC session removed key from cache: {} current len: {}", - k, - lock.len() + "WebRTC session removed key from cache: {}", + k ); } Err(_e) => {} @@ -274,7 +274,7 @@ impl WebRTCStream { #[inline] pub async fn get_local_endpoint(&self) -> ResultType { if let Some(local_desc) = self.pc.local_description().await { - let sdp = serde_json::to_string(&local_desc).unwrap_or_default(); + let sdp = serde_json::to_string(&local_desc)?; let endpoint = Self::sdp_to_endpoint(&sdp); Ok(endpoint) } else { From 483cf9d225165986d91bf63ef7196092e2a51e1e Mon Sep 17 00:00:00 2001 From: lichon Date: Sat, 15 Nov 2025 16:04:59 +0800 Subject: [PATCH 16/24] Apply suggestions from code review --- src/webrtc.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/webrtc.rs b/src/webrtc.rs index f2b0dc2d0..39797298e 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -36,6 +36,8 @@ pub struct WebRTCStream { /// Most browsers, including Chromium, enforce this protocol limit. const DATA_CHANNEL_BUFFER_SIZE: u16 = u16::MAX; +const DEFAULT_ICE_SERVER: &str = "stun:stun.cloudflare.com:3478"; + lazy_static::lazy_static! { static ref SESSIONS: Arc::>> = Default::default(); } @@ -133,13 +135,14 @@ impl WebRTCStream { }; let mut key = Self::get_key_for_sdp_json(&remote_offer)?; - let mut lock = SESSIONS.lock().await; - if let Some(cached_stream) = lock.get(&key) { + let sessions_lock = SESSIONS.lock().await; + if let Some(cached_stream) = sessions_lock.get(&key) { if !key.is_empty() { log::debug!("Start webrtc with cached peer"); return Ok(cached_stream.clone()); } } + drop(sessions_lock); // Create a SettingEngine and enable Detach let mut s = SettingEngine::default(); @@ -152,7 +155,7 @@ impl WebRTCStream { // Prepare the configuration let config = RTCConfiguration { ice_servers: vec![RTCIceServer { - urls: vec!["stun:stun.cloudflare.com:3478".to_owned()], + urls: vec![DEFAULT_ICE_SERVER.to_string()], ..Default::default() }], ..Default::default() @@ -218,14 +221,11 @@ impl WebRTCStream { let _ = stream_for_close2.lock().await.close().await; log::debug!("WebRTC session stream closed"); - let mut lock = SESSIONS.lock().await; + let mut sessions_lock = SESSIONS.lock().await; match Self::get_key_for_peer(&pc_for_close2, start_local_offer).await { Ok(k) => { - lock.remove(&k); - log::debug!( - "WebRTC session removed key from cache: {}", - k - ); + sessions_lock.remove(&k); + log::debug!("WebRTC session removed key: {}", k); } Err(_e) => {} } @@ -266,8 +266,7 @@ impl WebRTCStream { state_notify: notify_rx, send_timeout: ms_timeout, }; - - lock.insert(key, webrtc_stream.clone()); + SESSIONS.lock().await.insert(key, webrtc_stream.clone()); Ok(webrtc_stream) } From 0da5d379fc4198c7ec196019242d22e01f28112c Mon Sep 17 00:00:00 2001 From: lc Date: Sun, 16 Nov 2025 04:04:25 +0800 Subject: [PATCH 17/24] support turn relay config, and force_relay option --- examples/webrtc.rs | 2 +- src/socket_client.rs | 2 +- src/webrtc.rs | 122 +++++++++++++++++++++++++++++++++++++++---- 3 files changed, 113 insertions(+), 13 deletions(-) diff --git a/examples/webrtc.rs b/examples/webrtc.rs index df4a7225e..3d5da7360 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -70,7 +70,7 @@ async fn main() -> Result<()> { "".to_string() }; - let webrtc_stream = WebRTCStream::new(&remote_endpoint, 30000).await?; + let webrtc_stream = WebRTCStream::new(&remote_endpoint, false, 30000).await?; // Print the offer to be sent to the other peer let local_endpoint = webrtc_stream.get_local_endpoint().await?; diff --git a/src/socket_client.rs b/src/socket_client.rs index 1f568ff3f..0e898b88f 100644 --- a/src/socket_client.rs +++ b/src/socket_client.rs @@ -134,7 +134,7 @@ pub async fn connect_tcp< #[cfg(feature = "webrtc")] if is_webrtc_endpoint(&target.to_string()) { return Ok(Stream::WebRTC( - webrtc::WebRTCStream::new(&target.to_string(), ms_timeout).await?, + webrtc::WebRTCStream::new(&target.to_string(), false, ms_timeout).await?, )); } let target_str = check_ws(&target.to_string()); diff --git a/src/webrtc.rs b/src/webrtc.rs index 39797298e..7cdfc25a2 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -11,6 +11,7 @@ use webrtc::ice::mdns::MulticastDnsMode; use webrtc::ice_transport::ice_server::RTCIceServer; use webrtc::peer_connection::configuration::RTCConfiguration; use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; +use webrtc::peer_connection::policy::ice_transport_policy::RTCIceTransportPolicy; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; use webrtc::peer_connection::RTCPeerConnection; @@ -20,7 +21,9 @@ use bytes::{Bytes, BytesMut}; use tokio::sync::watch; use tokio::sync::Mutex; use tokio::time::timeout; +use url::Url; +use crate::config; use crate::protobuf::Message; use crate::sodiumoxide::crypto::secretbox::Key; use crate::ResultType; @@ -36,7 +39,15 @@ pub struct WebRTCStream { /// Most browsers, including Chromium, enforce this protocol limit. const DATA_CHANNEL_BUFFER_SIZE: u16 = u16::MAX; -const DEFAULT_ICE_SERVER: &str = "stun:stun.cloudflare.com:3478"; +// use 3 public STUN servers to find out the NAT type, 2 must be the same address but different ports +// https://stackoverflow.com/questions/72805316/determine-nat-mapping-behaviour-using-two-stun-servers +// luckily nextcloud supports two ports for STUN +// unluckily webrtc-rs does not use the same port to do the STUN request +static DEFAULT_ICE_SERVERS: [&str; 3] = [ + "stun:stun.cloudflare.com:3478", + "stun:stun.nextcloud.com:3478", + "stun:stun.nextcloud.com:443", +]; lazy_static::lazy_static! { static ref SESSIONS: Arc::>> = Default::default(); @@ -126,7 +137,35 @@ impl WebRTCStream { Self::get_key_for_sdp(&desc) } - pub async fn new(remote_endpoint: &str, ms_timeout: u64) -> ResultType { + #[inline] + fn get_turn_server_from_url(url: &str) -> Option { + // standard url format with turn scheme: turn://user:pass@host:port + match Url::parse(url) { + Ok(u) => { + if u.scheme() == "turn" { + Some(RTCIceServer { + urls: vec![format!( + "turn:{}:{}", + u.host_str().unwrap_or_default(), + u.port().unwrap_or(3478) + )], + username: u.username().to_string(), + credential: u.password().unwrap_or_default().to_string(), + ..Default::default() + }) + } else { + None + } + } + Err(_) => None, + } + } + + pub async fn new( + remote_endpoint: &str, + force_relay: bool, + ms_timeout: u64, + ) -> ResultType { log::debug!("New webrtc stream to endpoint: {}", remote_endpoint); let remote_offer = if remote_endpoint.is_empty() { "".into() @@ -144,6 +183,7 @@ impl WebRTCStream { } drop(sessions_lock); + let start_local_offer = remote_offer.is_empty(); // Create a SettingEngine and enable Detach let mut s = SettingEngine::default(); s.detach_data_channels(); @@ -151,17 +191,29 @@ impl WebRTCStream { // Create the API object let api = APIBuilder::new().with_setting_engine(s).build(); + let mut ice_servers = vec![RTCIceServer { + urls: DEFAULT_ICE_SERVERS.iter().map(|s| s.to_string()).collect(), + ..Default::default() + }]; + if start_local_offer { + // only offer needs TURN server + let relay_server = config::Config::get_option(config::keys::OPTION_RELAY_SERVER); + if let Some(turn_server) = Self::get_turn_server_from_url(&relay_server) { + ice_servers.push(turn_server); + } + } // Prepare the configuration let config = RTCConfiguration { - ice_servers: vec![RTCIceServer { - urls: vec![DEFAULT_ICE_SERVER.to_string()], - ..Default::default() - }], + ice_servers, + ice_transport_policy: if force_relay { + RTCIceTransportPolicy::Relay + } else { + RTCIceTransportPolicy::All + }, ..Default::default() }; - let start_local_offer = remote_offer.is_empty(); let (notify_tx, notify_rx) = watch::channel(false); // Create a new RTCPeerConnection let pc = Arc::new(api.new_peer_connection(config).await?); @@ -409,6 +461,54 @@ mod tests { use crate::webrtc::WebRTCStream; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; + #[test] + fn test_webrtc_turn_url() { + assert_eq!( + WebRTCStream::get_turn_server_from_url("turn://example.com:3478") + .unwrap_or_default() + .urls[0], + "turn:example.com:3478" + ); + + assert_eq!( + WebRTCStream::get_turn_server_from_url("turn://example.com") + .unwrap_or_default() + .urls[0], + "turn:example.com:3478" + ); + + assert_eq!( + WebRTCStream::get_turn_server_from_url("turn://123@example.com") + .unwrap_or_default() + .username, + "123" + ); + + assert_eq!( + WebRTCStream::get_turn_server_from_url("turn://123@example.com") + .unwrap_or_default() + .credential, + "" + ); + + assert_eq!( + WebRTCStream::get_turn_server_from_url("turn://123:321@example.com") + .unwrap_or_default() + .credential, + "321" + ); + + assert_eq!( + WebRTCStream::get_turn_server_from_url("stun://example.com:3478"), + None + ); + + assert_eq!( + WebRTCStream::get_turn_server_from_url("http://123:123@example.com:3478"), + None + ); + } + #[test] fn test_webrtc_session_key() { let mut sdp_str = "".to_owned(); @@ -554,18 +654,18 @@ IHR5cCBzcmZseCByYWRkciAwLjAuMC4wIHJwb3J0IDY0MDA4XHJcbmE9ZW5kLW9mLWNhbmRpZGF0ZXNc async fn test_webrtc_new_stream() { let mut endpoint = "webrtc://sdfsdf".to_owned(); assert!( - WebRTCStream::new(&endpoint, 10000).await.is_err(), + WebRTCStream::new(&endpoint, false, 10000).await.is_err(), "invalid webrtc endpoint should error" ); endpoint = "wss://sdfsdf".to_owned(); assert!( - WebRTCStream::new(&endpoint, 10000).await.is_err(), + WebRTCStream::new(&endpoint, false, 10000).await.is_err(), "invalid webrtc endpoint should error" ); assert!( - WebRTCStream::new("", 10000).await.is_ok(), + WebRTCStream::new("", false, 10000).await.is_ok(), "local webrtc endpoint should ok" ); @@ -580,7 +680,7 @@ LjIgNjQwMDcgdHlwIGhvc3RcclxuYT1jYW5kaWRhdGU6MTg2MTA0NTE5MCAxIHVkcCAxNjk0NDk4ODE1 ggcmFkZHIgMC4wLjAuMCBycG9ydCA2NDAwOFxyXG5hPWNhbmRpZGF0ZToxODYxMDQ1MTkwIDIgdWRwIDE2OTQ0OTg4MTUgMTQuMjEyLjY4LjEyIDI3MDA0\ IHR5cCBzcmZseCByYWRkciAwLjAuMC4wIHJwb3J0IDY0MDA4XHJcbmE9ZW5kLW9mLWNhbmRpZGF0ZXNcclxuIn0=".to_owned(); assert!( - WebRTCStream::new(&endpoint, 10000).await.is_err(), + WebRTCStream::new(&endpoint, false, 10000).await.is_err(), "connect to an 'answer' webrtc endpoint should error" ); } From 7cb29b1117d8ee942e48ce305e952680e8ed3d7e Mon Sep 17 00:00:00 2001 From: lc Date: Sun, 16 Nov 2025 18:43:31 +0800 Subject: [PATCH 18/24] add ice-servers config --- src/config.rs | 2 ++ src/socket_client.rs | 4 +-- src/webrtc.rs | 82 ++++++++++++++++++++++++++++++-------------- 3 files changed, 61 insertions(+), 27 deletions(-) diff --git a/src/config.rs b/src/config.rs index 516674910..212a22f1e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2550,6 +2550,7 @@ pub mod keys { pub const OPTION_TRACKPAD_SPEED: &str = "trackpad-speed"; pub const OPTION_REGISTER_DEVICE: &str = "register-device"; pub const OPTION_RELAY_SERVER: &str = "relay-server"; + pub const OPTION_ICE_SERVERS: &str = "ice-servers"; pub const OPTION_DISABLE_UDP: &str = "disable-udp"; pub const OPTION_ALLOW_INSECURE_TLS_FALLBACK: &str = "allow-insecure-tls-fallback"; pub const OPTION_SHOW_VIRTUAL_MOUSE: &str = "show-virtual-mouse"; @@ -2746,6 +2747,7 @@ pub mod keys { OPTION_ENABLE_ANDROID_SOFTWARE_ENCODING_HALF_SCALE, OPTION_ENABLE_TRUSTED_DEVICES, OPTION_RELAY_SERVER, + OPTION_ICE_SERVERS, OPTION_DISABLE_UDP, OPTION_ALLOW_INSECURE_TLS_FALLBACK, ]; diff --git a/src/socket_client.rs b/src/socket_client.rs index 0e898b88f..9178b74b5 100644 --- a/src/socket_client.rs +++ b/src/socket_client.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "webrtc")] +use crate::webrtc::{self, is_webrtc_endpoint}; use crate::{ config::{Config, NetworkType}, tcp::FramedStream, @@ -5,8 +7,6 @@ use crate::{ websocket::{self, check_ws, is_ws_endpoint}, ResultType, Stream, }; -#[cfg(feature = "webrtc")] -use crate::webrtc::{self, is_webrtc_endpoint}; use anyhow::Context; use std::{net::SocketAddr, sync::Arc}; use tokio::net::{ToSocketAddrs, UdpSocket}; diff --git a/src/webrtc.rs b/src/webrtc.rs index 7cdfc25a2..c0b5be943 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -138,14 +138,15 @@ impl WebRTCStream { } #[inline] - fn get_turn_server_from_url(url: &str) -> Option { + fn get_ice_server_from_url(url: &str) -> Option { // standard url format with turn scheme: turn://user:pass@host:port match Url::parse(url) { Ok(u) => { - if u.scheme() == "turn" { + if u.scheme() == "turn" || u.scheme() == "stun" { Some(RTCIceServer { urls: vec![format!( - "turn:{}:{}", + "{}:{}:{}", + u.scheme(), u.host_str().unwrap_or_default(), u.port().unwrap_or(3478) )], @@ -161,6 +162,24 @@ impl WebRTCStream { } } + #[inline] + fn get_ice_servers() -> Vec { + let mut ice_servers = Vec::new(); + let cfg = config::Config::get_option(config::keys::OPTION_ICE_SERVERS); + for url in cfg.split(',').map(str::trim) { + if let Some(ice_server) = Self::get_ice_server_from_url(url) { + ice_servers.push(ice_server); + } + } + if ice_servers.is_empty() { + ice_servers.push(RTCIceServer { + urls: DEFAULT_ICE_SERVERS.iter().map(|s| s.to_string()).collect(), + ..Default::default() + }); + } + ice_servers + } + pub async fn new( remote_endpoint: &str, force_relay: bool, @@ -191,21 +210,10 @@ impl WebRTCStream { // Create the API object let api = APIBuilder::new().with_setting_engine(s).build(); - let mut ice_servers = vec![RTCIceServer { - urls: DEFAULT_ICE_SERVERS.iter().map(|s| s.to_string()).collect(), - ..Default::default() - }]; - if start_local_offer { - // only offer needs TURN server - let relay_server = config::Config::get_option(config::keys::OPTION_RELAY_SERVER); - if let Some(turn_server) = Self::get_turn_server_from_url(&relay_server) { - ice_servers.push(turn_server); - } - } - // Prepare the configuration + // Prepare the configuration, get ICE servers from config let config = RTCConfiguration { - ice_servers, + ice_servers: Self::get_ice_servers(), ice_transport_policy: if force_relay { RTCIceTransportPolicy::Relay } else { @@ -458,55 +466,79 @@ pub fn is_webrtc_endpoint(endpoint: &str) -> bool { #[cfg(test)] mod tests { + use crate::config; + use crate::webrtc::DEFAULT_ICE_SERVERS; use crate::webrtc::WebRTCStream; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; #[test] - fn test_webrtc_turn_url() { + fn test_webrtc_ice_url() { assert_eq!( - WebRTCStream::get_turn_server_from_url("turn://example.com:3478") + WebRTCStream::get_ice_server_from_url("turn://example.com:3478") .unwrap_or_default() .urls[0], "turn:example.com:3478" ); assert_eq!( - WebRTCStream::get_turn_server_from_url("turn://example.com") + WebRTCStream::get_ice_server_from_url("turn://example.com") .unwrap_or_default() .urls[0], "turn:example.com:3478" ); assert_eq!( - WebRTCStream::get_turn_server_from_url("turn://123@example.com") + WebRTCStream::get_ice_server_from_url("turn://123@example.com") .unwrap_or_default() .username, "123" ); assert_eq!( - WebRTCStream::get_turn_server_from_url("turn://123@example.com") + WebRTCStream::get_ice_server_from_url("turn://123@example.com") .unwrap_or_default() .credential, "" ); assert_eq!( - WebRTCStream::get_turn_server_from_url("turn://123:321@example.com") + WebRTCStream::get_ice_server_from_url("turn://123:321@example.com") .unwrap_or_default() .credential, "321" ); assert_eq!( - WebRTCStream::get_turn_server_from_url("stun://example.com:3478"), - None + WebRTCStream::get_ice_server_from_url("stun://example.com:3478") + .unwrap_or_default() + .urls[0], + "stun:example.com:3478" ); assert_eq!( - WebRTCStream::get_turn_server_from_url("http://123:123@example.com:3478"), + WebRTCStream::get_ice_server_from_url("http://123:123@example.com:3478"), None ); + + config::Config::set_option("ice-servers".to_string(), "".to_string()); + assert_eq!( + WebRTCStream::get_ice_servers()[0].urls[0], + DEFAULT_ICE_SERVERS[0].to_string() + ); + + config::Config::set_option("ice-servers".to_string(), ",stun://example.com,turn://example.com,sdf".to_string()); + assert_eq!( + WebRTCStream::get_ice_servers()[0].urls[0], + "stun:example.com:3478" + ); + assert_eq!( + WebRTCStream::get_ice_servers()[1].urls[0], + "turn:example.com:3478" + ); + assert_eq!( + WebRTCStream::get_ice_servers().len(), + 2 + ); } #[test] From 3282977e66fa0c0cfb8d59e3299df45ef413b10b Mon Sep 17 00:00:00 2001 From: lc Date: Mon, 17 Nov 2025 13:18:27 +0800 Subject: [PATCH 19/24] Apply suggestions from code review --- src/webrtc.rs | 45 +++++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/src/webrtc.rs b/src/webrtc.rs index c0b5be943..3c47798e6 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -142,7 +142,11 @@ impl WebRTCStream { // standard url format with turn scheme: turn://user:pass@host:port match Url::parse(url) { Ok(u) => { - if u.scheme() == "turn" || u.scheme() == "stun" { + if u.scheme() == "turn" + || u.scheme() == "turns" + || u.scheme() == "stun" + || u.scheme() == "stuns" + { Some(RTCIceServer { urls: vec![format!( "{}:{}:{}", @@ -166,16 +170,33 @@ impl WebRTCStream { fn get_ice_servers() -> Vec { let mut ice_servers = Vec::new(); let cfg = config::Config::get_option(config::keys::OPTION_ICE_SERVERS); + + let mut has_stun = false; + for url in cfg.split(',').map(str::trim) { if let Some(ice_server) = Self::get_ice_server_from_url(url) { + // Detect STUN in user config + if ice_server + .urls + .iter() + .any(|u| u.starts_with("stun:") || u.starts_with("stuns:")) + { + has_stun = true; + } + ice_servers.push(ice_server); } } - if ice_servers.is_empty() { - ice_servers.push(RTCIceServer { - urls: DEFAULT_ICE_SERVERS.iter().map(|s| s.to_string()).collect(), - ..Default::default() - }); + + // If there is no STUN (either TURN-only or empty config) → prepend defaults + if !has_stun { + ice_servers.insert( + 0, + RTCIceServer { + urls: DEFAULT_ICE_SERVERS.iter().map(|s| s.to_string()).collect(), + ..Default::default() + }, + ); } ice_servers } @@ -467,8 +488,8 @@ pub fn is_webrtc_endpoint(endpoint: &str) -> bool { #[cfg(test)] mod tests { use crate::config; - use crate::webrtc::DEFAULT_ICE_SERVERS; use crate::webrtc::WebRTCStream; + use crate::webrtc::DEFAULT_ICE_SERVERS; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; #[test] @@ -526,7 +547,10 @@ mod tests { DEFAULT_ICE_SERVERS[0].to_string() ); - config::Config::set_option("ice-servers".to_string(), ",stun://example.com,turn://example.com,sdf".to_string()); + config::Config::set_option( + "ice-servers".to_string(), + ",stun://example.com,turn://example.com,sdf".to_string(), + ); assert_eq!( WebRTCStream::get_ice_servers()[0].urls[0], "stun:example.com:3478" @@ -535,10 +559,7 @@ mod tests { WebRTCStream::get_ice_servers()[1].urls[0], "turn:example.com:3478" ); - assert_eq!( - WebRTCStream::get_ice_servers().len(), - 2 - ); + assert_eq!(WebRTCStream::get_ice_servers().len(), 2); } #[test] From 13ef3411d9cafaed184a6931787fbe658d2b1803 Mon Sep 17 00:00:00 2001 From: lichon Date: Mon, 17 Nov 2025 14:45:51 +0800 Subject: [PATCH 20/24] Update examples/webrtc.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- examples/webrtc.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/examples/webrtc.rs b/examples/webrtc.rs index 3d5da7360..177166347 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -17,16 +17,19 @@ use tokio::time::Duration; #[cfg(feature = "webrtc")] use webrtc::peer_connection::math_rand_alpha; +#[cfg(not(feature = "webrtc"))] +#[tokio::main] +async fn main() -> Result<()> { + println!( + "The webrtc feature is not enabled. \ + Please enable the webrtc feature to run this example." + ); + Ok(()) +} + +#[cfg(feature = "webrtc")] #[tokio::main] async fn main() -> Result<()> { - #[cfg(not(feature = "webrtc"))] - if true { - println!( - "The webrtc feature is not enabled. \ - Please enable the webrtc feature to run this example." - ); - return Ok(()); - } let app = Command::new("webrtc-stream") .about("An example of webrtc stream using hbb_common and webrtc-rs") .arg( From 2dc15df250a789f5026da146a9f21235be646480 Mon Sep 17 00:00:00 2001 From: lichon Date: Mon, 17 Nov 2025 14:55:52 +0800 Subject: [PATCH 21/24] Update src/webrtc.rs webrtc session clean fallback Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/webrtc.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/webrtc.rs b/src/webrtc.rs index 3c47798e6..04a68ccd3 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -308,7 +308,23 @@ impl WebRTCStream { sessions_lock.remove(&k); log::debug!("WebRTC session removed key: {}", k); } - Err(_e) => {} + Err(e) => { + log::error!("Failed to extract key for peer during session cleanup: {:?}", e); + // Fallback: try to remove any session associated with this peer connection + let keys_to_remove: Vec = sessions_lock.iter() + .filter_map(|(key, session)| { + if Arc::ptr_eq(&session.peer_connection, &pc_for_close2) { + Some(key.clone()) + } else { + None + } + }) + .collect(); + for k in keys_to_remove { + sessions_lock.remove(&k); + log::debug!("WebRTC session removed by fallback key: {}", k); + } + } } } _ => {} From 652f68fd54c9015d626888b732505414d36acfa9 Mon Sep 17 00:00:00 2001 From: lichon Date: Mon, 17 Nov 2025 15:01:47 +0800 Subject: [PATCH 22/24] Update examples/webrtc_dummy.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- examples/webrtc_dummy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/webrtc_dummy.rs b/examples/webrtc_dummy.rs index c826d64a7..a4c5b1e68 100644 --- a/examples/webrtc_dummy.rs +++ b/examples/webrtc_dummy.rs @@ -17,7 +17,7 @@ impl Clone for WebRTCStream { } impl WebRTCStream { - pub async fn new(_: &str, _: u64) -> ResultType { + pub async fn new(_: &str, _: bool, _: u64) -> ResultType { Ok(Self {}) } From e4224a19bc14f6f8fdd4d665491d9c1cfd974354 Mon Sep 17 00:00:00 2001 From: lc Date: Mon, 17 Nov 2025 15:19:20 +0800 Subject: [PATCH 23/24] Apply suggestions from code review --- src/webrtc.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/webrtc.rs b/src/webrtc.rs index 04a68ccd3..8f3c410cc 100644 --- a/src/webrtc.rs +++ b/src/webrtc.rs @@ -309,11 +309,15 @@ impl WebRTCStream { log::debug!("WebRTC session removed key: {}", k); } Err(e) => { - log::error!("Failed to extract key for peer during session cleanup: {:?}", e); + log::error!( + "Failed to extract key for peer during session cleanup: {:?}", + e + ); // Fallback: try to remove any session associated with this peer connection - let keys_to_remove: Vec = sessions_lock.iter() + let keys_to_remove: Vec = sessions_lock + .iter() .filter_map(|(key, session)| { - if Arc::ptr_eq(&session.peer_connection, &pc_for_close2) { + if Arc::ptr_eq(&session.pc, &pc_for_close2) { Some(key.clone()) } else { None @@ -357,13 +361,19 @@ impl WebRTCStream { log::debug!("Start webrtc with remote key: {}", key); } + let mut final_lock = SESSIONS.lock().await; + if let Some(session) = final_lock.get(&key) { + pc.close().await.ok(); + return Ok(session.clone()); + } + let webrtc_stream = Self { pc, stream, state_notify: notify_rx, send_timeout: ms_timeout, }; - SESSIONS.lock().await.insert(key, webrtc_stream.clone()); + final_lock.insert(key, webrtc_stream.clone()); Ok(webrtc_stream) } @@ -576,6 +586,10 @@ mod tests { "turn:example.com:3478" ); assert_eq!(WebRTCStream::get_ice_servers().len(), 2); + config::Config::set_option( + "ice-servers".to_string(), + "".to_string(), + ); } #[test] From 4e167838243997064e62fac038f42974275173c3 Mon Sep 17 00:00:00 2001 From: lc Date: Thu, 20 Nov 2025 14:58:10 +0800 Subject: [PATCH 24/24] remove dummy webrtc stream, add api to get webrtc stream --- examples/webrtc.rs | 12 +++-------- examples/webrtc_dummy.rs | 45 ---------------------------------------- src/stream.rs | 9 ++++++++ 3 files changed, 12 insertions(+), 54 deletions(-) delete mode 100644 examples/webrtc_dummy.rs diff --git a/examples/webrtc.rs b/examples/webrtc.rs index 177166347..2c993caa1 100644 --- a/examples/webrtc.rs +++ b/examples/webrtc.rs @@ -2,21 +2,13 @@ extern crate hbb_common; #[cfg(feature = "webrtc")] use hbb_common::webrtc::WebRTCStream; -#[cfg(not(feature = "webrtc"))] -mod webrtc_dummy; -#[cfg(not(feature = "webrtc"))] -use crate::webrtc_dummy::WebRTCStream; use std::io::Write; - use anyhow::Result; use bytes::Bytes; use clap::{Arg, Command}; use tokio::time::Duration; -#[cfg(feature = "webrtc")] -use webrtc::peer_connection::math_rand_alpha; - #[cfg(not(feature = "webrtc"))] #[tokio::main] async fn main() -> Result<()> { @@ -121,6 +113,7 @@ async fn main() -> Result<()> { } // read_loop shows how to read from the datachannel directly +#[cfg(feature = "webrtc")] async fn read_loop(mut stream: WebRTCStream) -> Result<()> { loop { let Some(res) = stream.next().await else { @@ -140,6 +133,7 @@ async fn read_loop(mut stream: WebRTCStream) -> Result<()> { } // write_loop shows how to write to the webrtc stream directly +#[cfg(feature = "webrtc")] async fn write_loop(mut stream: WebRTCStream) -> Result<()> { let mut result = Result::<()>::Ok(()); while result.is_ok() { @@ -148,7 +142,7 @@ async fn write_loop(mut stream: WebRTCStream) -> Result<()> { tokio::select! { _ = timeout.as_mut() =>{ - let message = math_rand_alpha(15); + let message = webrtc::peer_connection::math_rand_alpha(15); result = stream.send_bytes(Bytes::from(message.clone())).await; println!("Sent '{message}' {}", result.is_ok()); } diff --git a/examples/webrtc_dummy.rs b/examples/webrtc_dummy.rs deleted file mode 100644 index a4c5b1e68..000000000 --- a/examples/webrtc_dummy.rs +++ /dev/null @@ -1,45 +0,0 @@ -use std::io::Error; - -use bytes::BytesMut; - -use hbb_common::ResultType; - -/// Dummy implementation of WebRTCStream used when the `webrtc` feature is disabled. -/// This struct allows the code to compile and run without actual WebRTC functionality. -pub struct WebRTCStream { - // mock struct -} - -impl Clone for WebRTCStream { - fn clone(&self) -> Self { - WebRTCStream {} - } -} - -impl WebRTCStream { - pub async fn new(_: &str, _: bool, _: u64) -> ResultType { - Ok(Self {}) - } - - #[inline] - pub async fn get_local_endpoint(&self) -> ResultType { - Ok(String::new()) - } - - #[inline] - pub async fn set_remote_endpoint(&self, _: &str) -> ResultType<()> { - Ok(()) - } - - #[inline] - pub async fn send_bytes(&mut self, _: bytes::Bytes) -> ResultType<()> { - Ok(()) - } - - #[inline] - pub async fn next(&mut self) -> Option> { - None - } -} - - diff --git a/src/stream.rs b/src/stream.rs index 6a5ac858d..a8e6b6c2d 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -137,4 +137,13 @@ impl Stream { pub fn from(stream: TcpStream, stream_addr: SocketAddr) -> Self { Self::Tcp(tcp::FramedStream::from(stream, stream_addr)) } + + #[inline] + #[cfg(feature = "webrtc")] + pub fn get_webrtc_stream(&self) -> Option { + match self { + Self::WebRTC(s) => Some(s.clone()), + _ => None, + } + } }