diff --git a/src/monitor.rs b/src/monitor.rs index d643178..01b543f 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -43,10 +43,10 @@ pub async fn monitor_server(config: Arc, server: Arc) { let status = poll_server(&config, &server, addr).await; match status { // Got status, update - Ok(Some(status)) => server.update_status(&config, Some(status)), + Ok(Some(status)) => server.update_status(&config, Some(status)).await, // Error, reset status - Err(_) => server.update_status(&config, None), + Err(_) => server.update_status(&config, None).await, // Didn't get status, but ping fallback worked, leave as-is, show warning Ok(None) => { @@ -55,13 +55,13 @@ pub async fn monitor_server(config: Arc, server: Arc) { } // Sleep server when it's bedtime - if server.should_sleep(&config) { + if server.should_sleep(&config).await { info!(target: "lazymc::montior", "Server has been idle, sleeping..."); server.stop(&config).await; } // Check whether we should force kill server - if server.should_kill() { + if server.should_kill().await { error!(target: "lazymc::montior", "Force killing server, took too long to start or stop"); if !server.force_kill().await { warn!(target: "lazymc", "Failed to force kill server"); diff --git a/src/server.rs b/src/server.rs index 3b39c35..9769eb5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,12 +1,14 @@ use std::sync::atomic::{AtomicU8, Ordering}; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; +use std::sync::Arc; use std::time::{Duration, Instant}; use futures::FutureExt; use minecraft_protocol::data::server_status::ServerStatus; use tokio::process::Command; use tokio::sync::watch; +#[cfg(feature = "rcon")] use tokio::sync::Semaphore; +use tokio::sync::{Mutex, RwLock, RwLockReadGuard}; use tokio::time; use crate::config::Config; @@ -125,8 +127,8 @@ impl Server { /// This updates various other internal things depending on how the state changes. /// /// Returns false if the state didn't change, in which case nothing happens. - fn update_state(&self, state: State, config: &Config) -> bool { - self.update_state_from(None, state, config) + async fn update_state(&self, state: State, config: &Config) -> bool { + self.update_state_from(None, state, config).await } /// Set new state, from a current state. @@ -134,7 +136,7 @@ impl Server { /// This updates various other internal things depending on how the state changes. /// /// Returns false if current state didn't match `from` or if nothing changed. - fn update_state_from(&self, from: Option, new: State, config: &Config) -> bool { + async fn update_state_from(&self, from: Option, new: State, config: &Config) -> bool { // Atomically swap state to new, return if from doesn't match let old = State::from_u8(match from { Some(from) => match self.state.compare_exchange( @@ -160,7 +162,7 @@ impl Server { let _ = self.state_watch_sender.send(new); // Update kill at time for starting/stopping state - *self.kill_at.write().unwrap() = match new { + *self.kill_at.write().await = match new { State::Starting if config.time.start_timeout > 0 => { Some(Instant::now() + Duration::from_secs(config.time.start_timeout as u64)) } @@ -179,8 +181,9 @@ impl Server { // If Starting -> Started, update active time and keep it online for configured time if old == State::Starting && new == State::Started { - self.update_last_active(); - self.keep_online_for(Some(config.time.min_online_time)); + self.update_last_active().await; + self.keep_online_for(Some(config.time.min_online_time)) + .await; } true @@ -190,14 +193,14 @@ impl Server { /// /// This updates various other internal things depending on the current state and the given /// status. - pub fn update_status(&self, config: &Config, status: Option) { + pub async fn update_status(&self, config: &Config, status: Option) { // Update state based on curren match (self.state(), &status) { (State::Stopped | State::Starting, Some(_)) => { - self.update_state(State::Started, config); + self.update_state(State::Started, config).await; } (State::Started, None) => { - self.update_state(State::Stopped, config); + self.update_state(State::Stopped, config).await; } _ => {} } @@ -206,19 +209,22 @@ impl Server { if let Some(status) = status { // Update last active time if there are online players if status.players.online > 0 { - self.update_last_active(); + self.update_last_active().await; } - self.status.write().unwrap().replace(status); + self.status.write().await.replace(status); } } /// Try to start the server. /// /// Does nothing if currently not in stopped state. - pub fn start(config: Arc, server: Arc, username: Option) -> bool { + pub async fn start(config: Arc, server: Arc, username: Option) -> bool { // Must set state from stopped to starting - if !server.update_state_from(Some(State::Stopped), State::Starting, &config) { + if !server + .update_state_from(Some(State::Stopped), State::Starting, &config) + .await + { return false; } @@ -228,11 +234,19 @@ impl Server { None => info!(target: "lazymc", "Starting server..."), } - // Invoke server command in separate task - tokio::spawn(invoke_server_cmd(config, server).map(|_| ())); + // Spawn server in new task + Self::spawn_server_task(config, server); + true } + /// Spawn the server task. + /// + /// This should not be called directly. + fn spawn_server_task(config: Arc, server: Arc) { + tokio::spawn(invoke_server_cmd(config, server).map(|_| ())); + } + /// Stop running server. /// /// This will attempt to stop the server with all available methods. @@ -246,7 +260,7 @@ impl Server { // Try to stop through signal #[cfg(unix)] - if stop_server_signal(config, self) { + if stop_server_signal(config, self).await { return true; } @@ -258,7 +272,7 @@ impl Server { /// /// This requires the server PID to be known. pub async fn force_kill(&self) -> bool { - if let Some(pid) = *self.pid.lock().unwrap() { + if let Some(pid) = *self.pid.lock().await { return os::force_kill(pid); } false @@ -267,7 +281,7 @@ impl Server { /// Decide whether the server should sleep. /// /// Always returns false if it is currently not online. - pub fn should_sleep(&self, config: &Config) -> bool { + pub async fn should_sleep(&self, config: &Config) -> bool { // Server must be online if self.state() != State::Started { return false; @@ -277,7 +291,7 @@ impl Server { let players_online = self .status .read() - .unwrap() + .await .as_ref() .map(|status| status.players.online > 0) .unwrap_or(false); @@ -290,7 +304,7 @@ impl Server { let keep_online = self .keep_online_until .read() - .unwrap() + .await .map(|i| i >= Instant::now()) .unwrap_or(false); if keep_online { @@ -299,7 +313,7 @@ impl Server { } // Last active time must have passed sleep threshold - if let Some(last_idle) = self.last_active.read().unwrap().as_ref() { + if let Some(last_idle) = self.last_active.read().await.as_ref() { return last_idle.elapsed() >= Duration::from_secs(config.time.sleep_after as u64); } @@ -307,27 +321,27 @@ impl Server { } /// Decide whether to force kill the server process. - pub fn should_kill(&self) -> bool { + pub async fn should_kill(&self) -> bool { self.kill_at .read() - .unwrap() + .await .map(|t| t <= Instant::now()) .unwrap_or(false) } /// Read last known server status. - pub fn status(&self) -> RwLockReadGuard> { - self.status.read().unwrap() + pub async fn status<'a>(&'a self) -> RwLockReadGuard<'a, Option> { + self.status.read().await } /// Update the last active time. - fn update_last_active(&self) { - self.last_active.write().unwrap().replace(Instant::now()); + async fn update_last_active(&self) { + self.last_active.write().await.replace(Instant::now()); } /// Force the server to be online for the given number of seconds. - fn keep_online_for(&self, duration: Option) { - *self.keep_online_until.write().unwrap() = duration + async fn keep_online_for(&self, duration: Option) { + *self.keep_online_until.write().await = duration .filter(|d| *d > 0) .map(|d| Instant::now() + Duration::from_secs(d as u64)); } @@ -359,7 +373,7 @@ pub async fn invoke_server_cmd( config: Arc, state: Arc, ) -> Result<(), Box> { - // Build command + // Configure command let args = shlex::split(&config.server.command).expect("invalid server command"); let mut cmd = Command::new(&args[0]); cmd.args(args.iter().skip(1)); @@ -383,7 +397,7 @@ pub async fn invoke_server_cmd( state .pid .lock() - .unwrap() + .await .replace(child.id().expect("unknown server PID")); // Wait for process to exit, handle status @@ -404,18 +418,18 @@ pub async fn invoke_server_cmd( }; // Forget server PID - state.pid.lock().unwrap().take(); + state.pid.lock().await.take(); // Give server a little more time to quit forgotten threads time::sleep(SERVER_QUIT_COOLDOWN).await; // Set server state to stopped - state.update_state(State::Stopped, &config); + state.update_state(State::Stopped, &config).await; // Restart on crash if crashed && config.server.wake_on_crash { warn!(target: "lazymc", "Server crashed, restarting..."); - Server::start(config, state, None); + Server::start(config, state, None).await; } Ok(()) @@ -439,7 +453,7 @@ async fn stop_server_rcon(config: &Config, server: &Server) -> bool { let rcon_cooled_down = server .rcon_last_stop .lock() - .unwrap() + .await .map(|t| t.elapsed() >= RCON_COOLDOWN) .unwrap_or(true); if !rcon_cooled_down { @@ -468,12 +482,8 @@ async fn stop_server_rcon(config: &Config, server: &Server) -> bool { } // Set server to stopping state, update last RCON time - server - .rcon_last_stop - .lock() - .unwrap() - .replace(Instant::now()); - server.update_state(State::Stopping, config); + server.rcon_last_stop.lock().await.replace(Instant::now()); + server.update_state(State::Stopping, config).await; drop(rcon_lock); @@ -487,9 +497,9 @@ async fn stop_server_rcon(config: &Config, server: &Server) -> bool { /// /// Only available on Unix. #[cfg(unix)] -fn stop_server_signal(config: &Config, server: &Server) -> bool { +async fn stop_server_signal(config: &Config, server: &Server) -> bool { // Grab PID - let pid = match *server.pid.lock().unwrap() { + let pid = match *server.pid.lock().await { Some(pid) => pid, None => { debug!(target: "lazymc", "Could not send stop signal to server process, PID unknown"); @@ -504,8 +514,12 @@ fn stop_server_signal(config: &Config, server: &Server) -> bool { } // Update from starting/started to stopping - server.update_state_from(Some(State::Starting), State::Stopping, config); - server.update_state_from(Some(State::Started), State::Stopping, config); + server + .update_state_from(Some(State::Starting), State::Stopping, config) + .await; + server + .update_state_from(Some(State::Started), State::Stopping, config) + .await; true } diff --git a/src/service/server.rs b/src/service/server.rs index 62bada4..408406a 100644 --- a/src/service/server.rs +++ b/src/service/server.rs @@ -44,7 +44,7 @@ pub async fn service(config: Arc) -> Result<(), ()> { // Initiate server start if config.server.wake_on_start { - Server::start(config.clone(), server.clone(), None); + Server::start(config.clone(), server.clone(), None).await; } // Route all incomming connections diff --git a/src/status.rs b/src/status.rs index 6e8dacb..c0dda5d 100644 --- a/src/status.rs +++ b/src/status.rs @@ -79,7 +79,7 @@ pub async fn serve( // Hijack server status packet if client_state == ClientState::Status && packet.id == proto::STATUS_PACKET_ID_STATUS { - let server_status = server_status(&config, &server); + let server_status = server_status(&config, &server).await; let packet = StatusResponse { server_status }; let mut data = Vec::new(); @@ -105,7 +105,7 @@ pub async fn serve( .map(|p| p.name); // Start server if not starting yet - Server::start(config.clone(), server.clone(), username); + Server::start(config.clone(), server.clone(), username).await; // Hold client if enabled and starting if config.time.hold() && server.state() == State::Starting { @@ -241,8 +241,8 @@ async fn kick(msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> { } /// Build server status object to respond to client with. -fn server_status(config: &Config, server: &Server) -> ServerStatus { - let status = server.status(); +async fn server_status(config: &Config, server: &Server) -> ServerStatus { + let status = server.status().await; // Select version and player max from last known server status let (version, max) = match status.as_ref() {