diff --git a/src/service/server.rs b/src/service/server.rs index 19db18e..fc9cb9f 100644 --- a/src/service/server.rs +++ b/src/service/server.rs @@ -1,7 +1,8 @@ use std::sync::Arc; +use bytes::BytesMut; use futures::FutureExt; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, TcpStream}; use crate::config::Config; use crate::proto::Client; @@ -42,32 +43,64 @@ pub async fn service(config: Arc) -> Result<(), ()> { Server::start(config.clone(), server.clone(), None); } - // Proxy all incomming connections + // Route all incomming connections while let Ok((inbound, _)) = listener.accept().await { - let client = Client::default(); - - let online = server.state() == server::State::Started; - if !online { - // When server is not online, spawn a status server - let transfer = - status::serve(client, inbound, config.clone(), server.clone()).map(|r| { - if let Err(err) = r { - warn!(target: "lazymc", "Failed to serve status: {:?}", err); - } - }); - - tokio::spawn(transfer); - } else { - // When server is online, proxy all - let transfer = proxy::proxy(inbound, config.server.address).map(|r| { - if let Err(err) = r { - warn!(target: "lazymc", "Failed to proxy: {}", err); - } - }); - - tokio::spawn(transfer); - } + route(inbound, config.clone(), server.clone()); } Ok(()) } + +/// Route inbound TCP stream to correct service, spawning a new task. +#[inline] +fn route(inbound: TcpStream, config: Arc, server: Arc) { + if server.state() == server::State::Started { + route_proxy(inbound, config) + } else { + route_status(inbound, config, server) + } +} + +/// Route inbound TCP stream to status server, spawning a new task. +#[inline] +fn route_status(inbound: TcpStream, config: Arc, server: Arc) { + // When server is not online, spawn a status server + let client = Client::default(); + let service = status::serve(client, inbound, config.clone(), server.clone()).map(|r| { + if let Err(err) = r { + warn!(target: "lazymc", "Failed to serve status: {:?}", err); + } + }); + + tokio::spawn(service); +} + +/// Route inbound TCP stream to proxy, spawning a new task. +#[inline] +fn route_proxy(inbound: TcpStream, config: Arc) { + // When server is online, proxy all + let service = proxy::proxy(inbound, config.server.address).map(|r| { + if let Err(err) = r { + warn!(target: "lazymc", "Failed to proxy: {}", err); + } + }); + + tokio::spawn(service); +} + +/// Route inbound TCP stream to proxy with queued data, spawning a new task. +#[inline] +pub fn route_proxy_queue<'a>(inbound: TcpStream, config: Arc, queue: BytesMut) { + // When server is online, proxy all + let service = async move { + proxy::proxy_with_queue(inbound, config.server.address, &queue) + .map(|r| { + if let Err(err) = r { + warn!(target: "lazymc", "Failed to proxy: {}", err); + } + }) + .await + }; + + tokio::spawn(service); +} diff --git a/src/status.rs b/src/status.rs index 8bdf02e..9208c4f 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,10 +1,8 @@ use std::sync::Arc; use std::time::{Duration, Instant}; -use crate::proxy; use crate::server::State; use bytes::BytesMut; -use futures::TryFutureExt; use minecraft_protocol::data::chat::{Message, Payload}; use minecraft_protocol::data::server_status::*; use minecraft_protocol::decoder::Decoder; @@ -20,6 +18,7 @@ use tokio::time; use crate::config::*; use crate::proto::{self, Client, ClientState, RawPacket}; use crate::server::{self, Server}; +use crate::service; /// Client holding server state poll interval. const HOLD_POLL_INTERVAL: Duration = Duration::from_millis(500); @@ -111,7 +110,7 @@ pub async fn serve( hold_queue.extend(buf.split_off(0)); // Start holding - hold(inbound, config, server, &mut hold_queue).await?; + hold(inbound, config, server, hold_queue).await?; return Ok(()); } @@ -150,7 +149,7 @@ pub async fn hold<'a>( mut inbound: TcpStream, config: Arc, server: Arc, - holding: &mut BytesMut, + hold_queue: BytesMut, ) -> Result<(), ()> { trace!(target: "lazymc", "Started holding client"); @@ -186,9 +185,7 @@ pub async fn hold<'a>( // Relay client to proxy info!(target: "lazymc", "Server ready for held client, relaying to server"); - proxy::proxy_with_queue(inbound, config.server.address, &holding) - .map_err(|_| ()) - .await?; + service::server::route_proxy_queue(inbound, config, hold_queue); return Ok(()); }