From 90e64297c05994792e6feaa14538970fa12d1550 Mon Sep 17 00:00:00 2001 From: timvisee Date: Mon, 15 Nov 2021 20:18:52 +0100 Subject: [PATCH] Add support for packet compression --- Cargo.lock | 1 + Cargo.toml | 1 + src/action/start.rs | 9 ++ src/lobby.rs | 199 ++++++++++++++++++++++++++++++++------------ src/monitor.rs | 54 +++++++----- src/proto.rs | 153 ++++++++++++++++++++++++++++++---- src/status.rs | 12 +-- 7 files changed, 334 insertions(+), 95 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e7c95ee..1c5c858 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -628,6 +628,7 @@ dependencies = [ "colored", "derive_builder", "dotenv", + "flate2", "futures", "libc", "log", diff --git a/Cargo.toml b/Cargo.toml index 557f6fb..bf3d249 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ clap = { version = "3.0.0-beta.5", default-features = false, features = [ "std", colored = "2.0" derive_builder = "0.10" dotenv = "0.15" +flate2 = { version = "1.0", default-features = false, features = ["default"] } futures = { version = "0.3", default-features = false } log = "0.4" minecraft-protocol = { git = "https://github.com/timvisee/rust-minecraft-protocol", branch = "lazymc-v1_17_1" } diff --git a/src/action/start.rs b/src/action/start.rs index 36adafb..77205f0 100644 --- a/src/action/start.rs +++ b/src/action/start.rs @@ -5,6 +5,7 @@ use clap::ArgMatches; use crate::config::{self, Config}; use crate::mc::server_properties; +use crate::proto; use crate::service; /// RCON randomized password length. @@ -141,6 +142,14 @@ fn rewrite_server_properties(config: &Config) { changes.extend([("prevent-proxy-connections", "false".into())]); } + // Update network compression threshold for lobby mode + if config.join.methods.contains(&config::Method::Lobby) { + changes.extend([( + "network-compression-threshold", + proto::COMPRESSION_THRESHOLD.to_string(), + )]); + } + // Add RCON configuration #[cfg(feature = "rcon")] if config.rcon.enabled { diff --git a/src/lobby.rs b/src/lobby.rs index 1fa0dbf..cae8674 100644 --- a/src/lobby.rs +++ b/src/lobby.rs @@ -10,7 +10,7 @@ use minecraft_protocol::data::chat::{Message, Payload}; use minecraft_protocol::decoder::Decoder; use minecraft_protocol::encoder::Encoder; use minecraft_protocol::version::v1_14_4::handshake::Handshake; -use minecraft_protocol::version::v1_14_4::login::{LoginStart, LoginSuccess}; +use minecraft_protocol::version::v1_14_4::login::{LoginStart, LoginSuccess, SetCompression}; use minecraft_protocol::version::v1_17_1::game::{ ClientBoundKeepAlive, JoinGame, NamedSoundEffect, PlayerPositionAndLook, PluginMessage, Respawn, SetTitleSubtitle, SetTitleText, SetTitleTimes, TimeUpdate, @@ -91,7 +91,8 @@ pub async fn serve( loop { // Read packet from stream - let (packet, _raw) = match proto::read_packet(&mut inbound_buf, &mut reader).await { + let (packet, _raw) = match proto::read_packet(&client, &mut inbound_buf, &mut reader).await + { Ok(Some(packet)) => packet, Ok(None) => break, Err(_) => { @@ -112,27 +113,36 @@ pub async fn serve( debug!(target: "lazymc::lobby", "Login on lobby server (user: {})", login_start.name); + // Respond with set compression if compression is enabled based on threshold + if proto::COMPRESSION_THRESHOLD >= 0 { + trace!(target: "lazymc::lobby", "Enabling compression for lobby client because server has it enabled (threshold: {})", proto::COMPRESSION_THRESHOLD); + respond_set_compression(&client, &mut writer, proto::COMPRESSION_THRESHOLD).await?; + client.set_compression(proto::COMPRESSION_THRESHOLD); + } + // Respond with login success, switch to play state - respond_login_success(&mut writer, &login_start).await?; + respond_login_success(&client, &mut writer, &login_start).await?; client.set_state(ClientState::Play); trace!(target: "lazymc::lobby", "Client login success, sending required play packets for lobby world"); // Send packets to client required to get into workable play state for lobby world - send_lobby_play_packets(&mut writer, &server).await?; + send_lobby_play_packets(&client, &mut writer, &server).await?; // Wait for server to come online, then set up new connection to it - stage_wait(&server, &config, &mut writer).await?; - let (mut outbound, mut server_buf) = connect_to_server(client_info, &config).await?; + stage_wait(&client, &server, &config, &mut writer).await?; + let (server_client, mut outbound, mut server_buf) = + connect_to_server(client_info, &config).await?; // Grab join game packet from server - let join_game = wait_for_server_join_game(&mut outbound, &mut server_buf).await?; + let join_game = + wait_for_server_join_game(&server_client, &mut outbound, &mut server_buf).await?; // Reset lobby title - send_lobby_title(&mut writer, "").await?; + send_lobby_title(&client, &mut writer, "").await?; // Play ready sound if configured - play_lobby_ready_sound(&mut writer, &config).await?; + play_lobby_ready_sound(&client, &mut writer, &config).await?; // Wait a second because Notchian servers are slow // See: https://wiki.vg/Protocol#Login_Success @@ -140,7 +150,7 @@ pub async fn serve( time::sleep(SERVER_WARMUP).await; // Send respawn packet, initiates teleport to real server world - send_respawn_from_join(&mut writer, join_game).await?; + send_respawn_from_join(&client, &mut writer, join_game).await?; // Drain inbound connection so we don't confuse the server // TODO: can we void everything? we might need to forward everything to server except @@ -174,9 +184,28 @@ pub async fn serve( Ok(()) } +/// Respond to client with a set compression packet. +async fn respond_set_compression( + client: &Client, + writer: &mut WriteHalf<'_>, + threshold: i32, +) -> Result<(), ()> { + let packet = SetCompression { threshold }; + + let mut data = Vec::new(); + packet.encode(&mut data).map_err(|_| ())?; + + let response = + RawPacket::new(proto::packets::login::CLIENT_SET_COMPRESSION, data).encode(client)?; + writer.write_all(&response).await.map_err(|_| ())?; + + Ok(()) +} + /// Respond to client with login success packet // TODO: support online mode here async fn respond_login_success( + client: &Client, writer: &mut WriteHalf<'_>, login_start: &LoginStart, ) -> Result<(), ()> { @@ -191,14 +220,19 @@ async fn respond_login_success( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::login::CLIENT_LOGIN_SUCCESS, data).encode()?; + let response = + RawPacket::new(proto::packets::login::CLIENT_LOGIN_SUCCESS, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) } /// Play lobby ready sound effect if configured. -async fn play_lobby_ready_sound(writer: &mut WriteHalf<'_>, config: &Config) -> Result<(), ()> { +async fn play_lobby_ready_sound( + client: &Client, + writer: &mut WriteHalf<'_>, + config: &Config, +) -> Result<(), ()> { if let Some(sound_name) = config.join.lobby.ready_sound.as_ref() { // Must not be empty string if sound_name.trim().is_empty() { @@ -207,34 +241,42 @@ async fn play_lobby_ready_sound(writer: &mut WriteHalf<'_>, config: &Config) -> } // Play sound effect - send_lobby_player_pos(writer).await?; - send_lobby_sound_effect(writer, sound_name).await?; + send_lobby_player_pos(client, writer).await?; + send_lobby_sound_effect(client, writer, sound_name).await?; } Ok(()) } /// Send packets to client to get workable play state for lobby world. -async fn send_lobby_play_packets(writer: &mut WriteHalf<'_>, server: &Server) -> Result<(), ()> { +async fn send_lobby_play_packets( + client: &Client, + writer: &mut WriteHalf<'_>, + server: &Server, +) -> Result<(), ()> { // See: https://wiki.vg/Protocol_FAQ#What.27s_the_normal_login_sequence_for_a_client.3F // Send initial game join - send_lobby_join_game(writer, server).await?; + send_lobby_join_game(client, writer, server).await?; // Send server brand - send_lobby_brand(writer).await?; + send_lobby_brand(client, writer).await?; // Send spawn and player position, disables 'download terrain' screen - send_lobby_player_pos(writer).await?; + send_lobby_player_pos(client, writer).await?; // Notify client of world time, required once before keep-alive packets - send_lobby_time_update(writer).await?; + send_lobby_time_update(client, writer).await?; Ok(()) } /// Send initial join game packet to client for lobby. -async fn send_lobby_join_game(writer: &mut WriteHalf<'_>, server: &Server) -> Result<(), ()> { +async fn send_lobby_join_game( + client: &Client, + writer: &mut WriteHalf<'_>, + server: &Server, +) -> Result<(), ()> { // Send Minecrafts default states, slightly customised for lobby world let packet = { let status = server.status().await; @@ -271,14 +313,14 @@ async fn send_lobby_join_game(writer: &mut WriteHalf<'_>, server: &Server) -> Re let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_JOIN_GAME, data).encode()?; + let response = RawPacket::new(proto::packets::play::CLIENT_JOIN_GAME, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) } /// Send lobby brand to client. -async fn send_lobby_brand(writer: &mut WriteHalf<'_>) -> Result<(), ()> { +async fn send_lobby_brand(client: &Client, writer: &mut WriteHalf<'_>) -> Result<(), ()> { let packet = PluginMessage { channel: "minecraft:brand".into(), data: SERVER_BRAND.into(), @@ -287,14 +329,15 @@ async fn send_lobby_brand(writer: &mut WriteHalf<'_>) -> Result<(), ()> { let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_PLUGIN_MESSAGE, data).encode()?; + let response = + RawPacket::new(proto::packets::play::CLIENT_PLUGIN_MESSAGE, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) } /// Send lobby player position to client. -async fn send_lobby_player_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> { +async fn send_lobby_player_pos(client: &Client, writer: &mut WriteHalf<'_>) -> Result<(), ()> { // Send player location, disables download terrain screen let packet = PlayerPositionAndLook { x: 0.0, @@ -310,14 +353,15 @@ async fn send_lobby_player_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> { let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_PLAYER_POS_LOOK, data).encode()?; + let response = + RawPacket::new(proto::packets::play::CLIENT_PLAYER_POS_LOOK, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) } /// Send lobby time update to client. -async fn send_lobby_time_update(writer: &mut WriteHalf<'_>) -> Result<(), ()> { +async fn send_lobby_time_update(client: &Client, writer: &mut WriteHalf<'_>) -> Result<(), ()> { const MC_TIME_NOON: i64 = 6000; // Send time update, required once for keep-alive packets @@ -329,7 +373,7 @@ async fn send_lobby_time_update(writer: &mut WriteHalf<'_>) -> Result<(), ()> { let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_TIME_UPDATE, data).encode()?; + let response = RawPacket::new(proto::packets::play::CLIENT_TIME_UPDATE, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -338,7 +382,7 @@ async fn send_lobby_time_update(writer: &mut WriteHalf<'_>) -> Result<(), ()> { /// Send keep alive packet to client. /// /// Required periodically in play mode to prevent client timeout. -async fn send_keep_alive(writer: &mut WriteHalf<'_>) -> Result<(), ()> { +async fn send_keep_alive(client: &Client, writer: &mut WriteHalf<'_>) -> Result<(), ()> { let packet = ClientBoundKeepAlive { // Keep sending new IDs id: KEEP_ALIVE_ID.fetch_add(1, Ordering::Relaxed), @@ -347,7 +391,7 @@ async fn send_keep_alive(writer: &mut WriteHalf<'_>) -> Result<(), ()> { let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_KEEP_ALIVE, data).encode()?; + let response = RawPacket::new(proto::packets::play::CLIENT_KEEP_ALIVE, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; // TODO: verify we receive keep alive response with same ID from client @@ -360,7 +404,11 @@ async fn send_keep_alive(writer: &mut WriteHalf<'_>) -> Result<(), ()> { /// This will show the given text for two keep-alive periods. Use a newline for the subtitle. /// /// If an empty string is given, the title times will be reset to default. -async fn send_lobby_title(writer: &mut WriteHalf<'_>, text: &str) -> Result<(), ()> { +async fn send_lobby_title( + client: &Client, + writer: &mut WriteHalf<'_>, + text: &str, +) -> Result<(), ()> { // Grab title and subtitle bits let title = text.lines().next().unwrap_or(""); let subtitle = text.lines().skip(1).collect::>().join("\n"); @@ -373,7 +421,8 @@ async fn send_lobby_title(writer: &mut WriteHalf<'_>, text: &str) -> Result<(), let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TEXT, data).encode()?; + let response = + RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TEXT, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; // Set subtitle @@ -385,7 +434,7 @@ async fn send_lobby_title(writer: &mut WriteHalf<'_>, text: &str) -> Result<(), packet.encode(&mut data).map_err(|_| ())?; let response = - RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode()?; + RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; // Set title times @@ -407,14 +456,19 @@ async fn send_lobby_title(writer: &mut WriteHalf<'_>, text: &str) -> Result<(), let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TIMES, data).encode()?; + let response = + RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TIMES, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) } /// Send lobby ready sound effect to client. -async fn send_lobby_sound_effect(writer: &mut WriteHalf<'_>, sound_name: &str) -> Result<(), ()> { +async fn send_lobby_sound_effect( + client: &Client, + writer: &mut WriteHalf<'_>, + sound_name: &str, +) -> Result<(), ()> { let packet = NamedSoundEffect { sound_name: sound_name.into(), sound_category: 0, @@ -429,7 +483,7 @@ async fn send_lobby_sound_effect(writer: &mut WriteHalf<'_>, sound_name: &str) - packet.encode(&mut data).map_err(|_| ())?; let response = - RawPacket::new(proto::packets::play::CLIENT_NAMED_SOUND_EFFECT, data).encode()?; + RawPacket::new(proto::packets::play::CLIENT_NAMED_SOUND_EFFECT, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -438,7 +492,11 @@ async fn send_lobby_sound_effect(writer: &mut WriteHalf<'_>, sound_name: &str) - /// Send respawn packet to client to jump from lobby into now loaded server. /// /// The required details will be fetched from the `join_game` packet as provided by the server. -async fn send_respawn_from_join(writer: &mut WriteHalf<'_>, join_game: JoinGame) -> Result<(), ()> { +async fn send_respawn_from_join( + client: &Client, + writer: &mut WriteHalf<'_>, + join_game: JoinGame, +) -> Result<(), ()> { let packet = Respawn { dimension: join_game.dimension, world_name: join_game.world_name, @@ -453,7 +511,7 @@ async fn send_respawn_from_join(writer: &mut WriteHalf<'_>, join_game: JoinGame) let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_RESPAWN, data).encode()?; + let response = RawPacket::new(proto::packets::play::CLIENT_RESPAWN, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -462,7 +520,11 @@ async fn send_respawn_from_join(writer: &mut WriteHalf<'_>, join_game: JoinGame) /// An infinite keep-alive loop. /// /// This will keep sending keep-alive and title packets to the client until it is dropped. -async fn keep_alive_loop(writer: &mut WriteHalf<'_>, config: &Config) -> Result<(), ()> { +async fn keep_alive_loop( + client: &Client, + writer: &mut WriteHalf<'_>, + config: &Config, +) -> Result<(), ()> { let mut interval = time::interval(KEEP_ALIVE_INTERVAL); loop { @@ -471,8 +533,8 @@ async fn keep_alive_loop(writer: &mut WriteHalf<'_>, config: &Config) -> Result< trace!(target: "lazymc::lobby", "Sending keep-alive sequence to lobby client"); // Send keep alive and title packets - send_keep_alive(writer).await?; - send_lobby_title(writer, &config.join.lobby.message).await?; + send_keep_alive(client, writer).await?; + send_lobby_title(client, writer, &config.join.lobby.message).await?; } } @@ -482,12 +544,13 @@ async fn keep_alive_loop(writer: &mut WriteHalf<'_>, config: &Config) -> Result< /// /// During this stage we keep sending keep-alive and title packets to the client to keep it active. async fn stage_wait<'a>( + client: &Client, server: &Server, config: &Config, writer: &mut WriteHalf<'a>, ) -> Result<(), ()> { select! { - a = keep_alive_loop(writer, config) => a, + a = keep_alive_loop(client, writer, config) => a, b = wait_for_server(server, config) => b, } } @@ -553,7 +616,7 @@ async fn wait_for_server<'a>(server: &Server, config: &Config) -> Result<(), ()> async fn connect_to_server( client_info: ClientInfo, config: &Config, -) -> Result<(TcpStream, BytesMut), ()> { +) -> Result<(Client, TcpStream, BytesMut), ()> { time::timeout( SERVER_CONNECT_TIMEOUT, connect_to_server_no_timeout(client_info, config), @@ -561,7 +624,6 @@ async fn connect_to_server( .await .map_err(|_| { error!(target: "lazymc::lobby", "Creating new server connection for lobby client timed out after {}s", SERVER_CONNECT_TIMEOUT.as_secs()); - () })? } @@ -572,7 +634,7 @@ async fn connect_to_server( async fn connect_to_server_no_timeout( client_info: ClientInfo, config: &Config, -) -> Result<(TcpStream, BytesMut), ()> { +) -> Result<(Client, TcpStream, BytesMut), ()> { // Open connection // TODO: on connect fail, ping server and redirect to serve_status if offline let mut outbound = TcpStream::connect(config.server.address) @@ -595,7 +657,8 @@ async fn connect_to_server_no_timeout( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let request = RawPacket::new(proto::packets::handshake::SERVER_HANDSHAKE, data).encode()?; + let request = + RawPacket::new(proto::packets::handshake::SERVER_HANDSHAKE, data).encode(&tmp_client)?; writer.write_all(&request).await.map_err(|_| ())?; // Request login start @@ -606,7 +669,8 @@ async fn connect_to_server_no_timeout( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let request = RawPacket::new(proto::packets::login::SERVER_LOGIN_START, data).encode()?; + let request = + RawPacket::new(proto::packets::login::SERVER_LOGIN_START, data).encode(&tmp_client)?; writer.write_all(&request).await.map_err(|_| ())?; // Incoming buffer @@ -614,7 +678,7 @@ async fn connect_to_server_no_timeout( loop { // Read packet from stream - let (packet, _raw) = match proto::read_packet(&mut buf, &mut reader).await { + let (packet, _raw) = match proto::read_packet(&tmp_client, &mut buf, &mut reader).await { Ok(Some(packet)) => packet, Ok(None) => break, Err(_) => { @@ -626,6 +690,32 @@ async fn connect_to_server_no_timeout( // Grab client state let client_state = tmp_client.state(); + // Catch set compression + if client_state == ClientState::Login + && packet.id == proto::packets::login::CLIENT_SET_COMPRESSION + { + // Decode compression packet + let set_compression = + SetCompression::decode(&mut packet.data.as_slice()).map_err(|err| { + dbg!(err); + () + })?; + + // Client and server compression threshold should match, show warning if not + if set_compression.threshold != proto::COMPRESSION_THRESHOLD { + error!( + target: "lazymc::lobby", + "Compression threshold sent to lobby client does not match threshold from server, this may cause errors (client: {}, server: {})", + proto::COMPRESSION_THRESHOLD, + set_compression.threshold + ); + } + + // Set client compression + tmp_client.set_compression(set_compression.threshold); + continue; + } + // Hijack login success if client_state == ClientState::Login && packet.id == proto::packets::login::CLIENT_LOGIN_SUCCESS @@ -642,7 +732,12 @@ async fn connect_to_server_no_timeout( // Switch to play state tmp_client.set_state(ClientState::Play); - return Ok((outbound, buf)); + // Server must enable compression if enabled for client, show warning otherwise + if tmp_client.is_compressed() != (proto::COMPRESSION_THRESHOLD >= 0) { + error!(target: "lazymc::lobby", "Compression enabled for lobby client while the server did not, this will cause errors"); + } + + return Ok((tmp_client, outbound, buf)); } // Show unhandled packet warning @@ -665,17 +760,17 @@ async fn connect_to_server_no_timeout( /// /// This parses, consumes and returns the packet. async fn wait_for_server_join_game( + client: &Client, outbound: &mut TcpStream, buf: &mut BytesMut, ) -> Result { time::timeout( SERVER_JOIN_GAME_TIMEOUT, - wait_for_server_join_game_no_timeout(outbound, buf), + wait_for_server_join_game_no_timeout(client, outbound, buf), ) .await .map_err(|_| { error!(target: "lazymc::lobby", "Waiting for for game data from server for lobby client timed out after {}s", SERVER_JOIN_GAME_TIMEOUT.as_secs()); - () })? } @@ -685,6 +780,7 @@ async fn wait_for_server_join_game( // TODO: clean this up // TODO: do not drop error here, return Box async fn wait_for_server_join_game_no_timeout( + client: &Client, outbound: &mut TcpStream, buf: &mut BytesMut, ) -> Result { @@ -692,7 +788,7 @@ async fn wait_for_server_join_game_no_timeout( loop { // Read packet from stream - let (packet, _raw) = match proto::read_packet(buf, &mut reader).await { + let (packet, _raw) = match proto::read_packet(&client, buf, &mut reader).await { Ok(Some(packet)) => packet, Ok(None) => break, Err(_) => { @@ -705,7 +801,6 @@ async fn wait_for_server_join_game_no_timeout( if packet.id == proto::packets::play::CLIENT_JOIN_GAME { let join_game = JoinGame::decode(&mut packet.data.as_slice()).map_err(|err| { dbg!(err); - () })?; return Ok(join_game); diff --git a/src/monitor.rs b/src/monitor.rs index 053bf4a..07cf463 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -16,7 +16,7 @@ use tokio::net::TcpStream; use tokio::time; use crate::config::Config; -use crate::proto::{self, ClientState, RawPacket}; +use crate::proto::{self, Client, ClientState, RawPacket}; use crate::server::{Server, State}; /// Monitor ping inverval in seconds. @@ -96,22 +96,29 @@ pub async fn poll_server( async fn fetch_status(config: &Config, addr: SocketAddr) -> Result { let mut stream = TcpStream::connect(addr).await.map_err(|_| ())?; - send_handshake(&mut stream, config, addr).await?; - request_status(&mut stream).await?; - wait_for_status_timeout(&mut stream).await + // Dummy client + let client = Client::default(); + + send_handshake(&client, &mut stream, config, addr).await?; + request_status(&client, &mut stream).await?; + wait_for_status_timeout(&client, &mut stream).await } /// Attemp to ping server. async fn do_ping(config: &Config, addr: SocketAddr) -> Result<(), ()> { let mut stream = TcpStream::connect(addr).await.map_err(|_| ())?; - send_handshake(&mut stream, config, addr).await?; - let token = send_ping(&mut stream).await?; - wait_for_ping_timeout(&mut stream, token).await + // Dummy client + let client = Client::default(); + + send_handshake(&client, &mut stream, config, addr).await?; + let token = send_ping(&client, &mut stream).await?; + wait_for_ping_timeout(&client, &mut stream, token).await } /// Send handshake. async fn send_handshake( + client: &Client, stream: &mut TcpStream, config: &Config, addr: SocketAddr, @@ -127,7 +134,7 @@ async fn send_handshake( handshake.encode(&mut packet).map_err(|_| ())?; let raw = RawPacket::new(proto::packets::handshake::SERVER_HANDSHAKE, packet) - .encode() + .encode(&client) .map_err(|_| ())?; stream.write_all(&raw).await.map_err(|_| ())?; @@ -135,16 +142,16 @@ async fn send_handshake( } /// Send status request. -async fn request_status(stream: &mut TcpStream) -> Result<(), ()> { +async fn request_status(client: &Client, stream: &mut TcpStream) -> Result<(), ()> { let raw = RawPacket::new(proto::packets::status::SERVER_STATUS, vec![]) - .encode() + .encode(client) .map_err(|_| ())?; stream.write_all(&raw).await.map_err(|_| ())?; Ok(()) } /// Send status request. -async fn send_ping(stream: &mut TcpStream) -> Result { +async fn send_ping(client: &Client, stream: &mut TcpStream) -> Result { let token = rand::thread_rng().gen(); let ping = PingRequest { time: token }; @@ -152,21 +159,21 @@ async fn send_ping(stream: &mut TcpStream) -> Result { ping.encode(&mut packet).map_err(|_| ())?; let raw = RawPacket::new(proto::packets::status::SERVER_PING, packet) - .encode() + .encode(client) .map_err(|_| ())?; stream.write_all(&raw).await.map_err(|_| ())?; Ok(token) } /// Wait for a status response. -async fn wait_for_status(stream: &mut TcpStream) -> Result { +async fn wait_for_status(client: &Client, stream: &mut TcpStream) -> Result { // Get stream reader, set up buffer let (mut reader, mut _writer) = stream.split(); let mut buf = BytesMut::new(); loop { // Read packet from stream - let (packet, _raw) = match proto::read_packet(&mut buf, &mut reader).await { + let (packet, _raw) = match proto::read_packet(client, &mut buf, &mut reader).await { Ok(Some(packet)) => packet, Ok(None) => break, Err(_) => continue, @@ -184,22 +191,25 @@ async fn wait_for_status(stream: &mut TcpStream) -> Result { } /// Wait for a status response. -async fn wait_for_status_timeout(stream: &mut TcpStream) -> Result { - let status = wait_for_status(stream); +async fn wait_for_status_timeout( + client: &Client, + stream: &mut TcpStream, +) -> Result { + let status = wait_for_status(client, stream); tokio::time::timeout(Duration::from_secs(STATUS_TIMEOUT), status) .await .map_err(|_| ())? } /// Wait for a status response. -async fn wait_for_ping(stream: &mut TcpStream, token: u64) -> Result<(), ()> { +async fn wait_for_ping(client: &Client, stream: &mut TcpStream, token: u64) -> Result<(), ()> { // Get stream reader, set up buffer let (mut reader, mut _writer) = stream.split(); let mut buf = BytesMut::new(); loop { // Read packet from stream - let (packet, _raw) = match proto::read_packet(&mut buf, &mut reader).await { + let (packet, _raw) = match proto::read_packet(client, &mut buf, &mut reader).await { Ok(Some(packet)) => packet, Ok(None) => break, Err(_) => continue, @@ -223,8 +233,12 @@ async fn wait_for_ping(stream: &mut TcpStream, token: u64) -> Result<(), ()> { } /// Wait for a status response. -async fn wait_for_ping_timeout(stream: &mut TcpStream, token: u64) -> Result<(), ()> { - let status = wait_for_ping(stream, token); +async fn wait_for_ping_timeout( + client: &Client, + stream: &mut TcpStream, + token: u64, +) -> Result<(), ()> { + let status = wait_for_ping(client, stream, token); tokio::time::timeout(Duration::from_secs(PING_TIMEOUT), status) .await .map_err(|_| ())? diff --git a/src/proto.rs b/src/proto.rs index 13bb7df..999a737 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -1,6 +1,11 @@ +use std::io::prelude::*; +use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Mutex; use bytes::BytesMut; +use flate2::read::ZlibDecoder; +use flate2::write::ZlibEncoder; +use flate2::Compression; use tokio::io; use tokio::io::AsyncReadExt; use tokio::net::tcp::ReadHalf; @@ -23,6 +28,10 @@ pub const PROTO_DEFAULT_VERSION: &str = "1.17.1"; /// Should be kept up-to-date with latest supported Minecraft version by lazymc. pub const PROTO_DEFAULT_PROTOCOL: u32 = 756; +/// Compression threshold to use. +// TODO: read this from server.properties instead +pub const COMPRESSION_THRESHOLD: i32 = 256; + /// Minecraft protocol packet IDs. #[allow(unused)] pub mod packets { @@ -38,9 +47,10 @@ pub mod packets { } pub mod login { - pub const CLIENT_DISCONNECT: i32 = 0; - pub const CLIENT_LOGIN_SUCCESS: i32 = 2; - pub const SERVER_LOGIN_START: i32 = 0; + pub const CLIENT_DISCONNECT: i32 = 0x00; + pub const CLIENT_LOGIN_SUCCESS: i32 = 0x02; + pub const CLIENT_SET_COMPRESSION: i32 = 0x03; + pub const SERVER_LOGIN_START: i32 = 0x00; } pub mod play { @@ -68,10 +78,15 @@ pub mod packets { /// /// Note: this does not keep track of compression/encryption states because packets are never /// inspected when these modes are enabled. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct Client { /// Current client state. pub state: Mutex, + + /// Compression state. + /// + /// 0 or positive if enabled, negative if disabled. + pub compression: AtomicI32, } impl Client { @@ -84,6 +99,31 @@ impl Client { pub fn set_state(&self, state: ClientState) { *self.state.lock().unwrap() = state; } + + /// Get compression threshold. + pub fn compressed(&self) -> i32 { + self.compression.load(Ordering::Relaxed) + } + + /// Whether compression is used. + pub fn is_compressed(&self) -> bool { + self.compressed() >= 0 + } + + /// Set compression value. + pub fn set_compression(&self, threshold: i32) { + trace!(target: "lazymc", "Client now uses compression threshold of {}", threshold); + self.compression.store(threshold, Ordering::Relaxed); + } +} + +impl Default for Client { + fn default() -> Self { + Self { + state: Default::default(), + compression: AtomicI32::new(-1), + } + } } /// Protocol state a client may be in. @@ -166,12 +206,8 @@ impl RawPacket { Self { id, data } } - /// Decode packet from raw buffer. - pub fn decode(mut buf: &[u8]) -> Result { - // Read length - let (read, len) = types::read_var_int(buf)?; - buf = &buf[read..][..len as usize]; - + /// Read packet ID from buffer, use remaining buffer as data. + fn read_packet_id_data(mut buf: &[u8]) -> Result { // Read packet ID, select buf let (read, packet_id) = types::read_var_int(buf)?; buf = &buf[read..]; @@ -179,8 +215,94 @@ impl RawPacket { Ok(Self::new(packet_id, buf.to_vec())) } + /// Decode packet from raw buffer. + /// + /// This decodes both compressed and uncompressed packets based on the client threshold + /// preference. + pub fn decode(client: &Client, mut buf: &[u8]) -> Result { + // Read length + let (read, len) = types::read_var_int(buf)?; + buf = &buf[read..][..len as usize]; + + // If no compression is used, read remaining packet ID and data + if !client.is_compressed() { + // Read packet ID and data + return Self::read_packet_id_data(buf); + } + + // Read data length + let (read, data_len) = types::read_var_int(buf)?; + buf = &buf[read..]; + + // If data length is zero, the rest is not compressed + if data_len == 0 { + return Self::read_packet_id_data(buf); + } + + // Decompress packet ID and data section + let mut decompressed = Vec::with_capacity(data_len as usize); + ZlibDecoder::new(buf) + .read_to_end(&mut decompressed) + .map_err(|err| { + error!(target: "lazymc", "Packet decompression error: {}", err); + })?; + + // Decompressed data must match length + if decompressed.len() != data_len as usize { + error!(target: "lazymc", "Decompressed packet has different length than expected ({}b != {}b)", decompressed.len(), data_len); + return Err(()); + } + + // Read decompressed packet ID + return Self::read_packet_id_data(&decompressed); + } + /// Encode packet to raw buffer. - pub fn encode(&self) -> Result, ()> { + /// + /// This compresses packets based on the client threshold preference. + pub fn encode(&self, client: &Client) -> Result, ()> { + let threshold = client.compressed(); + if threshold >= 0 { + self.encode_compressed(threshold) + } else { + self.encode_uncompressed() + } + } + + /// Encode compressed packet to raw buffer. + fn encode_compressed(&self, threshold: i32) -> Result, ()> { + // Packet payload: packet ID and data buffer + let mut payload = types::encode_var_int(self.id)?; + payload.extend_from_slice(&self.data); + + // Determine whether to compress, encode data length bytes + let data_len = payload.len() as i32; + let compress = data_len > threshold; + let mut data_len_bytes = + types::encode_var_int(if compress { data_len } else { 0 }).unwrap(); + + // Compress payload + if compress { + let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&payload).map_err(|err| { + error!(target: "lazymc", "Failed to compress packet: {}", err); + })?; + payload = encoder.finish().map_err(|err| { + error!(target: "lazymc", "Failed to compress packet: {}", err); + })?; + } + + // Encapsulate payload with packet and data length + let len = data_len_bytes.len() as i32 + payload.len() as i32; + let mut packet = types::encode_var_int(len)?; + packet.append(&mut data_len_bytes); + packet.append(&mut payload); + + Ok(packet) + } + + /// Encode uncompressed packet to raw buffer. + fn encode_uncompressed(&self) -> Result, ()> { let mut data = types::encode_var_int(self.id)?; data.extend_from_slice(&self.data); @@ -193,11 +315,8 @@ impl RawPacket { } /// Read raw packet from stream. -/// -/// Note: this does not support reading compressed/encrypted packets. -/// We should never need this though, as we're done reading user packets before any of this is -/// enabled. See: https://wiki.vg/Protocol#Packet_format pub async fn read_packet( + client: &Client, buf: &mut BytesMut, stream: &mut ReadHalf<'_>, ) -> Result)>, ()> { @@ -249,9 +368,9 @@ pub async fn read_packet( buf.extend(tmp); } - // Parse packet + // Parse packet, use full buffer since we'll read the packet length again let raw = buf.split_to(consumed + len as usize); - let packet = RawPacket::decode(&raw)?; + let packet = RawPacket::decode(client, &raw)?; Ok(Some((packet, raw.to_vec()))) } diff --git a/src/status.rs b/src/status.rs index fde52d0..a9d4e11 100644 --- a/src/status.rs +++ b/src/status.rs @@ -43,7 +43,7 @@ pub async fn serve( loop { // Read packet from stream - let (packet, raw) = match proto::read_packet(&mut buf, &mut reader).await { + let (packet, raw) = match proto::read_packet(&client, &mut buf, &mut reader).await { Ok(Some(packet)) => packet, Ok(None) => break, Err(_) => { @@ -100,7 +100,7 @@ pub async fn serve( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(0, data).encode()?; + let response = RawPacket::new(0, data).encode(&client)?; writer.write_all(&response).await.map_err(|_| ())?; continue; @@ -131,7 +131,7 @@ pub async fn serve( } None => info!(target: "lazymc", "Kicked player because lockout is enabled"), } - kick(&config.lockout.message, &mut writer).await?; + kick(&client, &config.lockout.message, &mut writer).await?; break; } @@ -154,7 +154,7 @@ pub async fn serve( | server::State::Started => &config.join.kick.starting, server::State::Stopping => &config.join.kick.stopping, }; - kick(msg, &mut writer).await?; + kick(&client, msg, &mut writer).await?; break; } @@ -308,7 +308,7 @@ pub async fn hold<'a>(config: &Config, server: &Server) -> Result { /// Kick client with a message. /// /// Should close connection afterwards. -async fn kick(msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> { +async fn kick(client: &Client, msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> { let packet = LoginDisconnect { reason: Message::new(Payload::text(msg)), }; @@ -316,7 +316,7 @@ async fn kick(msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> { let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::login::CLIENT_DISCONNECT, data).encode()?; + let response = RawPacket::new(proto::packets::login::CLIENT_DISCONNECT, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ()) }