Add client holding feature

Don't disconnect client when server starts, hold them for a while, and
relay when the server is ready. This makes the start-and-connect process
much more transparent.
This commit is contained in:
timvisee 2021-11-10 20:50:49 +01:00
parent d10cf48de1
commit cbe35e5585
No known key found for this signature in database
GPG Key ID: B8DB720BC383E172
4 changed files with 188 additions and 43 deletions

View File

@ -43,6 +43,10 @@ command = "java -Xmx1G -Xms1G -jar server.jar --nogui"
# Minimum time in seconds to stay online when server is started.
#minimum_online_time = 60
# Hold client for number of seconds on connect while server starts (instead of kicking immediately).
# 0 to disable, keep below Minecraft timeout of 30 seconds.
#hold_client_for = 25
[messages]
# MOTDs, shown in server browser.
#motd_sleeping = "☠ Server is sleeping\n§2☻ Join to start it up"

View File

@ -144,6 +144,16 @@ pub struct Time {
/// Minimum time in seconds to stay online when server is started.
#[serde(default, alias = "minimum_online_time")]
pub min_online_time: u32,
/// Hold client for number of seconds while server starts, instead of kicking immediately.
pub hold_client_for: u32,
}
impl Time {
/// Whether to hold clients.
pub fn hold(&self) -> bool {
self.hold_client_for > 0
}
}
impl Default for Time {
@ -151,6 +161,7 @@ impl Default for Time {
Self {
sleep_after: 60,
min_online_time: 60,
hold_client_for: 25,
}
}
}

View File

