Spawn new task for relayed clients that were held

This commit is contained in:
timvisee 2021-11-11 11:53:19 +01:00
parent 4d51113bd9
commit 2c5a76f92a
No known key found for this signature in database
GPG Key ID: B8DB720BC383E172
2 changed files with 62 additions and 32 deletions

View File

@ -1,7 +1,8 @@
use std::sync::Arc;
use bytes::BytesMut;
use futures::FutureExt;
use tokio::net::TcpListener;
use tokio::net::{TcpListener, TcpStream};
use crate::config::Config;
use crate::proto::Client;
@ -42,32 +43,64 @@ pub async fn service(config: Arc<Config>) -> Result<(), ()> {
Server::start(config.clone(), server.clone(), None);
}
// Proxy all incomming connections
// Route all incomming connections
while let Ok((inbound, _)) = listener.accept().await {
let client = Client::default();
route(inbound, config.clone(), server.clone());
}
let online = server.state() == server::State::Started;
if !online {
Ok(())
}
/// Route inbound TCP stream to correct service, spawning a new task.
#[inline]
fn route(inbound: TcpStream, config: Arc<Config>, server: Arc<Server>) {
if server.state() == server::State::Started {
route_proxy(inbound, config)
} else {
route_status(inbound, config, server)
}
}
/// Route inbound TCP stream to status server, spawning a new task.
#[inline]
fn route_status(inbound: TcpStream, config: Arc<Config>, server: Arc<Server>) {
// When server is not online, spawn a status server
let transfer =
status::serve(client, inbound, config.clone(), server.clone()).map(|r| {
let client = Client::default();
let service = status::serve(client, inbound, config.clone(), server.clone()).map(|r| {
if let Err(err) = r {
warn!(target: "lazymc", "Failed to serve status: {:?}", err);
}
});
tokio::spawn(transfer);
} else {
tokio::spawn(service);
}
/// Route inbound TCP stream to proxy, spawning a new task.
#[inline]
fn route_proxy(inbound: TcpStream, config: Arc<Config>) {
// When server is online, proxy all
let transfer = proxy::proxy(inbound, config.server.address).map(|r| {
let service = proxy::proxy(inbound, config.server.address).map(|r| {
if let Err(err) = r {
warn!(target: "lazymc", "Failed to proxy: {}", err);
}
});
tokio::spawn(transfer);
}
tokio::spawn(service);
}
Ok(())
/// Route inbound TCP stream to proxy with queued data, spawning a new task.
#[inline]
pub fn route_proxy_queue<'a>(inbound: TcpStream, config: Arc<Config>, queue: BytesMut) {
// When server is online, proxy all
let service = async move {
proxy::proxy_with_queue(inbound, config.server.address, &queue)
.map(|r| {
if let Err(err) = r {
warn!(target: "lazymc", "Failed to proxy: {}", err);
}
})
.await
};
tokio::spawn(service);
}

View File

@ -1,10 +1,8 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::proxy;
use crate::server::State;
use bytes::BytesMut;
use futures::TryFutureExt;
use minecraft_protocol::data::chat::{Message, Payload};
use minecraft_protocol::data::server_status::*;
use minecraft_protocol::decoder::Decoder;
@ -20,6 +18,7 @@ use tokio::time;
use crate::config::*;
use crate::proto::{self, Client, ClientState, RawPacket};
use crate::server::{self, Server};
use crate::service;
/// Client holding server state poll interval.
const HOLD_POLL_INTERVAL: Duration = Duration::from_millis(500);
@ -111,7 +110,7 @@ pub async fn serve(
hold_queue.extend(buf.split_off(0));
// Start holding
hold(inbound, config, server, &mut hold_queue).await?;
hold(inbound, config, server, hold_queue).await?;
return Ok(());
}
@ -150,7 +149,7 @@ pub async fn hold<'a>(
mut inbound: TcpStream,
config: Arc<Config>,
server: Arc<Server>,
holding: &mut BytesMut,
hold_queue: BytesMut,
) -> Result<(), ()> {
trace!(target: "lazymc", "Started holding client");
@ -186,9 +185,7 @@ pub async fn hold<'a>(
// Relay client to proxy
info!(target: "lazymc", "Server ready for held client, relaying to server");
proxy::proxy_with_queue(inbound, config.server.address, &holding)
.map_err(|_| ())
.await?;
service::server::route_proxy_queue(inbound, config, hold_queue);
return Ok(());
}