From 9b1f2a70110d5e4e0f1a4f8e1de82f8e07177be6 Mon Sep 17 00:00:00 2001 From: timvisee Date: Mon, 22 Nov 2021 22:40:37 +0100 Subject: [PATCH] Probe server on lazymc start to get up-to-date server details for Forge --- res/lazymc.toml | 3 + src/config.rs | 4 + src/forge.rs | 239 ++++++++++++++++++++++++++++ src/lobby.rs | 83 +++++++++- src/main.rs | 2 + src/mc/uuid.rs | 2 +- src/probe.rs | 353 ++++++++++++++++++++++++++++++++++++++++++ src/proto/packet.rs | 53 +++++-- src/proto/packets.rs | 15 +- src/server.rs | 8 + src/service/mod.rs | 1 + src/service/probe.rs | 23 +++ src/service/server.rs | 3 +- src/status.rs | 2 +- 14 files changed, 762 insertions(+), 29 deletions(-) create mode 100644 src/forge.rs create mode 100644 src/probe.rs create mode 100644 src/service/probe.rs diff --git a/res/lazymc.toml b/res/lazymc.toml index dac75ef..2a00a1c 100644 --- a/res/lazymc.toml +++ b/res/lazymc.toml @@ -39,6 +39,9 @@ command = "java -Xmx1G -Xms1G -jar server.jar --nogui" # Immediately wake server after crash. #wake_on_crash = false +# Set to true if this server runs Forge. +#forge = false + # Server start/stop timeout in seconds. Force kill server process if it takes too long. #start_timeout = 300 #stop_timeout = 150 diff --git a/src/config.rs b/src/config.rs index f508889..de7f445 100644 --- a/src/config.rs +++ b/src/config.rs @@ -182,6 +182,10 @@ pub struct Server { #[serde(default)] pub wake_on_crash: bool, + /// Whether this server runs forge. + #[serde(default)] + pub forge: bool, + /// Server starting timeout. Force kill server process if it takes longer. #[serde(default = "u32_300")] pub start_timeout: u32, diff --git a/src/forge.rs b/src/forge.rs new file mode 100644 index 0000000..0a9a2b0 --- /dev/null +++ b/src/forge.rs @@ -0,0 +1,239 @@ +use std::sync::Arc; +use std::time::Duration; + +use bytes::BytesMut; +use minecraft_protocol::decoder::Decoder; +use minecraft_protocol::encoder::Encoder; +use minecraft_protocol::version::forge_v1_13::login::{Acknowledgement, LoginWrapper, ModList}; +use minecraft_protocol::version::v1_14_4::login::{LoginPluginRequest, LoginPluginResponse}; +use minecraft_protocol::version::PacketId; +use tokio::io::AsyncWriteExt; +use tokio::net::tcp::WriteHalf; +use tokio::net::TcpStream; +use tokio::time; + +use crate::forge; +use crate::proto::client::{Client, ClientState}; +use crate::proto::packet::RawPacket; +use crate::proto::{packet, packets}; +use crate::server::Server; + +/// Forge status magic. +pub const STATUS_MAGIC: &str = "\0FML2\0"; + +/// Forge plugin wrapper login plugin request channel. +pub const CHANNEL_LOGIN_WRAPPER: &str = "fml:loginwrapper"; + +/// Forge handshake channel. +pub const CHANNEL_HANDSHAKE: &str = "fml:handshake"; + +/// Respond with Forge login wrapper packet. +pub async fn respond_forge_login_packet( + client: &Client, + writer: &mut WriteHalf<'_>, + message_id: i32, + forge_channel: String, + forge_packet: impl PacketId + Encoder, +) -> Result<(), ()> { + // Encode Forge packet to data + let mut forge_data = Vec::new(); + forge_packet.encode(&mut forge_data).map_err(|_| ())?; + + // Encode Forge payload + let forge_payload = + RawPacket::new(forge_packet.packet_id(), forge_data).encode_without_len(client)?; + + // Wrap Forge payload in login wrapper + let mut payload = Vec::new(); + let packet = LoginWrapper { + channel: forge_channel, + packet: forge_payload, + }; + packet.encode(&mut payload).map_err(|_| ())?; + + // Write login plugin request with forge payload + packet::write_packet( + LoginPluginResponse { + message_id, + successful: true, + data: payload, + }, + client, + writer, + ) + .await +} + +/// Respond to a Forge login plugin request. +pub async fn respond_login_plugin_request( + client: &Client, + packet: LoginPluginRequest, + writer: &mut WriteHalf<'_>, +) -> Result<(), ()> { + // Decode Forge login wrapper packet + let (message_id, login_wrapper, packet) = + forge::decode_forge_login_packet(&client, packet).await?; + + // Determine whether we received the mod list + let is_unknown_header = login_wrapper.channel != forge::CHANNEL_HANDSHAKE; + let is_mod_list = !is_unknown_header && packet.id == packets::forge::login::CLIENT_MOD_LIST; + + // If not the mod list, just acknowledge + if !is_mod_list { + trace!(target: "lazymc::forge", "Acknowledging login plugin request"); + forge::respond_forge_login_packet( + client, + writer, + message_id, + login_wrapper.channel, + Acknowledgement {}, + ) + .await + .map_err(|_| { + error!(target: "lazymc::forge", "Failed to send Forge login plugin request acknowledgement"); + })?; + return Ok(()); + } + + trace!(target: "lazymc::forge", "Sending mod list reply to server with same contents"); + + // Parse mod list, transform into reply + let mod_list = ModList::decode(&mut packet.data.as_slice()).map_err(|err| { + error!(target: "lazymc::forge", "Failed to decode Forge mod list: {:?}", err); + })?; + let mod_list_reply = mod_list.into_reply(); + + // We got mod list, respond with reply + forge::respond_forge_login_packet( + client, + writer, + message_id, + login_wrapper.channel, + mod_list_reply, + ) + .await + .map_err(|_| { + error!(target: "lazymc::forge", "Failed to send Forge login plugin mod list reply"); + })?; + + Ok(()) +} + +/// Decode a Forge login wrapper packet from login plugin request. +/// +/// Returns (`message_id`, `login_wrapper`, `packet`). +pub async fn decode_forge_login_packet( + client: &Client, + plugin_request: LoginPluginRequest, +) -> Result<(i32, LoginWrapper, RawPacket), ()> { + // Validate channel + assert_eq!(plugin_request.channel, CHANNEL_LOGIN_WRAPPER); + + // Decode login wrapped packet + let login_wrapper = + LoginWrapper::decode(&mut plugin_request.data.as_slice()).map_err(|err| { + error!(target: "lazymc::forge", "Failed to decode Forge LoginWrapper packet: {:?}", err); + })?; + + // Parse packet + let packet = RawPacket::decode_without_len(client, &login_wrapper.packet).map_err(|err| { + error!(target: "lazymc::forge", "Failed to decode Forge LoginWrapper packet contents: {:?}", err); + })?; + + Ok((plugin_request.message_id, login_wrapper, packet)) +} + +/// Replay the Forge login payload for a client. +pub async fn replay_login_payload( + client: &Client, + inbound: &mut TcpStream, + server: Arc, + inbound_buf: &mut BytesMut, +) -> Result<(), ()> { + debug!(target: "lazymc::lobby", "Replaying Forge login procedure for lobby client..."); + + // Replay each Forge packet + for packet in server.forge_payload.lock().await.as_slice() { + inbound.write_all(&packet).await.map_err(|err| { + error!(target: "lazymc::lobby", "Failed to send Forge join payload to lobby client, will likely cause issues: {}", err); + })?; + } + + // Drain all responses + let count = server.forge_payload.lock().await.len(); + drain_forge_responses(client, inbound, inbound_buf, count).await?; + + trace!(target: "lazymc::lobby", "Forge join payload replayed"); + + Ok(()) +} + +/// Drain Forge login plugin response packets from stream. +async fn drain_forge_responses( + client: &Client, + inbound: &mut TcpStream, + buf: &mut BytesMut, + mut count: usize, +) -> Result<(), ()> { + let (mut reader, mut _writer) = inbound.split(); + + loop { + // We're done if count is zero + if count == 0 { + trace!(target: "lazymc::forge", "Drained all plugin responses from client"); + return Ok(()); + } + + // TODO: move timeout into constant + let read_packet_task = packet::read_packet(&client, buf, &mut reader); + let timeout = time::timeout(Duration::from_secs(5), read_packet_task).await; + + let read_packet_task = match timeout { + Ok(result) => result, + Err(_) => { + error!(target: "lazymc::forge", "Expected more plugin responses from client, but didn't receive anything in a while, may be problematic"); + return Ok(()); + } + }; + + // Read packet from stream + let (packet, _raw) = match read_packet_task { + Ok(Some(packet)) => packet, + Ok(None) => break, + Err(_) => { + error!(target: "lazymc::forge", "Closing connection, error occurred"); + break; + } + }; + + // Grab client state + let client_state = client.state(); + + // Catch login plugin resposne + if client_state == ClientState::Login + && packet.id == packets::login::SERVER_LOGIN_PLUGIN_RESPONSE + { + trace!(target: "lazymc::forge", "Voiding plugin response from client"); + count -= 1; + continue; + } + + // TODO: instantly return on this packet? + // // Hijack login success + // if client_state == ClientState::Login && packet.id == packets::login::CLIENT_LOGIN_SUCCESS { + // trace!(target: "lazymc::forge", "Received login success from server connection, change to play mode"); + + // // Switch to play state + // tmp_client.set_state(ClientState::Play); + + // return Ok(forge_payload); + // } + + // Show unhandled packet warning + debug!(target: "lazymc::forge", "Received unhandled packet from server in record_forge_response:"); + debug!(target: "lazymc::forge", "- State: {:?}", client_state); + debug!(target: "lazymc::forge", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id); + } + + Err(()) +} diff --git a/src/lobby.rs b/src/lobby.rs index 89499f3..983fdc0 100644 --- a/src/lobby.rs +++ b/src/lobby.rs @@ -8,12 +8,20 @@ use bytes::BytesMut; use futures::FutureExt; use minecraft_protocol::data::chat::{Message, Payload}; use minecraft_protocol::decoder::Decoder; -use minecraft_protocol::version::v1_14_4::login::{LoginStart, LoginSuccess, SetCompression}; +use minecraft_protocol::version::forge_v1_13::login::LoginWrapper; +use minecraft_protocol::version::v1_14_4::login::{ + LoginDisconnect, LoginPluginRequest, LoginPluginResponse, LoginStart, LoginSuccess, + SetCompression, +}; +use minecraft_protocol::version::v1_16_5::game::{Title, TitleAction}; use minecraft_protocol::version::v1_17_1::game::{ ClientBoundKeepAlive, ClientBoundPluginMessage, JoinGame, NamedSoundEffect, PlayerPositionAndLook, Respawn, SetTitleSubtitle, SetTitleText, SetTitleTimes, TimeUpdate, }; +use minecraft_protocol::version::PacketId; use nbt::CompoundTag; +use proto::packet::RawPacket; +use rand::Rng; use tokio::io::AsyncWriteExt; use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::TcpStream; @@ -21,6 +29,7 @@ use tokio::select; use tokio::time; use crate::config::*; +use crate::forge; use crate::mc::{self, uuid}; use crate::net; use crate::proto; @@ -108,6 +117,15 @@ pub async fn serve( debug!(target: "lazymc::lobby", "Login on lobby server (user: {})", login_start.name); + // Replay Forge payload + if config.server.forge { + forge::replay_login_payload(client, &mut inbound, server.clone(), &mut inbound_buf) + .await?; + let (returned_reader, returned_writer) = inbound.split(); + reader = returned_reader; + writer = returned_writer; + } + // Respond with set compression if compression is enabled based on threshold if proto::COMPRESSION_THRESHOLD >= 0 { trace!(target: "lazymc::lobby", "Enabling compression for lobby client because server has it enabled (threshold: {})", proto::COMPRESSION_THRESHOLD); @@ -255,11 +273,18 @@ async fn send_lobby_join_game( writer: &mut WriteHalf<'_>, server: &Server, ) -> Result<(), ()> { + // Grab probed dimension codec + let dimension_codec: CompoundTag = + if let Some(ref join_game) = server.probed_join_game.lock().await.as_ref() { + join_game.dimension_codec.clone() + } else { + snbt_to_compound_tag(include_str!("../res/dimension_codec.snbt")) + }; + // Send Minecrafts default states, slightly customised for lobby world packet::write_packet( { let status = server.status().await; - JoinGame { // Player ID must be unique, if it collides with another server entity ID the player gets // in a weird state and cannot move @@ -273,7 +298,7 @@ async fn send_lobby_join_game( "minecraft:the_nether".into(), "minecraft:the_end".into(), ], - dimension_codec: snbt_to_compound_tag(include_str!("../res/dimension_codec.snbt")), + dimension_codec, dimension: snbt_to_compound_tag(include_str!("../res/dimension.snbt")), world_name: "lazymc:lobby".into(), hashed_seed: 0, @@ -647,9 +672,7 @@ async fn connect_to_server_no_timeout( { // Decode compression packet let set_compression = - SetCompression::decode(&mut packet.data.as_slice()).map_err(|err| { - dbg!(err); - })?; + SetCompression::decode(&mut packet.data.as_slice()).map_err(|_| ())?; // Client and server compression threshold should match, show warning if not if set_compression.threshold != proto::COMPRESSION_THRESHOLD { @@ -666,6 +689,27 @@ async fn connect_to_server_no_timeout( continue; } + // Hijack login plugin request + if client_state == ClientState::Login + && packet.id == packets::login::CLIENT_LOGIN_PLUGIN_REQUEST + { + // TODO: update message if not on forge + trace!(target: "lazymc::lobby", "Received login plugin request from server, assuming we should send Forge payload now"); + + // Decode login plugin request + let plugin_request = + LoginPluginRequest::decode(&mut packet.data.as_slice()).map_err(|err| { + dbg!(err); + })?; + + // Respond to Forge login plugin request + forge::respond_login_plugin_request(&tmp_client, plugin_request, &mut writer).await?; + + // TODO: if not on forge, respond with empty payload because we don't understand + + continue; + } + // Hijack login success if client_state == ClientState::Login && packet.id == packets::login::CLIENT_LOGIN_SUCCESS { trace!(target: "lazymc::lobby", "Received login success from server connection, change to play mode"); @@ -688,6 +732,24 @@ async fn connect_to_server_no_timeout( return Ok((tmp_client, outbound, buf)); } + // Hijack disconnect + if client_state == ClientState::Login && packet.id == packets::login::CLIENT_DISCONNECT { + error!(target: "lazymc::lobby", "Received disconnect from server connection"); + + // // Decode disconnect packet + // let login_disconnect = + // LoginDisconnect::decode(&mut packet.data.as_slice()).map_err(|err| { + // dbg!(err); + // })?; + + // TODO: report/forward error to client + + break; + } + + // TODO: if receiving encryption request, disconnect with error because we don't support + // online mode! + // Show unhandled packet warning debug!(target: "lazymc::lobby", "Received unhandled packet from server in connect_to_server:"); debug!(target: "lazymc::lobby", "- State: {:?}", client_state); @@ -799,7 +861,6 @@ async fn drain_stream(reader: &mut ReadHalf<'_>) -> Result<(), ()> { /// Read NBT CompoundTag from SNBT. fn snbt_to_compound_tag(data: &str) -> CompoundTag { - use nbt::decode::read_compound_tag; use quartz_nbt::io::{write_nbt, Flavor}; use quartz_nbt::snbt; @@ -812,5 +873,11 @@ fn snbt_to_compound_tag(data: &str) -> CompoundTag { .expect("failed to encode NBT CompoundTag as binary"); // Parse binary with usable NBT create - read_compound_tag(&mut &*binary).unwrap() + bin_to_compound_tag(&mut &*binary) +} + +/// Read NBT CompoundTag from SNBT. +fn bin_to_compound_tag(data: &[u8]) -> CompoundTag { + use nbt::decode::read_compound_tag; + read_compound_tag(&mut &*data).unwrap() } diff --git a/src/main.rs b/src/main.rs index 678967e..fb93f8a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ extern crate log; pub(crate) mod action; pub(crate) mod cli; pub(crate) mod config; +pub(crate) mod forge; pub(crate) mod join; #[cfg(feature = "lobby")] pub(crate) mod lobby; @@ -17,6 +18,7 @@ pub(crate) mod mc; pub(crate) mod monitor; pub(crate) mod net; pub(crate) mod os; +pub(crate) mod probe; pub(crate) mod proto; pub(crate) mod proxy; pub(crate) mod server; diff --git a/src/mc/uuid.rs b/src/mc/uuid.rs index fabc122..0aee292 100644 --- a/src/mc/uuid.rs +++ b/src/mc/uuid.rs @@ -5,7 +5,7 @@ use uuid::Uuid; const OFFLINE_PLAYER_NAMESPACE: &str = "OfflinePlayer:"; /// Get UUID for given player username. -pub fn player_uuid(username: &str) -> Uuid { +fn player_uuid(username: &str) -> Uuid { java_name_uuid_from_bytes(username.as_bytes()) } diff --git a/src/probe.rs b/src/probe.rs new file mode 100644 index 0000000..30f4ce1 --- /dev/null +++ b/src/probe.rs @@ -0,0 +1,353 @@ +use std::ops::Deref; +use std::sync::Arc; +use std::time::Duration; + +use bytes::BytesMut; +use minecraft_protocol::decoder::Decoder; +use minecraft_protocol::version::v1_14_4::handshake::Handshake; +use minecraft_protocol::version::v1_14_4::login::{ + LoginPluginRequest, LoginPluginResponse, LoginStart, SetCompression, +}; +use minecraft_protocol::version::v1_17_1::game::JoinGame; +use tokio::net::TcpStream; +use tokio::time; + +use crate::config::Config; +use crate::forge; +use crate::net; +use crate::proto::client::{Client, ClientState}; +use crate::proto::{self, packet, packets}; +use crate::server::{Server, State}; + +/// Minecraft username to use for probing the server. +const PROBE_USER: &str = "_lazymc_probe"; + +/// Timeout for probe user connecting to the server. +const PROBE_CONNECT_TIMEOUT: Duration = Duration::from_secs(30); + +/// Maximum time the probe may wait for the server to come online. +const PROBE_ONLINE_TIMEOUT: Duration = Duration::from_secs(10 * 60); + +/// Timeout for receiving join game packet. +/// +/// When the play state is reached, the server should immeditely respond with a join game packet. +/// This defines the maximum timeout for waiting on it. +const PROBE_JOIN_GAME_TIMEOUT: Duration = Duration::from_secs(20); + +/// Connect to the Minecraft server and probe useful details from it. +pub async fn probe(config: Arc, server: Arc) -> Result<(), ()> { + debug!(target: "lazymc::probe", "Starting server probe..."); + + // Start server if not starting already + if Server::start(config.clone(), server.clone(), None).await { + warn!(target: "lazymc::probe", "Starting server to probe required details..."); + } + + // Wait for server to come online + if !wait_until_online(&server).await? { + warn!(target: "lazymc::probe", "Couldn't probe server, failed to wait for server to come online"); + return Err(()); + } + + debug!(target: "lazymc::probe", "Connecting to server to probe details..."); + + // Connect to server, record Forge payload + let forge_payload = connect_to_server(&config, &server).await?; + *server.forge_payload.lock().await = forge_payload.into(); + + Ok(()) +} + +/// Wait for the server to come online. +/// +/// Returns `true` when it is online. +async fn wait_until_online<'a>(server: &Server) -> Result { + trace!(target: "lazymc::probe", "Waiting for server to come online..."); + + // A task to wait for suitable server state + // Waits for started state, errors if stopping/stopped state is reached + let task_wait = async { + let mut state = server.state_receiver(); + loop { + // Wait for state change + state.changed().await.unwrap(); + + match state.borrow().deref() { + // Still waiting on server start + State::Starting => { + continue; + } + + // Server started, start relaying and proxy + State::Started => { + break true; + } + + // Server stopping, this shouldn't happen, skip + State::Stopping => { + warn!(target: "lazymc::probe", "Server stopping while trying to probe, skipping"); + break false; + } + + // Server stopped, this shouldn't happen, skip + State::Stopped => { + error!(target: "lazymc::probe", "Server stopped while trying to probe, skipping"); + break false; + } + } + } + }; + + // Wait for server state with timeout + match time::timeout(PROBE_ONLINE_TIMEOUT, task_wait).await { + Ok(online) => Ok(online), + + // Timeout reached, kick with starting message + Err(_) => { + warn!(target: "lazymc::probe", "Probe waited for server to come online but timed out after {}s", PROBE_ONLINE_TIMEOUT.as_secs()); + Ok(false) + } + } +} + +/// Create connection to the server, with timeout. +/// +/// This will initialize the connection to the play state. Client details are used. +/// +/// Returns recorded Forge login payload if any. +async fn connect_to_server(config: &Config, server: &Server) -> Result>, ()> { + time::timeout( + PROBE_CONNECT_TIMEOUT, + connect_to_server_no_timeout(config, server), + ) + .await + .map_err(|_| { + error!(target: "lazymc::probe", "Probe tried to connect to server but timed out after {}s", PROBE_CONNECT_TIMEOUT.as_secs()); + })? +} + +/// Create connection to the server, with no timeout. +/// +/// This will initialize the connection to the play state. Client details are used. +/// +/// Returns recorded Forge login payload if any. +// TODO: clean this up +async fn connect_to_server_no_timeout( + config: &Config, + server: &Server, +) -> Result>, ()> { + // Open connection + // TODO: on connect fail, ping server and redirect to serve_status if offline + let mut outbound = TcpStream::connect(config.server.address) + .await + .map_err(|_| ())?; + + // Construct temporary server client + let tmp_client = match outbound.local_addr() { + Ok(addr) => Client::new(addr), + Err(_) => Client::dummy(), + }; + tmp_client.set_state(ClientState::Login); + + let (mut reader, mut writer) = outbound.split(); + + // Select server address to use, add magic if Forge + let server_addr = if config.server.forge { + format!( + "{}{}", + config.server.address.ip().to_string(), + forge::STATUS_MAGIC, + ) + } else { + config.server.address.ip().to_string() + }; + + // Send handshake packet + packet::write_packet( + Handshake { + protocol_version: config.public.protocol as i32, + server_addr, + server_port: config.server.address.port(), + next_state: ClientState::Login.to_id(), + }, + &tmp_client, + &mut writer, + ) + .await?; + + // Request login start + packet::write_packet( + LoginStart { + name: PROBE_USER.into(), + }, + &tmp_client, + &mut writer, + ) + .await?; + + // Incoming buffer, record Forge plugin request payload + let mut buf = BytesMut::new(); + let mut forge_payload = Vec::new(); + + loop { + // Read packet from stream + let (packet, raw) = match packet::read_packet(&tmp_client, &mut buf, &mut reader).await { + Ok(Some(packet)) => packet, + Ok(None) => break, + Err(_) => { + error!(target: "lazymc::forge", "Closing connection, error occurred"); + break; + } + }; + + // Grab client state + let client_state = tmp_client.state(); + + // Catch set compression + if client_state == ClientState::Login && packet.id == packets::login::CLIENT_SET_COMPRESSION + { + // Decode compression packet + let set_compression = + SetCompression::decode(&mut packet.data.as_slice()).map_err(|_| ())?; + + // Client and server compression threshold should match, show warning if not + if set_compression.threshold != proto::COMPRESSION_THRESHOLD { + error!( + target: "lazymc::forge", + "Compression threshold sent to lobby client does not match threshold from server, this may cause errors (client: {}, server: {})", + proto::COMPRESSION_THRESHOLD, + set_compression.threshold + ); + } + + // Set client compression + tmp_client.set_compression(set_compression.threshold); + continue; + } + + // Catch login plugin request + if client_state == ClientState::Login + && packet.id == packets::login::CLIENT_LOGIN_PLUGIN_REQUEST + { + // Decode login plugin request packet + let plugin_request = LoginPluginRequest::decode(&mut packet.data.as_slice()).map_err(|err| { + error!(target: "lazymc::probe", "Failed to decode login plugin request from server, cannot respond properly: {:?}", err); + })?; + + // Handle plugin requests for Forge + if config.server.forge { + // Record Forge login payload + forge_payload.push(raw); + + // Respond to Forge login plugin request + forge::respond_login_plugin_request(&tmp_client, plugin_request, &mut writer) + .await?; + continue; + } + + warn!(target: "lazymc::probe", "Received unexpected login plugin request, responding with error"); + + // Respond with plugin response failure + packet::write_packet( + LoginPluginResponse { + message_id: plugin_request.message_id, + successful: false, + data: vec![], + }, + &tmp_client, + &mut writer, + ) + .await?; + + continue; + } + + // Hijack login success + if client_state == ClientState::Login && packet.id == packets::login::CLIENT_LOGIN_SUCCESS { + trace!(target: "lazymc::probe", "Received login success from server connection, change to play mode"); + + // Switch to play state + tmp_client.set_state(ClientState::Play); + + // Wait to catch join game packet + let join_game = wait_for_server_join_game(&tmp_client, &mut outbound, &mut buf).await?; + server.probed_join_game.lock().await.replace(join_game); + + // Gracefully close connection + let _ = net::close_tcp_stream(outbound).await; + + return Ok(forge_payload); + } + + // Show unhandled packet warning + debug!(target: "lazymc::forge", "Received unhandled packet from server in connect_to_server:"); + debug!(target: "lazymc::forge", "- State: {:?}", client_state); + debug!(target: "lazymc::forge", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id); + } + + // Gracefully close connection + net::close_tcp_stream(outbound).await.map_err(|_| ())?; + + Err(()) +} + +/// Wait for join game packet on server connection, with timeout. +/// +/// This parses, consumes and returns the packet. +async fn wait_for_server_join_game( + client: &Client, + outbound: &mut TcpStream, + buf: &mut BytesMut, +) -> Result { + time::timeout( + PROBE_JOIN_GAME_TIMEOUT, + wait_for_server_join_game_no_timeout(client, outbound, buf), + ) + .await + .map_err(|_| { + error!(target: "lazymc::probe", "Waiting for for game data from server for probe client timed out after {}s", PROBE_JOIN_GAME_TIMEOUT.as_secs()); + })? +} + +/// Wait for join game packet on server connection, with no timeout. +/// +/// This parses, consumes and returns the packet. +// TODO: clean this up +// TODO: do not drop error here, return Box +async fn wait_for_server_join_game_no_timeout( + client: &Client, + outbound: &mut TcpStream, + buf: &mut BytesMut, +) -> Result { + let (mut reader, mut _writer) = outbound.split(); + + loop { + // Read packet from stream + let (packet, _raw) = match packet::read_packet(client, buf, &mut reader).await { + Ok(Some(packet)) => packet, + Ok(None) => break, + Err(_) => { + error!(target: "lazymc::lobby", "Closing connection, error occurred"); + break; + } + }; + + // Catch join game + if packet.id == packets::play::CLIENT_JOIN_GAME { + let join_game = JoinGame::decode(&mut packet.data.as_slice()).map_err(|err| { + error!(target: "lazymc::probe", "Failed to decode join game packet, ignoring: {:?}", err); + })?; + + return Ok(join_game); + } + + // Show unhandled packet warning + debug!(target: "lazymc::lobby", "Received unhandled packet from server in wait_for_server_join_game:"); + debug!(target: "lazymc::lobby", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id); + } + + // Gracefully close connection + net::close_tcp_stream_ref(outbound).await.map_err(|_| ())?; + + Err(()) +} diff --git a/src/proto/packet.rs b/src/proto/packet.rs index ba5b1ad..ef8b0e1 100644 --- a/src/proto/packet.rs +++ b/src/proto/packet.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::io::prelude::*; use bytes::BytesMut; @@ -44,11 +45,22 @@ impl RawPacket { /// /// This decodes both compressed and uncompressed packets based on the client threshold /// preference. - pub fn decode(client: &Client, mut buf: &[u8]) -> Result { + pub fn decode_with_len(client: &Client, mut buf: &[u8]) -> Result { // Read length let (read, len) = types::read_var_int(buf)?; buf = &buf[read..][..len as usize]; + // TODO: assert buffer length! + + Self::decode_without_len(client, buf) + } + + /// Decode packet from raw buffer without packet length. + /// + /// This decodes both compressed and uncompressed packets based on the client threshold + /// preference. + /// The length is given, and not included in the buffer itself. + pub fn decode_without_len(client: &Client, mut buf: &[u8]) -> Result { // If no compression is used, read remaining packet ID and data if !client.is_compressed() { // Read packet ID and data @@ -85,7 +97,20 @@ impl RawPacket { /// Encode packet to raw buffer. /// /// This compresses packets based on the client threshold preference. - pub fn encode(&self, client: &Client) -> Result, ()> { + pub fn encode_with_len(&self, client: &Client) -> Result, ()> { + // Encode packet without length + let mut payload = self.encode_without_len(client)?; + + // Add length header + let mut packet = types::encode_var_int(payload.len() as i32)?; + packet.append(&mut payload); + Ok(packet) + } + + /// Encode packet to raw buffer without length header. + /// + /// This compresses packets based on the client threshold preference. + pub fn encode_without_len(&self, client: &Client) -> Result, ()> { let threshold = client.compressed(); if threshold >= 0 { self.encode_compressed(threshold) @@ -103,8 +128,7 @@ impl RawPacket { // Determine whether to compress, encode data length bytes let data_len = payload.len() as i32; let compress = data_len > threshold; - let mut data_len_bytes = - types::encode_var_int(if compress { data_len } else { 0 }).unwrap(); + let data_len_header = if compress { data_len } else { 0 }; // Compress payload if compress { @@ -117,10 +141,8 @@ impl RawPacket { })?; } - // Encapsulate payload with packet and data length - let len = data_len_bytes.len() as i32 + payload.len() as i32; - let mut packet = types::encode_var_int(len)?; - packet.append(&mut data_len_bytes); + // Add data length header + let mut packet = types::encode_var_int(data_len_header).unwrap(); packet.append(&mut payload); Ok(packet) @@ -128,12 +150,8 @@ impl RawPacket { /// Encode uncompressed packet to raw buffer. fn encode_uncompressed(&self) -> Result, ()> { - let mut data = types::encode_var_int(self.id as i32)?; - data.extend_from_slice(&self.data); - - let len = data.len() as i32; - let mut packet = types::encode_var_int(len)?; - packet.append(&mut data); + let mut packet = types::encode_var_int(self.id as i32)?; + packet.extend_from_slice(&self.data); Ok(packet) } @@ -194,22 +212,23 @@ pub async fn read_packet( } // Parse packet, use full buffer since we'll read the packet length again + // TODO: use decode_without_len, strip len from buffer let raw = buf.split_to(consumed + len as usize); - let packet = RawPacket::decode(client, &raw)?; + let packet = RawPacket::decode_with_len(client, &raw)?; Ok(Some((packet, raw.to_vec()))) } /// Write packet to stream writer. pub async fn write_packet( - packet: impl PacketId + Encoder, + packet: impl PacketId + Encoder + Debug, client: &Client, writer: &mut WriteHalf<'_>, ) -> Result<(), ()> { let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(packet.packet_id(), data).encode(client)?; + let response = RawPacket::new(packet.packet_id(), data).encode_with_len(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) diff --git a/src/proto/packets.rs b/src/proto/packets.rs index fa1aa0b..818a401 100644 --- a/src/proto/packets.rs +++ b/src/proto/packets.rs @@ -17,7 +17,9 @@ pub mod login { pub const CLIENT_DISCONNECT: u8 = 0x00; pub const CLIENT_LOGIN_SUCCESS: u8 = 0x02; pub const CLIENT_SET_COMPRESSION: u8 = 0x03; + pub const CLIENT_LOGIN_PLUGIN_REQUEST: u8 = 0x04; pub const SERVER_LOGIN_START: u8 = 0x00; + pub const SERVER_LOGIN_PLUGIN_RESPONSE: u8 = 0x02; } pub mod play { @@ -35,7 +37,18 @@ pub mod play { pub const CLIENT_SET_TITLE_TEXT: u8 = 0x59; pub const CLIENT_SET_TITLE_TIMES: u8 = 0x5A; pub const SERVER_CLIENT_SETTINGS: u8 = 0x05; - pub const SERVER_PLUGIN_MESSAGE: u8 = 0x0A; + // TODO: update + pub const SERVER_PLUGIN_MESSAGE: u8 = 0x0B; //0A pub const SERVER_PLAYER_POS: u8 = 0x11; pub const SERVER_PLAYER_POS_ROT: u8 = 0x12; } + +pub mod forge { + pub mod login { + pub const CLIENT_MOD_LIST: u8 = 1; + pub const CLIENT_SERVER_REGISTRY: u8 = 3; + pub const CLIENT_CONFIG_DATA: u8 = 4; + pub const SERVER_MOD_LIST_REPLY: u8 = 2; + pub const SERVER_ACKNOWLEDGEMENT: u8 = 99; + } +} diff --git a/src/server.rs b/src/server.rs index 9a5d643..2a16406 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,7 @@ use std::time::{Duration, Instant}; use futures::FutureExt; use minecraft_protocol::data::server_status::ServerStatus; +use minecraft_protocol::version::v1_17_1::game::JoinGame; use tokio::process::Command; use tokio::sync::watch; #[cfg(feature = "rcon")] @@ -79,6 +80,11 @@ pub struct Server { /// Last time server was stopped over RCON. #[cfg(feature = "rcon")] rcon_last_stop: Mutex>, + + // TODO: dont use mutex, do not make public, dont use bytesmut + pub forge_payload: Mutex>>, + // TODO: dont use mutex, do not make public, dont use bytesmut + pub probed_join_game: Mutex>, } impl Server { @@ -366,6 +372,8 @@ impl Default for Server { rcon_lock: Semaphore::new(1), #[cfg(feature = "rcon")] rcon_last_stop: Default::default(), + forge_payload: Default::default(), + probed_join_game: Default::default(), } } } diff --git a/src/service/mod.rs b/src/service/mod.rs index 369b870..1b35833 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,4 +1,5 @@ pub mod ban_reload; pub mod monitor; +pub mod probe; pub mod server; pub mod signal; diff --git a/src/service/probe.rs b/src/service/probe.rs new file mode 100644 index 0000000..b0c3682 --- /dev/null +++ b/src/service/probe.rs @@ -0,0 +1,23 @@ +use std::sync::Arc; + +use crate::config::Config; +use crate::probe; +use crate::server::Server; + +/// Probe server. +pub async fn service(config: Arc, state: Arc) { + // Only probe if Forge is enabled + // TODO: do more comprehensive check for probe, only with forge and lobby? + // TODO: add config option to probe on start + if !config.server.forge { + return; + } + + // Probe + match probe::probe(config, state).await { + Ok(_) => info!(target: "lazymc::probe", "Succesfully probed server"), + Err(_) => { + error!(target: "lazymc::probe", "Failed to probe server, this may limit lazymc features") + } + } +} diff --git a/src/service/server.rs b/src/service/server.rs index 3c67144..ff293e5 100644 --- a/src/service/server.rs +++ b/src/service/server.rs @@ -46,9 +46,10 @@ pub async fn service(config: Arc) -> Result<(), ()> { ); } - // Spawn server monitor and signal handler services + // Spawn server monitor, signal handler, probe and ban services tokio::spawn(service::monitor::service(config.clone(), server.clone())); tokio::spawn(service::signal::service(config.clone(), server.clone())); + tokio::spawn(service::probe::service(config.clone(), server.clone())); tokio::task::spawn_blocking({ let (config, server) = (config.clone(), server.clone()); || service::ban_reload::service(config, server) diff --git a/src/status.rs b/src/status.rs index c6fbad9..8600355 100644 --- a/src/status.rs +++ b/src/status.rs @@ -103,7 +103,7 @@ pub async fn serve( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(0, data).encode(&client)?; + let response = RawPacket::new(0, data).encode_with_len(&client)?; writer.write_all(&response).await.map_err(|_| ())?; continue;