diff --git a/src/join/forward.rs b/src/join/forward.rs new file mode 100644 index 0000000..4584859 --- /dev/null +++ b/src/join/forward.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use bytes::BytesMut; +use tokio::net::TcpStream; + +use crate::config::*; +use crate::service; + +use super::MethodResult; + +/// Forward the client. +pub async fn occupy( + config: Arc, + inbound: TcpStream, + inbound_history: &mut BytesMut, +) -> Result { + trace!(target: "lazymc", "Using forward method to occupy joining client"); + + debug!(target: "lazymc", "Forwarding client to {:?}!", config.join.forward.address); + + service::server::route_proxy_address_queue( + inbound, + config.join.forward.address, + inbound_history.clone(), + ); + + // TODO: do not consume, continue on proxy connect failure + + Ok(MethodResult::Consumed) +} diff --git a/src/join/hold.rs b/src/join/hold.rs new file mode 100644 index 0000000..3a39270 --- /dev/null +++ b/src/join/hold.rs @@ -0,0 +1,101 @@ +use std::ops::Deref; +use std::sync::Arc; +use std::time::Duration; + +use bytes::BytesMut; +use tokio::net::TcpStream; +use tokio::time; + +use crate::config::*; +use crate::server::{Server, State}; +use crate::service; + +use super::MethodResult; + +/// Hold the client. +pub async fn occupy( + config: Arc, + server: Arc, + inbound: TcpStream, + inbound_history: &mut BytesMut, +) -> Result { + trace!(target: "lazymc", "Using hold method to occupy joining client"); + + // Server must be starting + if server.state() != State::Starting { + return Ok(MethodResult::Continue(inbound)); + } + + // Start holding, consume client + if hold(&config, &server).await? { + service::server::route_proxy_queue(inbound, config, inbound_history.clone()); + return Ok(MethodResult::Consumed); + } + + Ok(MethodResult::Continue(inbound)) +} + +/// Hold a client while server starts. +/// +/// Returns holding status. `true` if client is held and it should be proxied, `false` it was held +/// but it timed out. +async fn hold<'a>(config: &Config, server: &Server) -> Result { + trace!(target: "lazymc", "Started holding client"); + + // A task to wait for suitable server state + // Waits for started state, errors if stopping/stopped state is reached + let task_wait = async { + let mut state = server.state_receiver(); + loop { + // Wait for state change + state.changed().await.unwrap(); + + match state.borrow().deref() { + // Still waiting on server start + State::Starting => { + trace!(target: "lazymc", "Server not ready, holding client for longer"); + continue; + } + + // Server started, start relaying and proxy + State::Started => { + break true; + } + + // Server stopping, this shouldn't happen, kick + State::Stopping => { + warn!(target: "lazymc", "Server stopping for held client, disconnecting"); + break false; + } + + // Server stopped, this shouldn't happen, disconnect + State::Stopped => { + error!(target: "lazymc", "Server stopped for held client, disconnecting"); + break false; + } + } + } + }; + + // Wait for server state with timeout + let timeout = Duration::from_secs(config.join.hold.timeout as u64); + match time::timeout(timeout, task_wait).await { + // Relay client to proxy + Ok(true) => { + info!(target: "lazymc", "Server ready for held client, relaying to server"); + Ok(true) + } + + // Server stopping/stopped, this shouldn't happen, kick + Ok(false) => { + warn!(target: "lazymc", "Server stopping for held client"); + Ok(false) + } + + // Timeout reached, kick with starting message + Err(_) => { + warn!(target: "lazymc", "Held client reached timeout of {}s", config.join.hold.timeout); + Ok(false) + } + } +} diff --git a/src/join/kick.rs b/src/join/kick.rs new file mode 100644 index 0000000..42fe20f --- /dev/null +++ b/src/join/kick.rs @@ -0,0 +1,33 @@ +use tokio::net::TcpStream; + +use crate::config::*; +use crate::net; +use crate::proto::action; +use crate::proto::client::Client; +use crate::server::{self, Server}; + +use super::MethodResult; + +/// Kick the client. +pub async fn occupy( + client: &Client, + config: &Config, + server: &Server, + mut inbound: TcpStream, +) -> Result { + trace!(target: "lazymc", "Using kick method to occupy joining client"); + + // Select message and kick + let msg = match server.state() { + server::State::Starting | server::State::Stopped | server::State::Started => { + &config.join.kick.starting + } + server::State::Stopping => &config.join.kick.stopping, + }; + action::kick(client, msg, &mut inbound.split().1).await?; + + // Gracefully close connection + net::close_tcp_stream(inbound).await.map_err(|_| ())?; + + Ok(MethodResult::Consumed) +} diff --git a/src/join/lobby.rs b/src/join/lobby.rs new file mode 100644 index 0000000..fed9188 --- /dev/null +++ b/src/join/lobby.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use bytes::BytesMut; +use tokio::net::TcpStream; + +use crate::config::*; +use crate::lobby; +use crate::proto::client::{Client, ClientInfo}; +use crate::server::Server; + +use super::MethodResult; + +/// Lobby the client. +pub async fn occupy( + client: &Client, + client_info: ClientInfo, + config: Arc, + server: Arc, + inbound: TcpStream, + inbound_queue: BytesMut, +) -> Result { + trace!(target: "lazymc", "Using lobby method to occupy joining client"); + + // Start lobby + lobby::serve(client, client_info, inbound, config, server, inbound_queue).await?; + + // TODO: do not consume client here, allow other join method on fail + + Ok(MethodResult::Consumed) +} diff --git a/src/join/mod.rs b/src/join/mod.rs new file mode 100644 index 0000000..959b183 --- /dev/null +++ b/src/join/mod.rs @@ -0,0 +1,106 @@ +use std::sync::Arc; + +use bytes::BytesMut; +use tokio::net::TcpStream; + +use crate::config::*; +use crate::net; +use crate::proto::client::{Client, ClientInfo, ClientState}; +use crate::server::Server; + +pub mod forward; +pub mod hold; +pub mod kick; +#[cfg(feature = "lobby")] +pub mod lobby; + +/// A result returned by a join occupy method. +pub enum MethodResult { + /// Client is consumed. + Consumed, + + /// Method is done, continue with the next. + Continue(TcpStream), +} + +/// Start occupying client. +/// +/// This assumes the login start packet has just been received. +pub async fn occupy( + client: Client, + #[allow(unused_variables)] client_info: ClientInfo, + config: Arc, + server: Arc, + mut inbound: TcpStream, + mut inbound_history: BytesMut, + #[allow(unused_variables)] login_queue: BytesMut, +) -> Result<(), ()> { + // Assert state is correct + assert_eq!( + client.state(), + ClientState::Login, + "when occupying client, it should be in login state" + ); + + // Go through all configured join methods + for method in &config.join.methods { + // Invoke method, take result + let result = match method { + // Kick method, immediately kick client + Method::Kick => kick::occupy(&client, &config, &server, inbound).await?, + + // Hold method, hold client connection while server starts + Method::Hold => { + hold::occupy( + config.clone(), + server.clone(), + inbound, + &mut inbound_history, + ) + .await? + } + + // Forward method, forward client connection while server starts + Method::Forward => { + forward::occupy(config.clone(), inbound, &mut inbound_history).await? + } + + // Lobby method, keep client in lobby while server starts + #[cfg(feature = "lobby")] + Method::Lobby => { + lobby::occupy( + &client, + client_info.clone(), + config.clone(), + server.clone(), + inbound, + login_queue.clone(), + ) + .await? + } + + // Lobby method, keep client in lobby while server starts + #[cfg(not(feature = "lobby"))] + Method::Lobby => { + error!(target: "lazymc", "Lobby join method not supported in this lazymc build"); + MethodResult::Continue(inbound) + } + }; + + // Handle method result + match result { + MethodResult::Consumed => return Ok(()), + MethodResult::Continue(stream) => { + inbound = stream; + continue; + } + } + } + + debug!(target: "lazymc", "No method left to occupy joining client, disconnecting"); + + // Gracefully close connection + net::close_tcp_stream(inbound).await.map_err(|_| ())?; + + Ok(()) +} diff --git a/src/lobby.rs b/src/lobby.rs index b44beba..c7a681f 100644 --- a/src/lobby.rs +++ b/src/lobby.rs @@ -16,7 +16,7 @@ use minecraft_protocol::version::v1_17_1::game::{ Respawn, SetTitleSubtitle, SetTitleText, SetTitleTimes, TimeUpdate, }; use nbt::CompoundTag; -use tokio::io::{self, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::net::TcpStream; use tokio::select; @@ -25,7 +25,11 @@ use uuid::Uuid; use crate::config::*; use crate::mc; -use crate::proto::{self, Client, ClientInfo, ClientState, RawPacket}; +use crate::net; +use crate::proto; +use crate::proto::client::{Client, ClientInfo, ClientState}; +use crate::proto::packet::{self, RawPacket}; +use crate::proto::packets; use crate::proxy; use crate::server::{Server, State}; @@ -62,7 +66,7 @@ const SERVER_BRAND: &[u8] = b"lazymc"; // TODO: do not drop error here, return Box // TODO: on error, nicely kick client with message pub async fn serve( - client: Client, + client: &Client, client_info: ClientInfo, mut inbound: TcpStream, config: Arc, @@ -88,7 +92,7 @@ pub async fn serve( loop { // Read packet from stream - let (packet, _raw) = match proto::read_packet(&client, &mut inbound_buf, &mut reader).await + let (packet, _raw) = match packet::read_packet(client, &mut inbound_buf, &mut reader).await { Ok(Some(packet)) => packet, Ok(None) => break, @@ -102,9 +106,7 @@ pub async fn serve( let client_state = client.state(); // Hijack login start - if client_state == ClientState::Login - && packet.id == proto::packets::login::SERVER_LOGIN_START - { + if client_state == ClientState::Login && packet.id == packets::login::SERVER_LOGIN_START { // Parse login start packet let login_start = LoginStart::decode(&mut packet.data.as_slice()).map_err(|_| ())?; @@ -113,21 +115,21 @@ pub async fn serve( // Respond with set compression if compression is enabled based on threshold if proto::COMPRESSION_THRESHOLD >= 0 { trace!(target: "lazymc::lobby", "Enabling compression for lobby client because server has it enabled (threshold: {})", proto::COMPRESSION_THRESHOLD); - respond_set_compression(&client, &mut writer, proto::COMPRESSION_THRESHOLD).await?; + respond_set_compression(client, &mut writer, proto::COMPRESSION_THRESHOLD).await?; client.set_compression(proto::COMPRESSION_THRESHOLD); } // Respond with login success, switch to play state - respond_login_success(&client, &mut writer, &login_start).await?; + respond_login_success(client, &mut writer, &login_start).await?; client.set_state(ClientState::Play); trace!(target: "lazymc::lobby", "Client login success, sending required play packets for lobby world"); // Send packets to client required to get into workable play state for lobby world - send_lobby_play_packets(&client, &mut writer, &server).await?; + send_lobby_play_packets(client, &mut writer, &server).await?; // Wait for server to come online, then set up new connection to it - stage_wait(&client, &server, &config, &mut writer).await?; + stage_wait(client, &server, &config, &mut writer).await?; let (server_client, mut outbound, mut server_buf) = connect_to_server(client_info, &config).await?; @@ -136,10 +138,10 @@ pub async fn serve( wait_for_server_join_game(&server_client, &mut outbound, &mut server_buf).await?; // Reset lobby title - send_lobby_title(&client, &mut writer, "").await?; + send_lobby_title(client, &mut writer, "").await?; // Play ready sound if configured - play_lobby_ready_sound(&client, &mut writer, &config).await?; + play_lobby_ready_sound(client, &mut writer, &config).await?; // Wait a second because Notchian servers are slow // See: https://wiki.vg/Protocol#Login_Success @@ -147,7 +149,7 @@ pub async fn serve( time::sleep(SERVER_WARMUP).await; // Send respawn packet, initiates teleport to real server world - send_respawn_from_join(&client, &mut writer, join_game).await?; + send_respawn_from_join(client, &mut writer, join_game).await?; // Drain inbound connection so we don't confuse the server // TODO: can we void everything? we might need to forward everything to server except @@ -172,11 +174,7 @@ pub async fn serve( } // Gracefully close connection - match writer.shutdown().await { - Ok(_) => {} - Err(err) if err.kind() == io::ErrorKind::NotConnected => {} - Err(_) => return Err(()), - } + net::close_tcp_stream(inbound).await.map_err(|_| ())?; Ok(()) } @@ -192,8 +190,7 @@ async fn respond_set_compression( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = - RawPacket::new(proto::packets::login::CLIENT_SET_COMPRESSION, data).encode(client)?; + let response = RawPacket::new(packets::login::CLIENT_SET_COMPRESSION, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -217,8 +214,7 @@ async fn respond_login_success( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = - RawPacket::new(proto::packets::login::CLIENT_LOGIN_SUCCESS, data).encode(client)?; + let response = RawPacket::new(packets::login::CLIENT_LOGIN_SUCCESS, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -310,7 +306,7 @@ async fn send_lobby_join_game( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_JOIN_GAME, data).encode(client)?; + let response = RawPacket::new(packets::play::CLIENT_JOIN_GAME, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -326,8 +322,7 @@ async fn send_lobby_brand(client: &Client, writer: &mut WriteHalf<'_>) -> Result let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = - RawPacket::new(proto::packets::play::CLIENT_PLUGIN_MESSAGE, data).encode(client)?; + let response = RawPacket::new(packets::play::CLIENT_PLUGIN_MESSAGE, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -350,8 +345,7 @@ async fn send_lobby_player_pos(client: &Client, writer: &mut WriteHalf<'_>) -> R let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = - RawPacket::new(proto::packets::play::CLIENT_PLAYER_POS_LOOK, data).encode(client)?; + let response = RawPacket::new(packets::play::CLIENT_PLAYER_POS_LOOK, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -370,7 +364,7 @@ async fn send_lobby_time_update(client: &Client, writer: &mut WriteHalf<'_>) -> let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_TIME_UPDATE, data).encode(client)?; + let response = RawPacket::new(packets::play::CLIENT_TIME_UPDATE, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -388,7 +382,7 @@ async fn send_keep_alive(client: &Client, writer: &mut WriteHalf<'_>) -> Result< let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_KEEP_ALIVE, data).encode(client)?; + let response = RawPacket::new(packets::play::CLIENT_KEEP_ALIVE, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; // TODO: verify we receive keep alive response with same ID from client @@ -418,8 +412,7 @@ async fn send_lobby_title( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = - RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TEXT, data).encode(client)?; + let response = RawPacket::new(packets::play::CLIENT_SET_TITLE_TEXT, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; // Set subtitle @@ -430,8 +423,7 @@ async fn send_lobby_title( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = - RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode(client)?; + let response = RawPacket::new(packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; // Set title times @@ -453,8 +445,7 @@ async fn send_lobby_title( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = - RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TIMES, data).encode(client)?; + let response = RawPacket::new(packets::play::CLIENT_SET_TITLE_TIMES, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -479,8 +470,7 @@ async fn send_lobby_sound_effect( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = - RawPacket::new(proto::packets::play::CLIENT_NAMED_SOUND_EFFECT, data).encode(client)?; + let response = RawPacket::new(packets::play::CLIENT_NAMED_SOUND_EFFECT, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -508,7 +498,7 @@ async fn send_respawn_from_join( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(proto::packets::play::CLIENT_RESPAWN, data).encode(client)?; + let response = RawPacket::new(packets::play::CLIENT_RESPAWN, data).encode(client)?; writer.write_all(&response).await.map_err(|_| ())?; Ok(()) @@ -654,8 +644,7 @@ async fn connect_to_server_no_timeout( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let request = - RawPacket::new(proto::packets::handshake::SERVER_HANDSHAKE, data).encode(&tmp_client)?; + let request = RawPacket::new(packets::handshake::SERVER_HANDSHAKE, data).encode(&tmp_client)?; writer.write_all(&request).await.map_err(|_| ())?; // Request login start @@ -666,8 +655,7 @@ async fn connect_to_server_no_timeout( let mut data = Vec::new(); packet.encode(&mut data).map_err(|_| ())?; - let request = - RawPacket::new(proto::packets::login::SERVER_LOGIN_START, data).encode(&tmp_client)?; + let request = RawPacket::new(packets::login::SERVER_LOGIN_START, data).encode(&tmp_client)?; writer.write_all(&request).await.map_err(|_| ())?; // Incoming buffer @@ -675,7 +663,7 @@ async fn connect_to_server_no_timeout( loop { // Read packet from stream - let (packet, _raw) = match proto::read_packet(&tmp_client, &mut buf, &mut reader).await { + let (packet, _raw) = match packet::read_packet(&tmp_client, &mut buf, &mut reader).await { Ok(Some(packet)) => packet, Ok(None) => break, Err(_) => { @@ -688,8 +676,7 @@ async fn connect_to_server_no_timeout( let client_state = tmp_client.state(); // Catch set compression - if client_state == ClientState::Login - && packet.id == proto::packets::login::CLIENT_SET_COMPRESSION + if client_state == ClientState::Login && packet.id == packets::login::CLIENT_SET_COMPRESSION { // Decode compression packet let set_compression = @@ -713,9 +700,7 @@ async fn connect_to_server_no_timeout( } // Hijack login success - if client_state == ClientState::Login - && packet.id == proto::packets::login::CLIENT_LOGIN_SUCCESS - { + if client_state == ClientState::Login && packet.id == packets::login::CLIENT_LOGIN_SUCCESS { trace!(target: "lazymc::lobby", "Received login success from server connection, change to play mode"); // TODO: parse this packet to ensure it's fine @@ -743,11 +728,7 @@ async fn connect_to_server_no_timeout( } // Gracefully close connection - match writer.shutdown().await { - Ok(_) => {} - Err(err) if err.kind() == io::ErrorKind::NotConnected => {} - Err(_) => return Err(()), - } + net::close_tcp_stream(outbound).await.map_err(|_| ())?; Err(()) } @@ -780,11 +761,11 @@ async fn wait_for_server_join_game_no_timeout( outbound: &mut TcpStream, buf: &mut BytesMut, ) -> Result { - let (mut reader, mut writer) = outbound.split(); + let (mut reader, mut _writer) = outbound.split(); loop { // Read packet from stream - let (packet, _raw) = match proto::read_packet(client, buf, &mut reader).await { + let (packet, _raw) = match packet::read_packet(client, buf, &mut reader).await { Ok(Some(packet)) => packet, Ok(None) => break, Err(_) => { @@ -794,7 +775,7 @@ async fn wait_for_server_join_game_no_timeout( }; // Catch join game - if packet.id == proto::packets::play::CLIENT_JOIN_GAME { + if packet.id == packets::play::CLIENT_JOIN_GAME { let join_game = JoinGame::decode(&mut packet.data.as_slice()).map_err(|err| { dbg!(err); })?; @@ -808,11 +789,7 @@ async fn wait_for_server_join_game_no_timeout( } // Gracefully close connection - match writer.shutdown().await { - Ok(_) => {} - Err(err) if err.kind() == io::ErrorKind::NotConnected => {} - Err(_) => return Err(()), - } + net::close_tcp_stream_ref(outbound).await.map_err(|_| ())?; Err(()) } diff --git a/src/main.rs b/src/main.rs index 29dd3a0..678967e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,10 +10,12 @@ extern crate log; pub(crate) mod action; pub(crate) mod cli; pub(crate) mod config; +pub(crate) mod join; #[cfg(feature = "lobby")] pub(crate) mod lobby; pub(crate) mod mc; pub(crate) mod monitor; +pub(crate) mod net; pub(crate) mod os; pub(crate) mod proto; pub(crate) mod proxy; diff --git a/src/monitor.rs b/src/monitor.rs index e7e5f16..d0d8849 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -16,7 +16,9 @@ use tokio::net::TcpStream; use tokio::time; use crate::config::Config; -use crate::proto::{self, Client, ClientState, RawPacket}; +use crate::proto::client::{Client, ClientState}; +use crate::proto::packet::{self, RawPacket}; +use crate::proto::packets; use crate::server::{Server, State}; /// Monitor ping inverval in seconds. @@ -133,7 +135,7 @@ async fn send_handshake( let mut packet = Vec::new(); handshake.encode(&mut packet).map_err(|_| ())?; - let raw = RawPacket::new(proto::packets::handshake::SERVER_HANDSHAKE, packet) + let raw = RawPacket::new(packets::handshake::SERVER_HANDSHAKE, packet) .encode(client) .map_err(|_| ())?; stream.write_all(&raw).await.map_err(|_| ())?; @@ -143,7 +145,7 @@ async fn send_handshake( /// Send status request. async fn request_status(client: &Client, stream: &mut TcpStream) -> Result<(), ()> { - let raw = RawPacket::new(proto::packets::status::SERVER_STATUS, vec![]) + let raw = RawPacket::new(packets::status::SERVER_STATUS, vec![]) .encode(client) .map_err(|_| ())?; stream.write_all(&raw).await.map_err(|_| ())?; @@ -158,7 +160,7 @@ async fn send_ping(client: &Client, stream: &mut TcpStream) -> Result { let mut packet = Vec::new(); ping.encode(&mut packet).map_err(|_| ())?; - let raw = RawPacket::new(proto::packets::status::SERVER_PING, packet) + let raw = RawPacket::new(packets::status::SERVER_PING, packet) .encode(client) .map_err(|_| ())?; stream.write_all(&raw).await.map_err(|_| ())?; @@ -173,14 +175,14 @@ async fn wait_for_status(client: &Client, stream: &mut TcpStream) -> Result packet, Ok(None) => break, Err(_) => continue, }; // Catch status response - if packet.id == proto::packets::status::CLIENT_STATUS { + if packet.id == packets::status::CLIENT_STATUS { let status = StatusResponse::decode(&mut packet.data.as_slice()).map_err(|_| ())?; return Ok(status.server_status); } @@ -209,14 +211,14 @@ async fn wait_for_ping(client: &Client, stream: &mut TcpStream, token: u64) -> R loop { // Read packet from stream - let (packet, _raw) = match proto::read_packet(client, &mut buf, &mut reader).await { + let (packet, _raw) = match packet::read_packet(client, &mut buf, &mut reader).await { Ok(Some(packet)) => packet, Ok(None) => break, Err(_) => continue, }; // Catch ping response - if packet.id == proto::packets::status::CLIENT_PING { + if packet.id == packets::status::CLIENT_PING { let ping = PingResponse::decode(&mut packet.data.as_slice()).map_err(|_| ())?; // Ping token must match diff --git a/src/net.rs b/src/net.rs new file mode 100644 index 0000000..1dc834b --- /dev/null +++ b/src/net.rs @@ -0,0 +1,22 @@ +use std::error::Error; +use std::io; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; + +/// Gracefully close given TCP stream. +/// +/// Intended as helper to make code less messy. This also succeeds if already closed. +pub async fn close_tcp_stream(mut stream: TcpStream) -> Result<(), Box> { + close_tcp_stream_ref(&mut stream).await +} + +/// Gracefully close given TCP stream. +/// +/// Intended as helper to make code less messy. This also succeeds if already closed. +pub async fn close_tcp_stream_ref(stream: &mut TcpStream) -> Result<(), Box> { + match stream.shutdown().await { + Ok(_) => Ok(()), + Err(err) if err.kind() == io::ErrorKind::NotConnected => Ok(()), + Err(err) => Err(err.into()), + } +} diff --git a/src/proto/action.rs b/src/proto/action.rs new file mode 100644 index 0000000..3253ac4 --- /dev/null +++ b/src/proto/action.rs @@ -0,0 +1,51 @@ +use minecraft_protocol::data::chat::{Message, Payload}; +use minecraft_protocol::encoder::Encoder; +use minecraft_protocol::version::v1_14_4::game::GameDisconnect; +use minecraft_protocol::version::v1_14_4::login::LoginDisconnect; +use tokio::io::AsyncWriteExt; +use tokio::net::tcp::WriteHalf; + +use crate::proto::client::{Client, ClientState}; +use crate::proto::packet::RawPacket; +use crate::proto::packets; + +/// Kick client with a message. +/// +/// Should close connection afterwards. +pub async fn kick(client: &Client, msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> { + match client.state() { + ClientState::Login => login_kick(client, msg, writer).await, + ClientState::Play => play_kick(client, msg, writer).await, + _ => Err(()), + } +} + +/// Kick client with a message in login state. +/// +/// Should close connection afterwards. +async fn login_kick(client: &Client, msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> { + let packet = LoginDisconnect { + reason: Message::new(Payload::text(msg)), + }; + + let mut data = Vec::new(); + packet.encode(&mut data).map_err(|_| ())?; + + let response = RawPacket::new(packets::login::CLIENT_DISCONNECT, data).encode(client)?; + writer.write_all(&response).await.map_err(|_| ()) +} + +/// Kick client with a message in play state. +/// +/// Should close connection afterwards. +async fn play_kick(client: &Client, msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> { + let packet = GameDisconnect { + reason: Message::new(Payload::text(msg)), + }; + + let mut data = Vec::new(); + packet.encode(&mut data).map_err(|_| ())?; + + let response = RawPacket::new(packets::play::CLIENT_DISCONNECT, data).encode(client)?; + writer.write_all(&response).await.map_err(|_| ()) +} diff --git a/src/proto/client.rs b/src/proto/client.rs new file mode 100644 index 0000000..584696b --- /dev/null +++ b/src/proto/client.rs @@ -0,0 +1,118 @@ +use std::sync::atomic::{AtomicI32, Ordering}; +use std::sync::Mutex; + +/// Client state. +/// +/// Note: this does not keep track of encryption states. +#[derive(Debug)] +pub struct Client { + /// Current client state. + pub state: Mutex, + + /// Compression state. + /// + /// 0 or positive if enabled, negative if disabled. + pub compression: AtomicI32, +} + +impl Client { + /// Get client state. + pub fn state(&self) -> ClientState { + *self.state.lock().unwrap() + } + + /// Set client state. + pub fn set_state(&self, state: ClientState) { + *self.state.lock().unwrap() = state; + } + + /// Get compression threshold. + pub fn compressed(&self) -> i32 { + self.compression.load(Ordering::Relaxed) + } + + /// Whether compression is used. + pub fn is_compressed(&self) -> bool { + self.compressed() >= 0 + } + + /// Set compression value. + #[allow(unused)] + pub fn set_compression(&self, threshold: i32) { + trace!(target: "lazymc", "Client now uses compression threshold of {}", threshold); + self.compression.store(threshold, Ordering::Relaxed); + } +} + +impl Default for Client { + fn default() -> Self { + Self { + state: Default::default(), + compression: AtomicI32::new(-1), + } + } +} + +/// Protocol state a client may be in. +/// +/// Note: this does not include the `play` state, because this is never used anymore when a client +/// reaches this state. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum ClientState { + /// Initial client state. + Handshake, + + /// State to query server status. + Status, + + /// State to login to server. + Login, + + /// State to play on the server. + #[allow(unused)] + Play, +} + +impl ClientState { + /// From state ID. + pub fn from_id(id: i32) -> Option { + match id { + 0 => Some(Self::Handshake), + 1 => Some(Self::Status), + 2 => Some(Self::Login), + _ => None, + } + } + + /// Get state ID. + pub fn to_id(self) -> i32 { + match self { + Self::Handshake => 0, + Self::Status => 1, + Self::Login => 2, + Self::Play => -1, + } + } +} + +impl Default for ClientState { + fn default() -> Self { + Self::Handshake + } +} + +/// Client info, useful during connection handling. +#[derive(Debug, Clone, Default)] +pub struct ClientInfo { + /// Client protocol version. + pub protocol_version: Option, + + /// Client username. + pub username: Option, +} + +impl ClientInfo { + pub fn empty() -> Self { + Self::default() + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs new file mode 100644 index 0000000..f712819 --- /dev/null +++ b/src/proto/mod.rs @@ -0,0 +1,27 @@ +pub mod action; +pub mod client; +pub mod packet; +pub mod packets; + +/// Default minecraft protocol version name. +/// +/// Just something to default to when real server version isn't known or when no hint is specified +/// in the configuration. +/// +/// Should be kept up-to-date with latest supported Minecraft version by lazymc. +pub const PROTO_DEFAULT_VERSION: &str = "1.17.1"; + +/// Default minecraft protocol version. +/// +/// Just something to default to when real server version isn't known or when no hint is specified +/// in the configuration. +/// +/// Should be kept up-to-date with latest supported Minecraft version by lazymc. +pub const PROTO_DEFAULT_PROTOCOL: u32 = 756; + +/// Compression threshold to use. +// TODO: read this from server.properties instead +pub const COMPRESSION_THRESHOLD: i32 = 256; + +/// Default buffer size when reading packets. +pub(super) const BUF_SIZE: usize = 8 * 1024; diff --git a/src/proto.rs b/src/proto/packet.rs similarity index 55% rename from src/proto.rs rename to src/proto/packet.rs index f8bb2c4..39eb172 100644 --- a/src/proto.rs +++ b/src/proto/packet.rs @@ -1,6 +1,4 @@ use std::io::prelude::*; -use std::sync::atomic::{AtomicI32, Ordering}; -use std::sync::Mutex; use bytes::BytesMut; use flate2::read::ZlibDecoder; @@ -10,189 +8,10 @@ use tokio::io; use tokio::io::AsyncReadExt; use tokio::net::tcp::ReadHalf; +use crate::proto::client::Client; +use crate::proto::BUF_SIZE; use crate::types; -/// Default minecraft protocol version name. -/// -/// Just something to default to when real server version isn't known or when no hint is specified -/// in the configuration. -/// -/// Should be kept up-to-date with latest supported Minecraft version by lazymc. -pub const PROTO_DEFAULT_VERSION: &str = "1.17.1"; - -/// Default minecraft protocol version. -/// -/// Just something to default to when real server version isn't known or when no hint is specified -/// in the configuration. -/// -/// Should be kept up-to-date with latest supported Minecraft version by lazymc. -pub const PROTO_DEFAULT_PROTOCOL: u32 = 756; - -/// Compression threshold to use. -// TODO: read this from server.properties instead -pub const COMPRESSION_THRESHOLD: i32 = 256; - -/// Default buffer size when reading packets. -const BUF_SIZE: usize = 8 * 1024; - -/// Minecraft protocol packet IDs. -#[allow(unused)] -pub mod packets { - pub mod handshake { - pub const SERVER_HANDSHAKE: i32 = 0; - } - - pub mod status { - pub const CLIENT_STATUS: i32 = 0; - pub const CLIENT_PING: i32 = 1; - pub const SERVER_STATUS: i32 = 0; - pub const SERVER_PING: i32 = 1; - } - - pub mod login { - pub const CLIENT_DISCONNECT: i32 = 0x00; - pub const CLIENT_LOGIN_SUCCESS: i32 = 0x02; - pub const CLIENT_SET_COMPRESSION: i32 = 0x03; - pub const SERVER_LOGIN_START: i32 = 0x00; - } - - pub mod play { - pub const CLIENT_CHAT_MSG: i32 = 0x0F; - pub const CLIENT_PLUGIN_MESSAGE: i32 = 0x18; - pub const CLIENT_NAMED_SOUND_EFFECT: i32 = 0x19; - pub const CLIENT_DISCONNECT: i32 = 0x1A; - pub const CLIENT_KEEP_ALIVE: i32 = 0x21; - pub const CLIENT_JOIN_GAME: i32 = 0x26; - pub const CLIENT_PLAYER_POS_LOOK: i32 = 0x38; - pub const CLIENT_RESPAWN: i32 = 0x3D; - pub const CLIENT_SPAWN_POS: i32 = 0x4B; - pub const CLIENT_SET_TITLE_SUBTITLE: i32 = 0x57; - pub const CLIENT_TIME_UPDATE: i32 = 0x58; - pub const CLIENT_SET_TITLE_TEXT: i32 = 0x59; - pub const CLIENT_SET_TITLE_TIMES: i32 = 0x5A; - pub const SERVER_CLIENT_SETTINGS: i32 = 0x05; - pub const SERVER_PLUGIN_MESSAGE: i32 = 0x0A; - pub const SERVER_PLAYER_POS: i32 = 0x11; - pub const SERVER_PLAYER_POS_ROT: i32 = 0x12; - } -} - -/// Client state. -/// -/// Note: this does not keep track of encryption states. -#[derive(Debug)] -pub struct Client { - /// Current client state. - pub state: Mutex, - - /// Compression state. - /// - /// 0 or positive if enabled, negative if disabled. - pub compression: AtomicI32, -} - -impl Client { - /// Get client state. - pub fn state(&self) -> ClientState { - *self.state.lock().unwrap() - } - - /// Set client state. - pub fn set_state(&self, state: ClientState) { - *self.state.lock().unwrap() = state; - } - - /// Get compression threshold. - pub fn compressed(&self) -> i32 { - self.compression.load(Ordering::Relaxed) - } - - /// Whether compression is used. - pub fn is_compressed(&self) -> bool { - self.compressed() >= 0 - } - - /// Set compression value. - #[allow(unused)] - pub fn set_compression(&self, threshold: i32) { - trace!(target: "lazymc", "Client now uses compression threshold of {}", threshold); - self.compression.store(threshold, Ordering::Relaxed); - } -} - -impl Default for Client { - fn default() -> Self { - Self { - state: Default::default(), - compression: AtomicI32::new(-1), - } - } -} - -/// Protocol state a client may be in. -/// -/// Note: this does not include the `play` state, because this is never used anymore when a client -/// reaches this state. -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum ClientState { - /// Initial client state. - Handshake, - - /// State to query server status. - Status, - - /// State to login to server. - Login, - - /// State to play on the server. - #[allow(unused)] - Play, -} - -impl ClientState { - /// From state ID. - pub fn from_id(id: i32) -> Option { - match id { - 0 => Some(Self::Handshake), - 1 => Some(Self::Status), - 2 => Some(Self::Login), - _ => None, - } - } - - /// Get state ID. - pub fn to_id(self) -> i32 { - match self { - Self::Handshake => 0, - Self::Status => 1, - Self::Login => 2, - Self::Play => -1, - } - } -} - -impl Default for ClientState { - fn default() -> Self { - Self::Handshake - } -} - -/// Client info, useful during connection handling. -#[derive(Debug, Default)] -pub struct ClientInfo { - /// Client protocol version. - pub protocol_version: Option, - - /// Client username. - pub username: Option, -} - -impl ClientInfo { - pub fn empty() -> Self { - Self::default() - } -} - /// Raw Minecraft packet. /// /// Having a packet ID and a raw data byte array. diff --git a/src/proto/packets.rs b/src/proto/packets.rs new file mode 100644 index 0000000..ede89a7 --- /dev/null +++ b/src/proto/packets.rs @@ -0,0 +1,41 @@ +//! Minecraft protocol packet IDs. + +#![allow(unused)] + +pub mod handshake { + pub const SERVER_HANDSHAKE: i32 = 0; +} + +pub mod status { + pub const CLIENT_STATUS: i32 = 0; + pub const CLIENT_PING: i32 = 1; + pub const SERVER_STATUS: i32 = 0; + pub const SERVER_PING: i32 = 1; +} + +pub mod login { + pub const CLIENT_DISCONNECT: i32 = 0x00; + pub const CLIENT_LOGIN_SUCCESS: i32 = 0x02; + pub const CLIENT_SET_COMPRESSION: i32 = 0x03; + pub const SERVER_LOGIN_START: i32 = 0x00; +} + +pub mod play { + pub const CLIENT_CHAT_MSG: i32 = 0x0F; + pub const CLIENT_PLUGIN_MESSAGE: i32 = 0x18; + pub const CLIENT_NAMED_SOUND_EFFECT: i32 = 0x19; + pub const CLIENT_DISCONNECT: i32 = 0x1A; + pub const CLIENT_KEEP_ALIVE: i32 = 0x21; + pub const CLIENT_JOIN_GAME: i32 = 0x26; + pub const CLIENT_PLAYER_POS_LOOK: i32 = 0x38; + pub const CLIENT_RESPAWN: i32 = 0x3D; + pub const CLIENT_SPAWN_POS: i32 = 0x4B; + pub const CLIENT_SET_TITLE_SUBTITLE: i32 = 0x57; + pub const CLIENT_TIME_UPDATE: i32 = 0x58; + pub const CLIENT_SET_TITLE_TEXT: i32 = 0x59; + pub const CLIENT_SET_TITLE_TIMES: i32 = 0x5A; + pub const SERVER_CLIENT_SETTINGS: i32 = 0x05; + pub const SERVER_PLUGIN_MESSAGE: i32 = 0x0A; + pub const SERVER_PLAYER_POS: i32 = 0x11; + pub const SERVER_PLAYER_POS_ROT: i32 = 0x12; +} diff --git a/src/proxy.rs b/src/proxy.rs index 197c7d7..312a402 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -5,6 +5,8 @@ use tokio::io; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; +use crate::net; + /// Proxy the inbound stream to a target address. pub async fn proxy(inbound: TcpStream, addr_target: SocketAddr) -> Result<(), Box> { proxy_with_queue(inbound, addr_target, &[]).await @@ -64,5 +66,8 @@ pub async fn proxy_inbound_outbound_with_queue( tokio::try_join!(client_to_server, server_to_client)?; + // Gracefully close connection if not done already + net::close_tcp_stream(inbound).await?; + Ok(()) } diff --git a/src/service/server.rs b/src/service/server.rs index b5ecc38..4c156c6 100644 --- a/src/service/server.rs +++ b/src/service/server.rs @@ -6,7 +6,7 @@ use futures::FutureExt; use tokio::net::{TcpListener, TcpStream}; use crate::config::Config; -use crate::proto::Client; +use crate::proto::client::Client; use crate::proxy; use crate::server::{self, Server}; use crate::service; diff --git a/src/status.rs b/src/status.rs index 93d1c34..ceaf07d 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,6 +1,4 @@ -use std::ops::Deref; use std::sync::Arc; -use std::time::Duration; use bytes::BytesMut; use minecraft_protocol::data::chat::{Message, Payload}; @@ -8,19 +6,18 @@ use minecraft_protocol::data::server_status::*; use minecraft_protocol::decoder::Decoder; use minecraft_protocol::encoder::Encoder; use minecraft_protocol::version::v1_14_4::handshake::Handshake; -use minecraft_protocol::version::v1_14_4::login::{LoginDisconnect, LoginStart}; +use minecraft_protocol::version::v1_14_4::login::LoginStart; use minecraft_protocol::version::v1_14_4::status::StatusResponse; -use tokio::io::{self, AsyncWriteExt}; -use tokio::net::tcp::WriteHalf; +use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; -use tokio::time; use crate::config::*; -#[cfg(feature = "lobby")] -use crate::lobby; -use crate::proto::{self, Client, ClientInfo, ClientState, RawPacket}; -use crate::server::{self, Server, State}; -use crate::service; +use crate::join; +use crate::proto::action; +use crate::proto::client::{Client, ClientInfo, ClientState}; +use crate::proto::packet::{self, RawPacket}; +use crate::proto::packets; +use crate::server::{self, Server}; /// Proxy the given inbound stream to a target address. // TODO: do not drop error here, return Box @@ -35,16 +32,13 @@ pub async fn serve( // Incoming buffer and packet holding queue let mut buf = BytesMut::new(); - // Remember inbound packets, used for client holding and forwarding - let remember_inbound = config.join.methods.contains(&Method::Hold) - || config.join.methods.contains(&Method::Forward); + // Remember inbound packets, track client info let mut inbound_history = BytesMut::new(); - let mut client_info = ClientInfo::empty(); loop { // Read packet from stream - let (packet, raw) = match proto::read_packet(&client, &mut buf, &mut reader).await { + let (packet, raw) = match packet::read_packet(&client, &mut buf, &mut reader).await { Ok(Some(packet)) => packet, Ok(None) => break, Err(_) => { @@ -58,7 +52,7 @@ pub async fn serve( // Hijack handshake if client_state == ClientState::Handshake - && packet.id == proto::packets::handshake::SERVER_HANDSHAKE + && packet.id == packets::handshake::SERVER_HANDSHAKE { // Parse handshake let handshake = match Handshake::decode(&mut packet.data.as_slice()) { @@ -84,8 +78,8 @@ pub async fn serve( .replace(handshake.protocol_version); client.set_state(new_state); - // If login handshake and holding is enabled, hold packets - if new_state == ClientState::Login && remember_inbound { + // If loggin in with handshake, remember inbound + if new_state == ClientState::Login { inbound_history.extend(raw); } @@ -93,8 +87,7 @@ pub async fn serve( } // Hijack server status packet - if client_state == ClientState::Status && packet.id == proto::packets::status::SERVER_STATUS - { + if client_state == ClientState::Status && packet.id == packets::status::SERVER_STATUS { let server_status = server_status(&config, &server).await; let packet = StatusResponse { server_status }; @@ -108,15 +101,13 @@ pub async fn serve( } // Hijack ping packet - if client_state == ClientState::Status && packet.id == proto::packets::status::SERVER_PING { + if client_state == ClientState::Status && packet.id == packets::status::SERVER_PING { writer.write_all(&raw).await.map_err(|_| ())?; continue; } // Hijack login start - if client_state == ClientState::Login - && packet.id == proto::packets::login::SERVER_LOGIN_START - { + if client_state == ClientState::Login && packet.id == packets::login::SERVER_LOGIN_START { // Try to get login username, update client info // TODO: we should always parse this packet successfully let username = LoginStart::decode(&mut packet.data.as_slice()) @@ -132,100 +123,37 @@ pub async fn serve( } None => info!(target: "lazymc", "Kicked player because lockout is enabled"), } - kick(&client, &config.lockout.message, &mut writer).await?; + action::kick(&client, &config.lockout.message, &mut writer).await?; break; } // Start server if not starting yet Server::start(config.clone(), server.clone(), username).await; - // Use join occupy methods - for method in &config.join.methods { - match method { - // Kick method, immediately kick client - Method::Kick => { - trace!(target: "lazymc", "Using kick method to occupy joining client"); + // Remember inbound packets + inbound_history.extend(&raw); + inbound_history.extend(&buf); - // Select message and kick - let msg = match server.state() { - server::State::Starting - | server::State::Stopped - | server::State::Started => &config.join.kick.starting, - server::State::Stopping => &config.join.kick.stopping, - }; - kick(&client, msg, &mut writer).await?; - break; - } + // Build inbound packet queue with everything from login start (including this) + let mut login_queue = BytesMut::with_capacity(raw.len() + buf.len()); + login_queue.extend(&raw); + login_queue.extend(&buf); - // Hold method, hold client connection while server starts - Method::Hold => { - trace!(target: "lazymc", "Using hold method to occupy joining client"); + // Buf is fully consumed here + buf.clear(); - // Server must be starting - if server.state() != State::Starting { - continue; - } - - // Hold login packet and remaining read bytes - inbound_history.extend(&raw); - inbound_history.extend(buf.split_off(0)); - - // Start holding - if hold(&config, &server).await? { - service::server::route_proxy_queue(inbound, config, inbound_history); - return Ok(()); - } - } - - // Forward method, forward client connection while server starts - Method::Forward => { - trace!(target: "lazymc", "Using forward method to occupy joining client"); - - // Hold login packet and remaining read bytes - inbound_history.extend(&raw); - inbound_history.extend(buf.split_off(0)); - - // Forward client - debug!(target: "lazymc", "Forwarding client to {:?}!", config.join.forward.address); - - service::server::route_proxy_address_queue( - inbound, - config.join.forward.address, - inbound_history, - ); - return Ok(()); - - // TODO: do not consume client here, allow other join method on fail - } - - // Lobby method, keep client in lobby while server starts - #[cfg(feature = "lobby")] - Method::Lobby => { - trace!(target: "lazymc", "Using lobby method to occupy joining client"); - - // Build queue with login packet and any additionally received - let mut queue = BytesMut::with_capacity(raw.len() + buf.len()); - queue.extend(raw); - queue.extend(buf.split_off(0)); - - // Start lobby - lobby::serve(client, client_info, inbound, config, server, queue).await?; - return Ok(()); - // TODO: do not consume client here, allow other join method on fail - } - - // Lobby method, keep client in lobby while server starts - #[cfg(not(feature = "lobby"))] - Method::Lobby => { - error!(target: "lazymc", "Lobby join method not supported in this lazymc build"); - } - } - } - - debug!(target: "lazymc", "No method left to occupy joining client, disconnecting"); - - // Done occupying client, just disconnect - break; + // Start occupying client + join::occupy( + client, + client_info, + config, + server, + inbound, + inbound_history, + login_queue, + ) + .await?; + return Ok(()); } // Show unhandled packet warning @@ -234,96 +162,9 @@ pub async fn serve( debug!(target: "lazymc", "- Packet ID: {}", packet.id); } - // Gracefully close connection - match writer.shutdown().await { - Ok(_) => {} - Err(err) if err.kind() == io::ErrorKind::NotConnected => {} - Err(_) => return Err(()), - } - Ok(()) } -/// Hold a client while server starts. -/// -/// Returns holding status. `true` if client is held and it should be proxied, `false` it was held -/// but it timed out. -pub async fn hold<'a>(config: &Config, server: &Server) -> Result { - trace!(target: "lazymc", "Started holding client"); - - // A task to wait for suitable server state - // Waits for started state, errors if stopping/stopped state is reached - let task_wait = async { - let mut state = server.state_receiver(); - loop { - // Wait for state change - state.changed().await.unwrap(); - - match state.borrow().deref() { - // Still waiting on server start - State::Starting => { - trace!(target: "lazymc", "Server not ready, holding client for longer"); - continue; - } - - // Server started, start relaying and proxy - State::Started => { - break true; - } - - // Server stopping, this shouldn't happen, kick - State::Stopping => { - warn!(target: "lazymc", "Server stopping for held client, disconnecting"); - break false; - } - - // Server stopped, this shouldn't happen, disconnect - State::Stopped => { - error!(target: "lazymc", "Server stopped for held client, disconnecting"); - break false; - } - } - } - }; - - // Wait for server state with timeout - let timeout = Duration::from_secs(config.join.hold.timeout as u64); - match time::timeout(timeout, task_wait).await { - // Relay client to proxy - Ok(true) => { - info!(target: "lazymc", "Server ready for held client, relaying to server"); - Ok(true) - } - - // Server stopping/stopped, this shouldn't happen, kick - Ok(false) => { - warn!(target: "lazymc", "Server stopping for held client"); - Ok(false) - } - - // Timeout reached, kick with starting message - Err(_) => { - warn!(target: "lazymc", "Held client reached timeout of {}s", config.join.hold.timeout); - Ok(false) - } - } -} - -/// Kick client with a message. -/// -/// Should close connection afterwards. -async fn kick(client: &Client, msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> { - let packet = LoginDisconnect { - reason: Message::new(Payload::text(msg)), - }; - - let mut data = Vec::new(); - packet.encode(&mut data).map_err(|_| ())?; - - let response = RawPacket::new(proto::packets::login::CLIENT_DISCONNECT, data).encode(client)?; - writer.write_all(&response).await.map_err(|_| ()) -} - /// Build server status object to respond to client with. async fn server_status(config: &Config, server: &Server) -> ServerStatus { let status = server.status().await;