websocket

Signed-off-by: 21pages <sunboeasy@gmail.com>
This commit is contained in:
21pages
2025-04-30 09:43:44 +08:00
parent d64954ae22
commit 585bd1f152
4 changed files with 292 additions and 38 deletions

View File

@@ -1,5 +1,9 @@
use crate::{
config::Socks5Server, protobuf::Message, sodiumoxide::crypto::secretbox::Key, tcp::Encrypt,
config::{use_ws, Config, Socks5Server, RELAY_PORT, RENDEZVOUS_PORT},
protobuf::Message,
socket_client::split_host_port,
sodiumoxide::crypto::secretbox::Key,
tcp::Encrypt,
ResultType,
};
use bytes::{Bytes, BytesMut};
@@ -33,7 +37,6 @@ impl WsFramedStream {
let url_str = url.as_ref();
// to-do: websocket proxy.
log::info!("{:?}", url_str);
let request = url_str
.into_client_request()
@@ -44,6 +47,10 @@ impl WsFramedStream {
let addr = match stream.get_ref() {
MaybeTlsStream::Plain(tcp) => tcp.peer_addr()?,
#[cfg(any(target_os = "macos", target_os = "windows"))]
MaybeTlsStream::NativeTls(tls) => tls.get_ref().get_ref().get_ref().peer_addr()?,
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
MaybeTlsStream::Rustls(tls) => tls.get_ref().0.peer_addr()?,
_ => return Err(Error::new(ErrorKind::Other, "Unsupported stream type").into()),
};
@@ -126,14 +133,11 @@ impl WsFramedStream {
#[inline]
pub async fn next(&mut self) -> Option<Result<BytesMut, Error>> {
log::debug!("Waiting for next message");
while let Some(msg) = self.stream.next().await {
log::debug!("receive msg: {:?}", msg);
let msg = match msg {
Ok(msg) => msg,
Err(e) => {
log::debug!("{}", e);
log::error!("{}", e);
return Some(Err(Error::new(
ErrorKind::Other,
format!("WebSocket protocol error: {}", e),
@@ -141,10 +145,8 @@ impl WsFramedStream {
}
};
log::debug!("Received message type: {}", msg.to_string());
match msg {
WsMessage::Binary(data) => {
log::info!("Received binary data ({} bytes)", data.len());
let mut bytes = BytesMut::from(&data[..]);
if let Some(key) = self.encrypt.as_mut() {
if let Err(err) = key.dec(&mut bytes) {
@@ -154,16 +156,13 @@ impl WsFramedStream {
return Some(Ok(bytes));
}
WsMessage::Text(text) => {
log::debug!("Received text message, converting to binary");
let bytes = BytesMut::from(text.as_bytes());
return Some(Ok(bytes));
}
WsMessage::Close(_) => {
log::info!("Received close frame");
return None;
}
_ => {
log::debug!("Unhandled message type: {}", msg.to_string());
continue;
}
}
@@ -180,3 +179,219 @@ impl WsFramedStream {
}
}
}
pub fn is_ws_endpoint(endpoint: &str) -> bool {
endpoint.starts_with("ws://") || endpoint.starts_with("wss://")
}
/**
* Core function to convert an endpoint to WebSocket format
*
* Converts between different address formats:
* 1. IPv4 address with/without port -> ws://ipv4:port
* 2. IPv6 address with/without port -> ws://[ipv6]:port
* 3. Domain with/without port -> ws(s)://domain/ws/path
*
* @param endpoint The endpoint to convert
* @return The converted WebSocket endpoint
*/
pub fn check_ws(endpoint: &str) -> String {
if !use_ws() {
return endpoint.to_string();
}
if endpoint.is_empty() {
return endpoint.to_string();
}
if is_ws_endpoint(endpoint) {
return endpoint.to_string();
}
let Some((endpoint_host, endpoint_port)) = split_host_port(endpoint) else {
debug_assert!(false, "endpoint doesn't have port");
return endpoint.to_string();
};
let custom_rendezvous_server = Config::get_rendezvous_server();
let relay_server = Config::get_option("relay-server");
let rendezvous_port = split_host_port(&custom_rendezvous_server)
.map(|(_, p)| p)
.unwrap_or(RENDEZVOUS_PORT);
let relay_port = split_host_port(&relay_server)
.map(|(_, p)| p)
.unwrap_or(RELAY_PORT);
let (relay, dst_port) = if endpoint_port == rendezvous_port {
// rendezvous
(false, endpoint_port + 2)
} else if endpoint_port == rendezvous_port - 1 {
// online
(false, endpoint_port + 3)
} else if endpoint_port == relay_port || endpoint_port == rendezvous_port + 1 {
// relay
// https://github.com/rustdesk/rustdesk/blob/6ffbcd1375771f2482ec4810680623a269be70f1/src/rendezvous_mediator.rs#L615
// https://github.com/rustdesk/rustdesk-server/blob/235a3c326ceb665e941edb50ab79faa1208f7507/src/relay_server.rs#L83, based on relay port.
(true, endpoint_port + 2)
} else {
// fallback relay
// for controlling side, relay server is passed from the controlled side, not related to local config.
(true, endpoint_port + 2)
};
let (address, is_domain) = if crate::is_ip_str(endpoint) {
(format!("{}:{}", endpoint_host, dst_port), false)
} else {
let domain_path = if relay { "/ws/relay" } else { "/ws/id" };
(format!("{}{}", endpoint_host, domain_path), true)
};
let protocol = if is_domain {
let api_server = Config::get_option("api-server");
if api_server.starts_with("https") {
"wss"
} else {
"ws"
}
} else {
"ws"
};
format!("{}://{}", protocol, address)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{keys, Config};
#[test]
fn test_check_ws() {
// enable websocket
Config::set_option(keys::OPTION_ALLOW_WEBSOCKET.to_string(), "Y".to_string());
// not set custom-rendezvous-server
Config::set_option("custom-rendezvous-server".to_string(), "".to_string());
Config::set_option("relay-server".to_string(), "".to_string());
Config::set_option("api-server".to_string(), "".to_string());
assert_eq!(check_ws("127.0.0.1:21115"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21116"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21117"), "ws://127.0.0.1:21119");
assert_eq!(check_ws("rustdesk.com:21115"), "ws://rustdesk.com/ws/id");
assert_eq!(check_ws("rustdesk.com:21116"), "ws://rustdesk.com/ws/id");
assert_eq!(check_ws("rustdesk.com:21117"), "ws://rustdesk.com/ws/relay");
// set relay-server without port
Config::set_option("relay-server".to_string(), "127.0.0.1".to_string());
Config::set_option(
"api-server".to_string(),
"https://api.rustdesk.com".to_string(),
);
assert_eq!(
check_ws("[0:0:0:0:0:0:0:1]:21115"),
"ws://[0:0:0:0:0:0:0:1]:21118"
);
assert_eq!(
check_ws("[0:0:0:0:0:0:0:1]:21116"),
"ws://[0:0:0:0:0:0:0:1]:21118"
);
assert_eq!(
check_ws("[0:0:0:0:0:0:0:1]:21117"),
"ws://[0:0:0:0:0:0:0:1]:21119"
);
assert_eq!(check_ws("rustdesk.com:21115"), "wss://rustdesk.com/ws/id");
assert_eq!(check_ws("rustdesk.com:21116"), "wss://rustdesk.com/ws/id");
assert_eq!(
check_ws("rustdesk.com:21117"),
"wss://rustdesk.com/ws/relay"
);
// set relay-server with default port
Config::set_option("relay-server".to_string(), "127.0.0.1:21117".to_string());
assert_eq!(check_ws("127.0.0.1:21115"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21116"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21117"), "ws://127.0.0.1:21119");
// set relay-server with custom port
Config::set_option("relay-server".to_string(), "127.0.0.1:34567".to_string());
assert_eq!(check_ws("rustdesk.com:21115"), "wss://rustdesk.com/ws/id");
assert_eq!(check_ws("rustdesk.com:21116"), "wss://rustdesk.com/ws/id");
assert_eq!(
check_ws("rustdesk.com:34567"),
"wss://rustdesk.com/ws/relay"
);
// set custom-rendezvous-server without port
Config::set_option(
"custom-rendezvous-server".to_string(),
"127.0.0.1".to_string(),
);
Config::set_option("relay-server".to_string(), "".to_string());
Config::set_option("api-server".to_string(), "".to_string());
assert_eq!(check_ws("127.0.0.1:21115"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21116"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21117"), "ws://127.0.0.1:21119");
// set relay-server without port
Config::set_option("relay-server".to_string(), "127.0.0.1".to_string());
assert_eq!(check_ws("127.0.0.1:21115"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21116"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21117"), "ws://127.0.0.1:21119");
// set relay-server with default port
Config::set_option("relay-server".to_string(), "127.0.0.1:21117".to_string());
assert_eq!(check_ws("127.0.0.1:21115"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21116"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21117"), "ws://127.0.0.1:21119");
// set relay-server with custom port
Config::set_option("relay-server".to_string(), "127.0.0.1:34567".to_string());
assert_eq!(check_ws("127.0.0.1:21115"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21116"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:34567"), "ws://127.0.0.1:34569");
// set custom-rendezvous-server without default port
Config::set_option(
"custom-rendezvous-server".to_string(),
"127.0.0.1".to_string(),
);
Config::set_option("relay-server".to_string(), "".to_string());
Config::set_option("api-server".to_string(), "".to_string());
assert_eq!(check_ws("127.0.0.1:21115"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21116"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21117"), "ws://127.0.0.1:21119");
// set relay-server without port
Config::set_option("relay-server".to_string(), "127.0.0.1".to_string());
assert_eq!(check_ws("127.0.0.1:21115"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21116"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21117"), "ws://127.0.0.1:21119");
// set relay-server with default port
Config::set_option("relay-server".to_string(), "127.0.0.1:21117".to_string());
assert_eq!(check_ws("127.0.0.1:21115"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21116"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21117"), "ws://127.0.0.1:21119");
// set relay-server with custom port
Config::set_option("relay-server".to_string(), "127.0.0.1:34567".to_string());
assert_eq!(check_ws("127.0.0.1:21115"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:21116"), "ws://127.0.0.1:21118");
assert_eq!(check_ws("127.0.0.1:34567"), "ws://127.0.0.1:34569");
// set custom-rendezvous-server with custom port
Config::set_option(
"custom-rendezvous-server".to_string(),
"127.0.0.1:23456".to_string(),
);
Config::set_option("relay-server".to_string(), "".to_string());
Config::set_option("api-server".to_string(), "".to_string());
assert_eq!(check_ws("127.0.0.1:23455"), "ws://127.0.0.1:23458");
assert_eq!(check_ws("127.0.0.1:23456"), "ws://127.0.0.1:23458");
assert_eq!(check_ws("127.0.0.1:23457"), "ws://127.0.0.1:23459");
// set relay-server without port
Config::set_option("relay-server".to_string(), "127.0.0.1".to_string());
assert_eq!(check_ws("127.0.0.1:23455"), "ws://127.0.0.1:23458");
assert_eq!(check_ws("127.0.0.1:23456"), "ws://127.0.0.1:23458");
assert_eq!(check_ws("127.0.0.1:21117"), "ws://127.0.0.1:21119");
// set relay-server with default port
Config::set_option("relay-server".to_string(), "127.0.0.1:21117".to_string());
assert_eq!(check_ws("127.0.0.1:23455"), "ws://127.0.0.1:23458");
assert_eq!(check_ws("127.0.0.1:23456"), "ws://127.0.0.1:23458");
assert_eq!(check_ws("127.0.0.1:21117"), "ws://127.0.0.1:21119");
// set relay-server with custom port
Config::set_option("relay-server".to_string(), "127.0.0.1:34567".to_string());
assert_eq!(check_ws("127.0.0.1:23455"), "ws://127.0.0.1:23458");
assert_eq!(check_ws("127.0.0.1:23456"), "ws://127.0.0.1:23458");
assert_eq!(check_ws("127.0.0.1:34567"), "ws://127.0.0.1:34569");
}
}