Use future tokio supported sync types throughout server handling logic

This commit is contained in:
timvisee
2021-11-15 14:41:01 +01:00
parent dabeabeff4
commit 901fb62f25
4 changed files with 69 additions and 55 deletions

View File

@@ -43,10 +43,10 @@ pub async fn monitor_server(config: Arc<Config>, server: Arc<Server>) {
let status = poll_server(&config, &server, addr).await;
match status {
// Got status, update
Ok(Some(status)) => server.update_status(&config, Some(status)),
Ok(Some(status)) => server.update_status(&config, Some(status)).await,
// Error, reset status
Err(_) => server.update_status(&config, None),
Err(_) => server.update_status(&config, None).await,
// Didn't get status, but ping fallback worked, leave as-is, show warning
Ok(None) => {
@@ -55,13 +55,13 @@ pub async fn monitor_server(config: Arc<Config>, server: Arc<Server>) {
}
// Sleep server when it's bedtime
if server.should_sleep(&config) {
if server.should_sleep(&config).await {
info!(target: "lazymc::montior", "Server has been idle, sleeping...");
server.stop(&config).await;
}
// Check whether we should force kill server
if server.should_kill() {
if server.should_kill().await {
error!(target: "lazymc::montior", "Force killing server, took too long to start or stop");
if !server.force_kill().await {
warn!(target: "lazymc", "Failed to force kill server");

View File

@@ -1,12 +1,14 @@
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::FutureExt;
use minecraft_protocol::data::server_status::ServerStatus;
use tokio::process::Command;
use tokio::sync::watch;
#[cfg(feature = "rcon")]
use tokio::sync::Semaphore;
use tokio::sync::{Mutex, RwLock, RwLockReadGuard};
use tokio::time;
use crate::config::Config;
@@ -125,8 +127,8 @@ impl Server {
/// This updates various other internal things depending on how the state changes.
///
/// Returns false if the state didn't change, in which case nothing happens.
fn update_state(&self, state: State, config: &Config) -> bool {
self.update_state_from(None, state, config)
async fn update_state(&self, state: State, config: &Config) -> bool {
self.update_state_from(None, state, config).await
}
/// Set new state, from a current state.
@@ -134,7 +136,7 @@ impl Server {
/// This updates various other internal things depending on how the state changes.
///
/// Returns false if current state didn't match `from` or if nothing changed.
fn update_state_from(&self, from: Option<State>, new: State, config: &Config) -> bool {
async fn update_state_from(&self, from: Option<State>, new: State, config: &Config) -> bool {
// Atomically swap state to new, return if from doesn't match
let old = State::from_u8(match from {
Some(from) => match self.state.compare_exchange(
@@ -160,7 +162,7 @@ impl Server {
let _ = self.state_watch_sender.send(new);
// Update kill at time for starting/stopping state
*self.kill_at.write().unwrap() = match new {
*self.kill_at.write().await = match new {
State::Starting if config.time.start_timeout > 0 => {
Some(Instant::now() + Duration::from_secs(config.time.start_timeout as u64))
}
@@ -179,8 +181,9 @@ impl Server {
// If Starting -> Started, update active time and keep it online for configured time
if old == State::Starting && new == State::Started {
self.update_last_active();
self.keep_online_for(Some(config.time.min_online_time));
self.update_last_active().await;
self.keep_online_for(Some(config.time.min_online_time))
.await;
}
true
@@ -190,14 +193,14 @@ impl Server {
///
/// This updates various other internal things depending on the current state and the given
/// status.
pub fn update_status(&self, config: &Config, status: Option<ServerStatus>) {
pub async fn update_status(&self, config: &Config, status: Option<ServerStatus>) {
// Update state based on curren
match (self.state(), &status) {
(State::Stopped | State::Starting, Some(_)) => {
self.update_state(State::Started, config);
self.update_state(State::Started, config).await;
}
(State::Started, None) => {
self.update_state(State::Stopped, config);
self.update_state(State::Stopped, config).await;
}
_ => {}
}
@@ -206,19 +209,22 @@ impl Server {
if let Some(status) = status {
// Update last active time if there are online players
if status.players.online > 0 {
self.update_last_active();
self.update_last_active().await;
}
self.status.write().unwrap().replace(status);
self.status.write().await.replace(status);
}
}
/// Try to start the server.
///
/// Does nothing if currently not in stopped state.
pub fn start(config: Arc<Config>, server: Arc<Server>, username: Option<String>) -> bool {
pub async fn start(config: Arc<Config>, server: Arc<Server>, username: Option<String>) -> bool {
// Must set state from stopped to starting
if !server.update_state_from(Some(State::Stopped), State::Starting, &config) {
if !server
.update_state_from(Some(State::Stopped), State::Starting, &config)
.await
{
return false;
}
@@ -228,11 +234,19 @@ impl Server {
None => info!(target: "lazymc", "Starting server..."),
}
// Invoke server command in separate task
tokio::spawn(invoke_server_cmd(config, server).map(|_| ()));
// Spawn server in new task
Self::spawn_server_task(config, server);
true
}
/// Spawn the server task.
///
/// This should not be called directly.
fn spawn_server_task(config: Arc<Config>, server: Arc<Server>) {
tokio::spawn(invoke_server_cmd(config, server).map(|_| ()));
}
/// Stop running server.
///
/// This will attempt to stop the server with all available methods.
@@ -246,7 +260,7 @@ impl Server {
// Try to stop through signal
#[cfg(unix)]
if stop_server_signal(config, self) {
if stop_server_signal(config, self).await {
return true;
}
@@ -258,7 +272,7 @@ impl Server {
///
/// This requires the server PID to be known.
pub async fn force_kill(&self) -> bool {
if let Some(pid) = *self.pid.lock().unwrap() {
if let Some(pid) = *self.pid.lock().await {
return os::force_kill(pid);
}
false
@@ -267,7 +281,7 @@ impl Server {
/// Decide whether the server should sleep.
///
/// Always returns false if it is currently not online.
pub fn should_sleep(&self, config: &Config) -> bool {
pub async fn should_sleep(&self, config: &Config) -> bool {
// Server must be online
if self.state() != State::Started {
return false;
@@ -277,7 +291,7 @@ impl Server {
let players_online = self
.status
.read()
.unwrap()
.await
.as_ref()
.map(|status| status.players.online > 0)
.unwrap_or(false);
@@ -290,7 +304,7 @@ impl Server {
let keep_online = self
.keep_online_until
.read()
.unwrap()
.await
.map(|i| i >= Instant::now())
.unwrap_or(false);
if keep_online {
@@ -299,7 +313,7 @@ impl Server {
}
// Last active time must have passed sleep threshold
if let Some(last_idle) = self.last_active.read().unwrap().as_ref() {
if let Some(last_idle) = self.last_active.read().await.as_ref() {
return last_idle.elapsed() >= Duration::from_secs(config.time.sleep_after as u64);
}
@@ -307,27 +321,27 @@ impl Server {
}
/// Decide whether to force kill the server process.
pub fn should_kill(&self) -> bool {
pub async fn should_kill(&self) -> bool {
self.kill_at
.read()
.unwrap()
.await
.map(|t| t <= Instant::now())
.unwrap_or(false)
}
/// Read last known server status.
pub fn status(&self) -> RwLockReadGuard<Option<ServerStatus>> {
self.status.read().unwrap()
pub async fn status<'a>(&'a self) -> RwLockReadGuard<'a, Option<ServerStatus>> {
self.status.read().await
}
/// Update the last active time.
fn update_last_active(&self) {
self.last_active.write().unwrap().replace(Instant::now());
async fn update_last_active(&self) {
self.last_active.write().await.replace(Instant::now());
}
/// Force the server to be online for the given number of seconds.
fn keep_online_for(&self, duration: Option<u32>) {
*self.keep_online_until.write().unwrap() = duration
async fn keep_online_for(&self, duration: Option<u32>) {
*self.keep_online_until.write().await = duration
.filter(|d| *d > 0)
.map(|d| Instant::now() + Duration::from_secs(d as u64));
}
@@ -359,7 +373,7 @@ pub async fn invoke_server_cmd(
config: Arc<Config>,
state: Arc<Server>,
) -> Result<(), Box<dyn std::error::Error>> {
// Build command
// Configure command
let args = shlex::split(&config.server.command).expect("invalid server command");
let mut cmd = Command::new(&args[0]);
cmd.args(args.iter().skip(1));
@@ -383,7 +397,7 @@ pub async fn invoke_server_cmd(
state
.pid
.lock()
.unwrap()
.await
.replace(child.id().expect("unknown server PID"));
// Wait for process to exit, handle status
@@ -404,18 +418,18 @@ pub async fn invoke_server_cmd(
};
// Forget server PID
state.pid.lock().unwrap().take();
state.pid.lock().await.take();
// Give server a little more time to quit forgotten threads
time::sleep(SERVER_QUIT_COOLDOWN).await;
// Set server state to stopped
state.update_state(State::Stopped, &config);
state.update_state(State::Stopped, &config).await;
// Restart on crash
if crashed && config.server.wake_on_crash {
warn!(target: "lazymc", "Server crashed, restarting...");
Server::start(config, state, None);
Server::start(config, state, None).await;
}
Ok(())
@@ -439,7 +453,7 @@ async fn stop_server_rcon(config: &Config, server: &Server) -> bool {
let rcon_cooled_down = server
.rcon_last_stop
.lock()
.unwrap()
.await
.map(|t| t.elapsed() >= RCON_COOLDOWN)
.unwrap_or(true);
if !rcon_cooled_down {
@@ -468,12 +482,8 @@ async fn stop_server_rcon(config: &Config, server: &Server) -> bool {
}
// Set server to stopping state, update last RCON time
server
.rcon_last_stop
.lock()
.unwrap()
.replace(Instant::now());
server.update_state(State::Stopping, config);
server.rcon_last_stop.lock().await.replace(Instant::now());
server.update_state(State::Stopping, config).await;
drop(rcon_lock);
@@ -487,9 +497,9 @@ async fn stop_server_rcon(config: &Config, server: &Server) -> bool {
///
/// Only available on Unix.
#[cfg(unix)]
fn stop_server_signal(config: &Config, server: &Server) -> bool {
async fn stop_server_signal(config: &Config, server: &Server) -> bool {
// Grab PID
let pid = match *server.pid.lock().unwrap() {
let pid = match *server.pid.lock().await {
Some(pid) => pid,
None => {
debug!(target: "lazymc", "Could not send stop signal to server process, PID unknown");
@@ -504,8 +514,12 @@ fn stop_server_signal(config: &Config, server: &Server) -> bool {
}
// Update from starting/started to stopping
server.update_state_from(Some(State::Starting), State::Stopping, config);
server.update_state_from(Some(State::Started), State::Stopping, config);
server
.update_state_from(Some(State::Starting), State::Stopping, config)
.await;
server
.update_state_from(Some(State::Started), State::Stopping, config)
.await;
true
}

View File

@@ -44,7 +44,7 @@ pub async fn service(config: Arc<Config>) -> Result<(), ()> {
// Initiate server start
if config.server.wake_on_start {
Server::start(config.clone(), server.clone(), None);
Server::start(config.clone(), server.clone(), None).await;
}
// Route all incomming connections

View File

@@ -79,7 +79,7 @@ pub async fn serve(
// Hijack server status packet
if client_state == ClientState::Status && packet.id == proto::STATUS_PACKET_ID_STATUS {
let server_status = server_status(&config, &server);
let server_status = server_status(&config, &server).await;
let packet = StatusResponse { server_status };
let mut data = Vec::new();
@@ -105,7 +105,7 @@ pub async fn serve(
.map(|p| p.name);
// Start server if not starting yet
Server::start(config.clone(), server.clone(), username);
Server::start(config.clone(), server.clone(), username).await;
// Hold client if enabled and starting
if config.time.hold() && server.state() == State::Starting {
@@ -241,8 +241,8 @@ async fn kick(msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
}
/// Build server status object to respond to client with.
fn server_status(config: &Config, server: &Server) -> ServerStatus {
let status = server.status();
async fn server_status(config: &Config, server: &Server) -> ServerStatus {
let status = server.status().await;
// Select version and player max from last known server status
let (version, max) = match status.as_ref() {