@ -6,7 +6,18 @@ use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
/// Proxy the inbound stream to a target address.
pub async fn proxy(mut inbound: TcpStream, addr_target: SocketAddr) -> Result<(), Box<dyn Error>> {
pub async fn proxy(inbound: TcpStream, addr_target: SocketAddr) -> Result<(), Box<dyn Error>> {
proxy_with_queue(inbound, addr_target, &[]).await
}
/// Proxy the inbound stream to a target address.
///
/// Send the queue to the target server before proxying.
pub async fn proxy_with_queue(
mut inbound: TcpStream,
addr_target: SocketAddr,
queue: &[u8],
) -> Result<(), Box<dyn Error>> {
// Set up connection to server
// TODO: on connect fail, ping server and redirect to serve_status if offline
let mut outbound = TcpStream::connect(addr_target).await?;
@ -14,11 +25,17 @@ pub async fn proxy(mut inbound: TcpStream, addr_target: SocketAddr) -> Result<()
let (mut ri, mut wi) = inbound.split();
let (mut ro, mut wo) = outbound.split();
// Forward queued bytes to server once writable
if !queue.is_empty() {
wo.writable().await?;
trace!(target: "lazymc", "Relaying {} queued bytes to server", queue.len());
wo.write_all(&queue).await?;
}
let client_to_server = async {
io::copy(&mut ri, &mut wo).await?;
wo.shutdown().await
};
let server_to_client = async {
io::copy(&mut ro, &mut wi).await?;
wi.shutdown().await

View File

@ -1,6 +1,10 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::proxy;
use crate::server::State;
use bytes::BytesMut;
use futures::TryFutureExt;
use minecraft_protocol::data::chat::{Message, Payload};
use minecraft_protocol::data::server_status::*;
use minecraft_protocol::decoder::Decoder;
@ -8,14 +12,18 @@ use minecraft_protocol::encoder::Encoder;
use minecraft_protocol::version::v1_14_4::handshake::Handshake;
use minecraft_protocol::version::v1_14_4::login::{LoginDisconnect, LoginStart};
use minecraft_protocol::version::v1_14_4::status::StatusResponse;
use tokio::io;
use tokio::io::AsyncWriteExt;
use tokio::io::{self, AsyncWriteExt};
use tokio::net::tcp::WriteHalf;
use tokio::net::TcpStream;
use tokio::time;
use crate::config::*;
use crate::proto::{self, Client, ClientState, RawPacket};
use crate::server::{self, Server};
/// Client holding server state poll interval.
const HOLD_POLL_INTERVAL: Duration = Duration::from_millis(500);
/// Proxy the given inbound stream to a target address.
// TODO: do not drop error here, return Box<dyn Error>
pub async fn serve(
@ -26,8 +34,9 @@ pub async fn serve(
) -> Result<(), ()> {
let (mut reader, mut writer) = inbound.split();
// Incoming buffer
// Incoming buffer and packet holding queue
let mut buf = BytesMut::new();
let mut hold_queue = BytesMut::new();
loop {
// Read packet from stream
@ -40,52 +49,33 @@ pub async fn serve(
}
};
// Hijack login start
if client.state() == ClientState::Login && packet.id == proto::LOGIN_PACKET_ID_LOGIN_START {
// Try to get login username
let username = LoginStart::decode(&mut packet.data.as_slice())
.ok()
.map(|p| p.name);
// Select message
let msg = match server.state() {
server::State::Starting | server::State::Stopped | server::State::Started => {
&config.messages.login_starting
}
server::State::Stopping => &config.messages.login_stopping,
};
let packet = LoginDisconnect {
reason: Message::new(Payload::text(msg)),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(0, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
// Start server if not starting yet
Server::start(config, server, username);
break;
}
// Grab client state
let client_state = client.state();
// Hijack handshake
if client.state() == ClientState::Handshake && packet.id == proto::STATUS_PACKET_ID_STATUS {
match Handshake::decode(&mut packet.data.as_slice()) {
if client_state == ClientState::Handshake && packet.id == proto::STATUS_PACKET_ID_STATUS {
// Parse handshake, grab new state
let new_state = match Handshake::decode(&mut packet.data.as_slice()) {
Ok(handshake) => {
// TODO: do not panic here
client.set_state(
ClientState::from_id(handshake.next_state)
.expect("unknown next client state"),
);
ClientState::from_id(handshake.next_state).expect("unknown next client state")
}
Err(_) => break,
};
// Update client state
client.set_state(new_state);
// If login handshake and holding is enabled, hold packets
if new_state == ClientState::Login && config.time.hold() {
hold_queue.extend(raw);
}
continue;
}
// Hijack server status packet
if client.state() == ClientState::Status && packet.id == proto::STATUS_PACKET_ID_STATUS {
if client_state == ClientState::Status && packet.id == proto::STATUS_PACKET_ID_STATUS {
// Select version and player max from last known server status
let (version, max) = match server.clone_status() {
Some(status) => (status.version, status.players.max),
@ -122,18 +112,52 @@ pub async fn serve(
let response = RawPacket::new(0, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
continue;
}
// Hijack ping packet
if client.state() == ClientState::Status && packet.id == proto::STATUS_PACKET_ID_PING {
if client_state == ClientState::Status && packet.id == proto::STATUS_PACKET_ID_PING {
writer.write_all(&raw).await.map_err(|_| ())?;
continue;
}
// Hijack login start
if client_state == ClientState::Login && packet.id == proto::LOGIN_PACKET_ID_LOGIN_START {
// Try to get login username
let username = LoginStart::decode(&mut packet.data.as_slice())
.ok()
.map(|p| p.name);
// Start server if not starting yet
Server::start(config.clone(), server.clone(), username);
// Hold client if enabled and starting
if config.time.hold() && server.state() == State::Starting {
// Hold login packet and remaining read bytes
hold_queue.extend(raw);
hold_queue.extend(buf.split_off(0));
// Start holding
hold(inbound, config, server, &mut hold_queue).await?;
return Ok(());
}
// Select message and kick
let msg = match server.state() {
server::State::Starting | server::State::Stopped | server::State::Started => {
&config.messages.login_starting
}
server::State::Stopping => &config.messages.login_stopping,
};
kick(msg, &mut writer).await?;
break;
}
// Show unhandled packet warning
debug!(target: "lazymc", "Received unhandled packet:");
debug!(target: "lazymc", "- State: {:?}", client.state());
debug!(target: "lazymc", "- State: {:?}", client_state);
debug!(target: "lazymc", "- Packet ID: {}", packet.id);
}
@ -146,3 +170,92 @@ pub async fn serve(
Ok(())
}
/// Hold a client while server starts.
///
/// Relays client to proxy once server is ready.
pub async fn hold<'a>(
mut inbound: TcpStream,
config: Arc<Config>,
server: Arc<Server>,
holding: &mut BytesMut,
) -> Result<(), ()> {
trace!(target: "lazymc", "Started holding client");
// Set up polling interval, get timeout
let mut poll_interval = time::interval(HOLD_POLL_INTERVAL);
let since = Instant::now();
let timeout = config.time.hold_client_for as u64;
loop {
// TODO: do not poll, wait for started signal instead (with timeout)
poll_interval.tick().await;
trace!("Polling server state for holding client...");
match server.state() {
// Still waiting on server start
State::Starting => {
trace!(target: "lazymc", "Server not ready, holding client for longer");
// If hold timeout is reached, kick client
if since.elapsed().as_secs() >= timeout {
warn!(target: "lazymc", "Holding client reached timeout of {}s, disconnecting", timeout);
kick(&config.messages.login_starting, &mut inbound.split().1).await?;
return Ok(());
}
continue;
}
// Server started, start relaying and proxy
State::Started => {
// TODO: drop client if already disconnected
// Relay client to proxy
info!(target: "lazymc", "Server ready for held client, relaying to server");
proxy::proxy_with_queue(inbound, config.server.address, &holding)
.map_err(|_| ())
.await?;
return Ok(());
}
// Server stopping, this shouldn't happen, kick
State::Stopping => {
warn!(target: "lazymc", "Server stopping for held client, disconnecting");
kick(&config.messages.login_stopping, &mut inbound.split().1).await?;
break;
}
// Server stopped, this shouldn't happen, disconnect
State::Stopped => {
error!(target: "lazymc", "Server stopped for held client, disconnecting");
break;
}
}
}
// Gracefully close connection
match inbound.shutdown().await {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
Err(_) => return Err(()),
}
Ok(())
}
/// Kick client with a message.
///
/// Should close connection afterwards.
async fn kick<'a>(msg: &str, writer: &mut WriteHalf<'a>) -> Result<(), ()> {
let packet = LoginDisconnect {
reason: Message::new(Payload::text(msg)),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(0, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())
}