Improve waiting for server when holding client

Instead of constantly polling the server state until it is ready, this
now subscribes to server state changes and uses a proper timeout.
This commit is contained in:
timvisee
2021-11-14 16:20:46 +01:00
parent 2af20945cc
commit 38d90681c7
4 changed files with 74 additions and 48 deletions

View File

@@ -37,7 +37,7 @@ rand = "0.8"
serde = "1.0"
shlex = "1.1"
thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "io-util", "net", "macros", "time", "process", "signal"] }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "io-util", "net", "macros", "time", "process", "signal", "sync"] }
toml = "0.5"
# Feature: rcon

View File

@@ -36,6 +36,8 @@ pub async fn monitor_server(config: Arc<Config>, server: Arc<Server>) {
let mut poll_interval = time::interval(MONITOR_POLL_INTERVAL);
loop {
poll_interval.tick().await;
// Poll server state and update internal status
trace!(target: "lazymc::monitor", "Fetching status for {} ... ", addr);
let status = poll_server(&config, &server, addr).await;
@@ -65,8 +67,6 @@ pub async fn monitor_server(config: Arc<Config>, server: Arc<Server>) {
warn!(target: "lazymc", "Failed to force kill server");
}
}
poll_interval.tick().await;
}
}

View File

@@ -5,6 +5,7 @@ use std::time::{Duration, Instant};
use futures::FutureExt;
use minecraft_protocol::data::server_status::ServerStatus;
use tokio::process::Command;
use tokio::sync::watch;
use tokio::time;
use crate::config::Config;
@@ -61,6 +62,12 @@ pub struct Server {
/// Matches `State`, utilzes AtomicU8 for better performance.
state: AtomicU8,
/// State watch sender, broadcast state changes.
state_watch_sender: watch::Sender<State>,
/// State watch receiver, subscribe to state changes.
state_watch_receiver: watch::Receiver<State>,
/// Server process PID.
///
/// Set if a server process is running.
@@ -92,6 +99,11 @@ impl Server {
State::from_u8(self.state.load(Ordering::Relaxed))
}
/// Get state receiver to subscribe on server state changes.
pub fn state_receiver(&self) -> watch::Receiver<State> {
self.state_watch_receiver.clone()
}
/// Set a new state.
///
/// This updates various other internal things depending on how the state changes.
@@ -128,6 +140,9 @@ impl Server {
trace!("Change server state from {:?} to {:?}", old, new);
// Broadcast change
let _ = self.state_watch_sender.send(new);
// Update kill at time for starting/stopping state
*self.kill_at.write().unwrap() = match new {
State::Starting if config.time.start_timeout > 0 => {
@@ -155,7 +170,7 @@ impl Server {
true
}
/// Update status as polled from the server.
/// Update status as obtained from the server.
///
/// This updates various other internal things depending on the current state and the given
/// status.
@@ -311,8 +326,12 @@ impl Server {
impl Default for Server {
fn default() -> Self {
let (state_watch_sender, state_watch_receiver) = watch::channel(State::Stopped);
Self {
state: AtomicU8::new(State::Stopped.to_u8()),
state_watch_sender,
state_watch_receiver,
pid: Default::default(),
status: Default::default(),
last_active: Default::default(),

View File

@@ -1,5 +1,6 @@
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use crate::server::State;
use bytes::BytesMut;
@@ -20,9 +21,6 @@ use crate::proto::{self, Client, ClientState, RawPacket};
use crate::server::{self, Server};
use crate::service;
/// Client holding server state poll interval.
const HOLD_POLL_INTERVAL: Duration = Duration::from_millis(500);
/// Proxy the given inbound stream to a target address.
// TODO: do not drop error here, return Box<dyn Error>
pub async fn serve(
@@ -159,54 +157,63 @@ pub async fn hold<'a>(
) -> Result<(), ()> {
trace!(target: "lazymc", "Started holding client");
// Set up polling interval, get timeout
let mut poll_interval = time::interval(HOLD_POLL_INTERVAL);
let since = Instant::now();
let timeout = config.time.hold_client_for as u64;
// A task to wait for suitable server state
// Waits for started state, errors if stopping/stopped state is reached
let task_wait = async {
let mut state = server.state_receiver();
loop {
// Wait for state change
if state.changed().await.is_err() {
break Err(());
}
loop {
// TODO: wait for start signal over channel instead of polling
poll_interval.tick().await;
trace!("Polling server state for holding client...");
match server.state() {
// Still waiting on server start
State::Starting => {
trace!(target: "lazymc", "Server not ready, holding client for longer");
// If hold timeout is reached, kick client
if since.elapsed().as_secs() >= timeout {
warn!(target: "lazymc", "Held client reached timeout of {}s, disconnecting", timeout);
kick(&config.messages.login_starting, &mut inbound.split().1).await?;
return Ok(());
match state.borrow().deref() {
// Still waiting on server start
State::Starting => {
trace!(target: "lazymc", "Server not ready, holding client for longer");
continue;
}
continue;
}
// Server started, start relaying and proxy
State::Started => {
break Ok(());
}
// Server started, start relaying and proxy
State::Started => {
// TODO: drop client if already disconnected
// Server stopping, this shouldn't happen, kick
State::Stopping => {
warn!(target: "lazymc", "Server stopping for held client, disconnecting");
break Err(());
}
// Relay client to proxy
info!(target: "lazymc", "Server ready for held client, relaying to server");
service::server::route_proxy_queue(inbound, config, hold_queue);
return Ok(());
// Server stopped, this shouldn't happen, disconnect
State::Stopped => {
error!(target: "lazymc", "Server stopped for held client, disconnecting");
break Err(());
}
}
}
};
// Server stopping, this shouldn't happen, kick
State::Stopping => {
warn!(target: "lazymc", "Server stopping for held client, disconnecting");
kick(&config.messages.login_stopping, &mut inbound.split().1).await?;
break;
}
// Wait for server state with timeout
let timeout = Duration::from_secs(config.time.hold_client_for as u64);
match time::timeout(timeout, task_wait).await {
// Relay client to proxy
Ok(Ok(())) => {
info!(target: "lazymc", "Server ready for held client, relaying to server");
service::server::route_proxy_queue(inbound, config, hold_queue);
return Ok(());
}
// Server stopped, this shouldn't happen, disconnect
State::Stopped => {
error!(target: "lazymc", "Server stopped for held client, disconnecting");
break;
}
// Server stopping/stopped, this shouldn't happen, kick
Ok(Err(())) => {
warn!(target: "lazymc", "Server stopping for held client, disconnecting");
kick(&config.messages.login_stopping, &mut inbound.split().1).await?;
}
// Timeout reached, kick with starting message
Err(_) => {
warn!(target: "lazymc", "Held client reached timeout of {}s, disconnecting", config.time.hold_client_for);
kick(&config.messages.login_starting, &mut inbound.split().1).await?;
}
}