diff --git a/res/lazymc.toml b/res/lazymc.toml index 68d33d0..e41a98b 100644 --- a/res/lazymc.toml +++ b/res/lazymc.toml @@ -43,6 +43,10 @@ command = "java -Xmx1G -Xms1G -jar server.jar --nogui" # Minimum time in seconds to stay online when server is started. #minimum_online_time = 60 +# Hold client for number of seconds on connect while server starts (instead of kicking immediately). +# 0 to disable, keep below Minecraft timeout of 30 seconds. +#hold_client_for = 25 + [messages] # MOTDs, shown in server browser. #motd_sleeping = "☠ Server is sleeping\n§2☻ Join to start it up" diff --git a/src/config.rs b/src/config.rs index d4884b2..86837e4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -144,6 +144,16 @@ pub struct Time { /// Minimum time in seconds to stay online when server is started. #[serde(default, alias = "minimum_online_time")] pub min_online_time: u32, + + /// Hold client for number of seconds while server starts, instead of kicking immediately. + pub hold_client_for: u32, +} + +impl Time { + /// Whether to hold clients. + pub fn hold(&self) -> bool { + self.hold_client_for > 0 + } } impl Default for Time { @@ -151,6 +161,7 @@ impl Default for Time { Self { sleep_after: 60, min_online_time: 60, + hold_client_for: 25, } } } diff --git a/src/proxy.rs b/src/proxy.rs index 30f237d..40cf5b4 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -6,7 +6,18 @@ use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; /// Proxy the inbound stream to a target address. -pub async fn proxy(mut inbound: TcpStream, addr_target: SocketAddr) -> Result<(), Box> { +pub async fn proxy(inbound: TcpStream, addr_target: SocketAddr) -> Result<(), Box> { + proxy_with_queue(inbound, addr_target, &[]).await +} + +/// Proxy the inbound stream to a target address. +/// +/// Send the queue to the target server before proxying. +pub async fn proxy_with_queue( + mut inbound: TcpStream, + addr_target: SocketAddr, + queue: &[u8], +) -> Result<(), Box> { // Set up connection to server // TODO: on connect fail, ping server and redirect to serve_status if offline let mut outbound = TcpStream::connect(addr_target).await?; @@ -14,11 +25,17 @@ pub async fn proxy(mut inbound: TcpStream, addr_target: SocketAddr) -> Result<() let (mut ri, mut wi) = inbound.split(); let (mut ro, mut wo) = outbound.split(); + // Forward queued bytes to server once writable + if !queue.is_empty() { + wo.writable().await?; + trace!(target: "lazymc", "Relaying {} queued bytes to server", queue.len()); + wo.write_all(&queue).await?; + } + let client_to_server = async { io::copy(&mut ri, &mut wo).await?; wo.shutdown().await }; - let server_to_client = async { io::copy(&mut ro, &mut wi).await?; wi.shutdown().await diff --git a/src/status.rs b/src/status.rs index b7d6025..813c1aa 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,6 +1,10 @@ use std::sync::Arc; +use std::time::{Duration, Instant}; +use crate::proxy; +use crate::server::State; use bytes::BytesMut; +use futures::TryFutureExt; use minecraft_protocol::data::chat::{Message, Payload}; use minecraft_protocol::data::server_status::*; use minecraft_protocol::decoder::Decoder; @@ -8,14 +12,18 @@ use minecraft_protocol::encoder::Encoder; use minecraft_protocol::version::v1_14_4::handshake::Handshake; use minecraft_protocol::version::v1_14_4::login::{LoginDisconnect, LoginStart}; use minecraft_protocol::version::v1_14_4::status::StatusResponse; -use tokio::io; -use tokio::io::AsyncWriteExt; +use tokio::io::{self, AsyncWriteExt}; +use tokio::net::tcp::WriteHalf; use tokio::net::TcpStream; +use tokio::time; use crate::config::*; use crate::proto::{self, Client, ClientState, RawPacket}; use crate::server::{self, Server}; +/// Client holding server state poll interval. +const HOLD_POLL_INTERVAL: Duration = Duration::from_millis(500); + /// Proxy the given inbound stream to a target address. // TODO: do not drop error here, return Box pub async fn serve( @@ -26,8 +34,9 @@ pub async fn serve( ) -> Result<(), ()> { let (mut reader, mut writer) = inbound.split(); - // Incoming buffer + // Incoming buffer and packet holding queue let mut buf = BytesMut::new(); + let mut hold_queue = BytesMut::new(); loop { // Read packet from stream @@ -40,52 +49,33 @@ pub async fn serve( } }; - // Hijack login start - if client.state() == ClientState::Login && packet.id == proto::LOGIN_PACKET_ID_LOGIN_START { - // Try to get login username - let username = LoginStart::decode(&mut packet.data.as_slice()) - .ok() - .map(|p| p.name); - - // Select message - let msg = match server.state() { - server::State::Starting | server::State::Stopped | server::State::Started => { - &config.messages.login_starting - } - server::State::Stopping => &config.messages.login_stopping, - }; - - let packet = LoginDisconnect { - reason: Message::new(Payload::text(msg)), - }; - - let mut data = Vec::new(); - packet.encode(&mut data).map_err(|_| ())?; - - let response = RawPacket::new(0, data).encode()?; - writer.write_all(&response).await.map_err(|_| ())?; - - // Start server if not starting yet - Server::start(config, server, username); - break; - } + // Grab client state + let client_state = client.state(); // Hijack handshake - if client.state() == ClientState::Handshake && packet.id == proto::STATUS_PACKET_ID_STATUS { - match Handshake::decode(&mut packet.data.as_slice()) { + if client_state == ClientState::Handshake && packet.id == proto::STATUS_PACKET_ID_STATUS { + // Parse handshake, grab new state + let new_state = match Handshake::decode(&mut packet.data.as_slice()) { Ok(handshake) => { // TODO: do not panic here - client.set_state( - ClientState::from_id(handshake.next_state) - .expect("unknown next client state"), - ); + ClientState::from_id(handshake.next_state).expect("unknown next client state") } Err(_) => break, + }; + + // Update client state + client.set_state(new_state); + + // If login handshake and holding is enabled, hold packets + if new_state == ClientState::Login && config.time.hold() { + hold_queue.extend(raw); } + + continue; } // Hijack server status packet - if client.state() == ClientState::Status && packet.id == proto::STATUS_PACKET_ID_STATUS { + if client_state == ClientState::Status && packet.id == proto::STATUS_PACKET_ID_STATUS { // Select version and player max from last known server status let (version, max) = match server.clone_status() { Some(status) => (status.version, status.players.max), @@ -122,18 +112,52 @@ pub async fn serve( let response = RawPacket::new(0, data).encode()?; writer.write_all(&response).await.map_err(|_| ())?; + continue; } // Hijack ping packet - if client.state() == ClientState::Status && packet.id == proto::STATUS_PACKET_ID_PING { + if client_state == ClientState::Status && packet.id == proto::STATUS_PACKET_ID_PING { writer.write_all(&raw).await.map_err(|_| ())?; continue; } + // Hijack login start + if client_state == ClientState::Login && packet.id == proto::LOGIN_PACKET_ID_LOGIN_START { + // Try to get login username + let username = LoginStart::decode(&mut packet.data.as_slice()) + .ok() + .map(|p| p.name); + + // Start server if not starting yet + Server::start(config.clone(), server.clone(), username); + + // Hold client if enabled and starting + if config.time.hold() && server.state() == State::Starting { + // Hold login packet and remaining read bytes + hold_queue.extend(raw); + hold_queue.extend(buf.split_off(0)); + + // Start holding + hold(inbound, config, server, &mut hold_queue).await?; + return Ok(()); + } + + // Select message and kick + let msg = match server.state() { + server::State::Starting | server::State::Stopped | server::State::Started => { + &config.messages.login_starting + } + server::State::Stopping => &config.messages.login_stopping, + }; + kick(msg, &mut writer).await?; + + break; + } + // Show unhandled packet warning debug!(target: "lazymc", "Received unhandled packet:"); - debug!(target: "lazymc", "- State: {:?}", client.state()); + debug!(target: "lazymc", "- State: {:?}", client_state); debug!(target: "lazymc", "- Packet ID: {}", packet.id); } @@ -146,3 +170,92 @@ pub async fn serve( Ok(()) } + +/// Hold a client while server starts. +/// +/// Relays client to proxy once server is ready. +pub async fn hold<'a>( + mut inbound: TcpStream, + config: Arc, + server: Arc, + holding: &mut BytesMut, +) -> Result<(), ()> { + trace!(target: "lazymc", "Started holding client"); + + // Set up polling interval, get timeout + let mut poll_interval = time::interval(HOLD_POLL_INTERVAL); + let since = Instant::now(); + let timeout = config.time.hold_client_for as u64; + + loop { + // TODO: do not poll, wait for started signal instead (with timeout) + poll_interval.tick().await; + + trace!("Polling server state for holding client..."); + + match server.state() { + // Still waiting on server start + State::Starting => { + trace!(target: "lazymc", "Server not ready, holding client for longer"); + + // If hold timeout is reached, kick client + if since.elapsed().as_secs() >= timeout { + warn!(target: "lazymc", "Holding client reached timeout of {}s, disconnecting", timeout); + kick(&config.messages.login_starting, &mut inbound.split().1).await?; + return Ok(()); + } + + continue; + } + + // Server started, start relaying and proxy + State::Started => { + // TODO: drop client if already disconnected + + // Relay client to proxy + info!(target: "lazymc", "Server ready for held client, relaying to server"); + proxy::proxy_with_queue(inbound, config.server.address, &holding) + .map_err(|_| ()) + .await?; + return Ok(()); + } + + // Server stopping, this shouldn't happen, kick + State::Stopping => { + warn!(target: "lazymc", "Server stopping for held client, disconnecting"); + kick(&config.messages.login_stopping, &mut inbound.split().1).await?; + break; + } + + // Server stopped, this shouldn't happen, disconnect + State::Stopped => { + error!(target: "lazymc", "Server stopped for held client, disconnecting"); + break; + } + } + } + + // Gracefully close connection + match inbound.shutdown().await { + Ok(_) => {} + Err(err) if err.kind() == io::ErrorKind::NotConnected => {} + Err(_) => return Err(()), + } + + Ok(()) +} + +/// Kick client with a message. +/// +/// Should close connection afterwards. +async fn kick<'a>(msg: &str, writer: &mut WriteHalf<'a>) -> Result<(), ()> { + let packet = LoginDisconnect { + reason: Message::new(Payload::text(msg)), + }; + + let mut data = Vec::new(); + packet.encode(&mut data).map_err(|_| ())?; + + let response = RawPacket::new(0, data).encode()?; + writer.write_all(&response).await.map_err(|_| ()) +}