From 38d90681c7aa4c82715727271729ecef885db163 Mon Sep 17 00:00:00 2001 From: timvisee Date: Sun, 14 Nov 2021 16:20:46 +0100 Subject: [PATCH] Improve waiting for server when holding client Instead of constantly polling the server state until it is ready, this now subscribes to server state changes and uses a proper timeout. --- Cargo.toml | 2 +- src/monitor.rs | 4 +-- src/server.rs | 21 ++++++++++- src/status.rs | 95 +++++++++++++++++++++++++++----------------------- 4 files changed, 74 insertions(+), 48 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e9f7f4d..7aa4e90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ rand = "0.8" serde = "1.0" shlex = "1.1" thiserror = "1.0" -tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "io-util", "net", "macros", "time", "process", "signal"] } +tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "io-util", "net", "macros", "time", "process", "signal", "sync"] } toml = "0.5" # Feature: rcon diff --git a/src/monitor.rs b/src/monitor.rs index 9d1758c..6f569df 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -36,6 +36,8 @@ pub async fn monitor_server(config: Arc, server: Arc) { let mut poll_interval = time::interval(MONITOR_POLL_INTERVAL); loop { + poll_interval.tick().await; + // Poll server state and update internal status trace!(target: "lazymc::monitor", "Fetching status for {} ... ", addr); let status = poll_server(&config, &server, addr).await; @@ -65,8 +67,6 @@ pub async fn monitor_server(config: Arc, server: Arc) { warn!(target: "lazymc", "Failed to force kill server"); } } - - poll_interval.tick().await; } } diff --git a/src/server.rs b/src/server.rs index 4ab521b..bae9405 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,7 @@ use std::time::{Duration, Instant}; use futures::FutureExt; use minecraft_protocol::data::server_status::ServerStatus; use tokio::process::Command; +use tokio::sync::watch; use tokio::time; use crate::config::Config; @@ -61,6 +62,12 @@ pub struct Server { /// Matches `State`, utilzes AtomicU8 for better performance. state: AtomicU8, + /// State watch sender, broadcast state changes. + state_watch_sender: watch::Sender, + + /// State watch receiver, subscribe to state changes. + state_watch_receiver: watch::Receiver, + /// Server process PID. /// /// Set if a server process is running. @@ -92,6 +99,11 @@ impl Server { State::from_u8(self.state.load(Ordering::Relaxed)) } + /// Get state receiver to subscribe on server state changes. + pub fn state_receiver(&self) -> watch::Receiver { + self.state_watch_receiver.clone() + } + /// Set a new state. /// /// This updates various other internal things depending on how the state changes. @@ -128,6 +140,9 @@ impl Server { trace!("Change server state from {:?} to {:?}", old, new); + // Broadcast change + let _ = self.state_watch_sender.send(new); + // Update kill at time for starting/stopping state *self.kill_at.write().unwrap() = match new { State::Starting if config.time.start_timeout > 0 => { @@ -155,7 +170,7 @@ impl Server { true } - /// Update status as polled from the server. + /// Update status as obtained from the server. /// /// This updates various other internal things depending on the current state and the given /// status. @@ -311,8 +326,12 @@ impl Server { impl Default for Server { fn default() -> Self { + let (state_watch_sender, state_watch_receiver) = watch::channel(State::Stopped); + Self { state: AtomicU8::new(State::Stopped.to_u8()), + state_watch_sender, + state_watch_receiver, pid: Default::default(), status: Default::default(), last_active: Default::default(), diff --git a/src/status.rs b/src/status.rs index c191e23..f33917c 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,5 +1,6 @@ +use std::ops::Deref; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use crate::server::State; use bytes::BytesMut; @@ -20,9 +21,6 @@ use crate::proto::{self, Client, ClientState, RawPacket}; use crate::server::{self, Server}; use crate::service; -/// Client holding server state poll interval. -const HOLD_POLL_INTERVAL: Duration = Duration::from_millis(500); - /// Proxy the given inbound stream to a target address. // TODO: do not drop error here, return Box pub async fn serve( @@ -159,54 +157,63 @@ pub async fn hold<'a>( ) -> Result<(), ()> { trace!(target: "lazymc", "Started holding client"); - // Set up polling interval, get timeout - let mut poll_interval = time::interval(HOLD_POLL_INTERVAL); - let since = Instant::now(); - let timeout = config.time.hold_client_for as u64; + // A task to wait for suitable server state + // Waits for started state, errors if stopping/stopped state is reached + let task_wait = async { + let mut state = server.state_receiver(); + loop { + // Wait for state change + if state.changed().await.is_err() { + break Err(()); + } - loop { - // TODO: wait for start signal over channel instead of polling - poll_interval.tick().await; - - trace!("Polling server state for holding client..."); - - match server.state() { - // Still waiting on server start - State::Starting => { - trace!(target: "lazymc", "Server not ready, holding client for longer"); - - // If hold timeout is reached, kick client - if since.elapsed().as_secs() >= timeout { - warn!(target: "lazymc", "Held client reached timeout of {}s, disconnecting", timeout); - kick(&config.messages.login_starting, &mut inbound.split().1).await?; - return Ok(()); + match state.borrow().deref() { + // Still waiting on server start + State::Starting => { + trace!(target: "lazymc", "Server not ready, holding client for longer"); + continue; } - continue; - } + // Server started, start relaying and proxy + State::Started => { + break Ok(()); + } - // Server started, start relaying and proxy - State::Started => { - // TODO: drop client if already disconnected + // Server stopping, this shouldn't happen, kick + State::Stopping => { + warn!(target: "lazymc", "Server stopping for held client, disconnecting"); + break Err(()); + } - // Relay client to proxy - info!(target: "lazymc", "Server ready for held client, relaying to server"); - service::server::route_proxy_queue(inbound, config, hold_queue); - return Ok(()); + // Server stopped, this shouldn't happen, disconnect + State::Stopped => { + error!(target: "lazymc", "Server stopped for held client, disconnecting"); + break Err(()); + } } + } + }; - // Server stopping, this shouldn't happen, kick - State::Stopping => { - warn!(target: "lazymc", "Server stopping for held client, disconnecting"); - kick(&config.messages.login_stopping, &mut inbound.split().1).await?; - break; - } + // Wait for server state with timeout + let timeout = Duration::from_secs(config.time.hold_client_for as u64); + match time::timeout(timeout, task_wait).await { + // Relay client to proxy + Ok(Ok(())) => { + info!(target: "lazymc", "Server ready for held client, relaying to server"); + service::server::route_proxy_queue(inbound, config, hold_queue); + return Ok(()); + } - // Server stopped, this shouldn't happen, disconnect - State::Stopped => { - error!(target: "lazymc", "Server stopped for held client, disconnecting"); - break; - } + // Server stopping/stopped, this shouldn't happen, kick + Ok(Err(())) => { + warn!(target: "lazymc", "Server stopping for held client, disconnecting"); + kick(&config.messages.login_stopping, &mut inbound.split().1).await?; + } + + // Timeout reached, kick with starting message + Err(_) => { + warn!(target: "lazymc", "Held client reached timeout of {}s, disconnecting", config.time.hold_client_for); + kick(&config.messages.login_starting, &mut inbound.split().1).await?; } }