Implement join method configuration and handling

This commit is contained in:
timvisee
2021-11-15 16:52:18 +01:00
parent 17ec663e15
commit e11eca1d5a
2 changed files with 61 additions and 56 deletions

@@ -81,14 +81,6 @@ pub struct Config {
#[serde(default)] #[serde(default)]
pub join: Join, pub join: Join,
/// Join kick configuration.
#[serde(default)]
pub join_kick: JoinKick,
/// Join hold configuration.
#[serde(default)]
pub join_hold: JoinHold,
/// Lockout feature. /// Lockout feature.
#[serde(default)] #[serde(default)]
pub lockout: Lockout, pub lockout: Lockout,
@@ -248,12 +240,22 @@ pub enum Method {
pub struct Join { pub struct Join {
/// Join methods. /// Join methods.
pub methods: Vec<Method>, pub methods: Vec<Method>,
/// Join kick configuration.
#[serde(default)]
pub kick: JoinKick,
/// Join hold configuration.
#[serde(default)]
pub hold: JoinHold,
} }
impl Default for Join { impl Default for Join {
fn default() -> Self { fn default() -> Self {
Self { Self {
methods: vec![Method::Hold, Method::Kick], methods: vec![Method::Hold, Method::Kick],
kick: Default::default(),
hold: Default::default(),
} }
} }
} }
@@ -286,14 +288,6 @@ pub struct JoinHold {
pub timeout: u32, pub timeout: u32,
} }
impl JoinHold {
/// Whether to hold clients.
// TODO: remove this
pub fn hold(&self) -> bool {
self.timeout > 0
}
}
impl Default for JoinHold { impl Default for JoinHold {
fn default() -> Self { fn default() -> Self {
Self { timeout: 25 } Self { timeout: 25 }

@@ -33,6 +33,7 @@ pub async fn serve(
// Incoming buffer and packet holding queue // Incoming buffer and packet holding queue
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
let may_hold = config.join.methods.contains(&Method::Hold);
let mut hold_queue = BytesMut::new(); let mut hold_queue = BytesMut::new();
loop { loop {
@@ -70,7 +71,7 @@ pub async fn serve(
client.set_state(new_state); client.set_state(new_state);
// If login handshake and holding is enabled, hold packets // If login handshake and holding is enabled, hold packets
if new_state == ClientState::Login && config.join_hold.hold() { if new_state == ClientState::Login && may_hold {
hold_queue.extend(raw); hold_queue.extend(raw);
} }
@@ -119,26 +120,49 @@ pub async fn serve(
// Start server if not starting yet // Start server if not starting yet
Server::start(config.clone(), server.clone(), username).await; Server::start(config.clone(), server.clone(), username).await;
// Hold client if enabled and starting // Use join occupy methods
if config.join_hold.hold() && server.state() == State::Starting { for method in &config.join.methods {
// Hold login packet and remaining read bytes match method {
hold_queue.extend(raw); // Hold method, hold client connection while server starts
hold_queue.extend(buf.split_off(0)); Method::Hold => {
trace!(target: "lazymc", "Using hold method to occupy joining client");
// Start holding // Server must be starting
hold(inbound, config, server, hold_queue).await?; if server.state() != State::Starting {
return Ok(()); continue;
}
// Hold login packet and remaining read bytes
hold_queue.extend(&raw);
hold_queue.extend(buf.split_off(0));
// Start holding
if hold(&config, &server).await? {
service::server::route_proxy_queue(inbound, config, hold_queue);
return Ok(());
}
}
// Kick method, immediately kick client
Method::Kick => {
trace!(target: "lazymc", "Using kick method to occupy joining client");
// Select message and kick
let msg = match server.state() {
server::State::Starting
| server::State::Stopped
| server::State::Started => &config.join.kick.starting,
server::State::Stopping => &config.join.kick.stopping,
};
kick(msg, &mut writer).await?;
break;
}
}
} }
// Select message and kick debug!(target: "lazymc", "No method left to occupy joining client, disconnecting");
let msg = match server.state() {
server::State::Starting | server::State::Stopped | server::State::Started => {
&config.join_kick.starting
}
server::State::Stopping => &config.join_kick.stopping,
};
kick(msg, &mut writer).await?;
// Done occupying client, just disconnect
break; break;
} }
@@ -160,13 +184,10 @@ pub async fn serve(
/// Hold a client while server starts. /// Hold a client while server starts.
/// ///
/// Relays client to proxy once server is ready. /// Returns holding status. `true` if client is held and it should be proxied, `false` it was held
pub async fn hold<'a>( /// but it timed out.
mut inbound: TcpStream, #[must_use]
config: Arc<Config>, pub async fn hold<'a>(config: &Config, server: &Server) -> Result<bool, ()> {
server: Arc<Server>,
hold_queue: BytesMut,
) -> Result<(), ()> {
trace!(target: "lazymc", "Started holding client"); trace!(target: "lazymc", "Started holding client");
// A task to wait for suitable server state // A task to wait for suitable server state
@@ -205,36 +226,26 @@ pub async fn hold<'a>(
}; };
// Wait for server state with timeout // Wait for server state with timeout
let timeout = Duration::from_secs(config.join_hold.timeout as u64); let timeout = Duration::from_secs(config.join.hold.timeout as u64);
match time::timeout(timeout, task_wait).await { match time::timeout(timeout, task_wait).await {
// Relay client to proxy // Relay client to proxy
Ok(true) => { Ok(true) => {
info!(target: "lazymc", "Server ready for held client, relaying to server"); info!(target: "lazymc", "Server ready for held client, relaying to server");
service::server::route_proxy_queue(inbound, config, hold_queue); return Ok(true);
return Ok(());
} }
// Server stopping/stopped, this shouldn't happen, kick // Server stopping/stopped, this shouldn't happen, kick
Ok(false) => { Ok(false) => {
warn!(target: "lazymc", "Server stopping for held client, disconnecting"); warn!(target: "lazymc", "Server stopping for held client");
kick(&config.join_kick.stopping, &mut inbound.split().1).await?; return Ok(false);
} }
// Timeout reached, kick with starting message // Timeout reached, kick with starting message
Err(_) => { Err(_) => {
warn!(target: "lazymc", "Held client reached timeout of {}s, disconnecting", config.join_hold.timeout); warn!(target: "lazymc", "Held client reached timeout of {}s", config.join.hold.timeout);
kick(&config.join_kick.starting, &mut inbound.split().1).await?; return Ok(false);
} }
} }
// Gracefully close connection
match inbound.shutdown().await {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
Err(_) => return Err(()),
}
Ok(())
} }
/// Kick client with a message. /// Kick client with a message.