Refactor, cleanup status logic, extract join occupy logic into modules

This commit is contained in:
timvisee 2021-11-16 17:05:44 +01:00
parent 4510586169
commit b06f26b3e8
No known key found for this signature in database
GPG Key ID: B8DB720BC383E172
17 changed files with 656 additions and 451 deletions

30
src/join/forward.rs Normal file
View File

@ -0,0 +1,30 @@
use std::sync::Arc;
use bytes::BytesMut;
use tokio::net::TcpStream;
use crate::config::*;
use crate::service;
use super::MethodResult;
/// Forward the client.
pub async fn occupy(
config: Arc<Config>,
inbound: TcpStream,
inbound_history: &mut BytesMut,
) -> Result<MethodResult, ()> {
trace!(target: "lazymc", "Using forward method to occupy joining client");
debug!(target: "lazymc", "Forwarding client to {:?}!", config.join.forward.address);
service::server::route_proxy_address_queue(
inbound,
config.join.forward.address,
inbound_history.clone(),
);
// TODO: do not consume, continue on proxy connect failure
Ok(MethodResult::Consumed)
}

101
src/join/hold.rs Normal file
View File

@ -0,0 +1,101 @@
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use bytes::BytesMut;
use tokio::net::TcpStream;
use tokio::time;
use crate::config::*;
use crate::server::{Server, State};
use crate::service;
use super::MethodResult;
/// Hold the client.
pub async fn occupy(
config: Arc<Config>,
server: Arc<Server>,
inbound: TcpStream,
inbound_history: &mut BytesMut,
) -> Result<MethodResult, ()> {
trace!(target: "lazymc", "Using hold method to occupy joining client");
// Server must be starting
if server.state() != State::Starting {
return Ok(MethodResult::Continue(inbound));
}
// Start holding, consume client
if hold(&config, &server).await? {
service::server::route_proxy_queue(inbound, config, inbound_history.clone());
return Ok(MethodResult::Consumed);
}
Ok(MethodResult::Continue(inbound))
}
/// Hold a client while server starts.
///
/// Returns holding status. `true` if client is held and it should be proxied, `false` it was held
/// but it timed out.
async fn hold<'a>(config: &Config, server: &Server) -> Result<bool, ()> {
trace!(target: "lazymc", "Started holding client");
// A task to wait for suitable server state
// Waits for started state, errors if stopping/stopped state is reached
let task_wait = async {
let mut state = server.state_receiver();
loop {
// Wait for state change
state.changed().await.unwrap();
match state.borrow().deref() {
// Still waiting on server start
State::Starting => {
trace!(target: "lazymc", "Server not ready, holding client for longer");
continue;
}
// Server started, start relaying and proxy
State::Started => {
break true;
}
// Server stopping, this shouldn't happen, kick
State::Stopping => {
warn!(target: "lazymc", "Server stopping for held client, disconnecting");
break false;
}
// Server stopped, this shouldn't happen, disconnect
State::Stopped => {
error!(target: "lazymc", "Server stopped for held client, disconnecting");
break false;
}
}
}
};
// Wait for server state with timeout
let timeout = Duration::from_secs(config.join.hold.timeout as u64);
match time::timeout(timeout, task_wait).await {
// Relay client to proxy
Ok(true) => {
info!(target: "lazymc", "Server ready for held client, relaying to server");
Ok(true)
}
// Server stopping/stopped, this shouldn't happen, kick
Ok(false) => {
warn!(target: "lazymc", "Server stopping for held client");
Ok(false)
}
// Timeout reached, kick with starting message
Err(_) => {
warn!(target: "lazymc", "Held client reached timeout of {}s", config.join.hold.timeout);
Ok(false)
}
}
}

33
src/join/kick.rs Normal file
View File

@ -0,0 +1,33 @@
use tokio::net::TcpStream;
use crate::config::*;
use crate::net;
use crate::proto::action;
use crate::proto::client::Client;
use crate::server::{self, Server};
use super::MethodResult;
/// Kick the client.
pub async fn occupy(
client: &Client,
config: &Config,
server: &Server,
mut inbound: TcpStream,
) -> Result<MethodResult, ()> {
trace!(target: "lazymc", "Using kick method to occupy joining client");
// Select message and kick
let msg = match server.state() {
server::State::Starting | server::State::Stopped | server::State::Started => {
&config.join.kick.starting
}
server::State::Stopping => &config.join.kick.stopping,
};
action::kick(client, msg, &mut inbound.split().1).await?;
// Gracefully close connection
net::close_tcp_stream(inbound).await.map_err(|_| ())?;
Ok(MethodResult::Consumed)
}

30
src/join/lobby.rs Normal file
View File

@ -0,0 +1,30 @@
use std::sync::Arc;
use bytes::BytesMut;
use tokio::net::TcpStream;
use crate::config::*;
use crate::lobby;
use crate::proto::client::{Client, ClientInfo};
use crate::server::Server;
use super::MethodResult;
/// Lobby the client.
pub async fn occupy(
client: &Client,
client_info: ClientInfo,
config: Arc<Config>,
server: Arc<Server>,
inbound: TcpStream,
inbound_queue: BytesMut,
) -> Result<MethodResult, ()> {
trace!(target: "lazymc", "Using lobby method to occupy joining client");
// Start lobby
lobby::serve(client, client_info, inbound, config, server, inbound_queue).await?;
// TODO: do not consume client here, allow other join method on fail
Ok(MethodResult::Consumed)
}

106
src/join/mod.rs Normal file
View File

@ -0,0 +1,106 @@
use std::sync::Arc;
use bytes::BytesMut;
use tokio::net::TcpStream;
use crate::config::*;
use crate::net;
use crate::proto::client::{Client, ClientInfo, ClientState};
use crate::server::Server;
pub mod forward;
pub mod hold;
pub mod kick;
#[cfg(feature = "lobby")]
pub mod lobby;
/// A result returned by a join occupy method.
pub enum MethodResult {
/// Client is consumed.
Consumed,
/// Method is done, continue with the next.
Continue(TcpStream),
}
/// Start occupying client.
///
/// This assumes the login start packet has just been received.
pub async fn occupy(
client: Client,
#[allow(unused_variables)] client_info: ClientInfo,
config: Arc<Config>,
server: Arc<Server>,
mut inbound: TcpStream,
mut inbound_history: BytesMut,
#[allow(unused_variables)] login_queue: BytesMut,
) -> Result<(), ()> {
// Assert state is correct
assert_eq!(
client.state(),
ClientState::Login,
"when occupying client, it should be in login state"
);
// Go through all configured join methods
for method in &config.join.methods {
// Invoke method, take result
let result = match method {
// Kick method, immediately kick client
Method::Kick => kick::occupy(&client, &config, &server, inbound).await?,
// Hold method, hold client connection while server starts
Method::Hold => {
hold::occupy(
config.clone(),
server.clone(),
inbound,
&mut inbound_history,
)
.await?
}
// Forward method, forward client connection while server starts
Method::Forward => {
forward::occupy(config.clone(), inbound, &mut inbound_history).await?
}
// Lobby method, keep client in lobby while server starts
#[cfg(feature = "lobby")]
Method::Lobby => {
lobby::occupy(
&client,
client_info.clone(),
config.clone(),
server.clone(),
inbound,
login_queue.clone(),
)
.await?
}
// Lobby method, keep client in lobby while server starts
#[cfg(not(feature = "lobby"))]
Method::Lobby => {
error!(target: "lazymc", "Lobby join method not supported in this lazymc build");
MethodResult::Continue(inbound)
}
};
// Handle method result
match result {
MethodResult::Consumed => return Ok(()),
MethodResult::Continue(stream) => {
inbound = stream;
continue;
}
}
}
debug!(target: "lazymc", "No method left to occupy joining client, disconnecting");
// Gracefully close connection
net::close_tcp_stream(inbound).await.map_err(|_| ())?;
Ok(())
}

View File

@ -16,7 +16,7 @@ use minecraft_protocol::version::v1_17_1::game::{
Respawn, SetTitleSubtitle, SetTitleText, SetTitleTimes, TimeUpdate,
};
use nbt::CompoundTag;
use tokio::io::{self, AsyncWriteExt};
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::select;
@ -25,7 +25,11 @@ use uuid::Uuid;
use crate::config::*;
use crate::mc;
use crate::proto::{self, Client, ClientInfo, ClientState, RawPacket};
use crate::net;
use crate::proto;
use crate::proto::client::{Client, ClientInfo, ClientState};
use crate::proto::packet::{self, RawPacket};
use crate::proto::packets;
use crate::proxy;
use crate::server::{Server, State};
@ -62,7 +66,7 @@ const SERVER_BRAND: &[u8] = b"lazymc";
// TODO: do not drop error here, return Box<dyn Error>
// TODO: on error, nicely kick client with message
pub async fn serve(
client: Client,
client: &Client,
client_info: ClientInfo,
mut inbound: TcpStream,
config: Arc<Config>,
@ -88,7 +92,7 @@ pub async fn serve(
loop {
// Read packet from stream
let (packet, _raw) = match proto::read_packet(&client, &mut inbound_buf, &mut reader).await
let (packet, _raw) = match packet::read_packet(client, &mut inbound_buf, &mut reader).await
{
Ok(Some(packet)) => packet,
Ok(None) => break,
@ -102,9 +106,7 @@ pub async fn serve(
let client_state = client.state();
// Hijack login start
if client_state == ClientState::Login
&& packet.id == proto::packets::login::SERVER_LOGIN_START
{
if client_state == ClientState::Login && packet.id == packets::login::SERVER_LOGIN_START {
// Parse login start packet
let login_start = LoginStart::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
@ -113,21 +115,21 @@ pub async fn serve(
// Respond with set compression if compression is enabled based on threshold
if proto::COMPRESSION_THRESHOLD >= 0 {
trace!(target: "lazymc::lobby", "Enabling compression for lobby client because server has it enabled (threshold: {})", proto::COMPRESSION_THRESHOLD);
respond_set_compression(&client, &mut writer, proto::COMPRESSION_THRESHOLD).await?;
respond_set_compression(client, &mut writer, proto::COMPRESSION_THRESHOLD).await?;
client.set_compression(proto::COMPRESSION_THRESHOLD);
}
// Respond with login success, switch to play state
respond_login_success(&client, &mut writer, &login_start).await?;
respond_login_success(client, &mut writer, &login_start).await?;
client.set_state(ClientState::Play);
trace!(target: "lazymc::lobby", "Client login success, sending required play packets for lobby world");
// Send packets to client required to get into workable play state for lobby world
send_lobby_play_packets(&client, &mut writer, &server).await?;
send_lobby_play_packets(client, &mut writer, &server).await?;
// Wait for server to come online, then set up new connection to it
stage_wait(&client, &server, &config, &mut writer).await?;
stage_wait(client, &server, &config, &mut writer).await?;
let (server_client, mut outbound, mut server_buf) =
connect_to_server(client_info, &config).await?;
@ -136,10 +138,10 @@ pub async fn serve(
wait_for_server_join_game(&server_client, &mut outbound, &mut server_buf).await?;
// Reset lobby title
send_lobby_title(&client, &mut writer, "").await?;
send_lobby_title(client, &mut writer, "").await?;
// Play ready sound if configured
play_lobby_ready_sound(&client, &mut writer, &config).await?;
play_lobby_ready_sound(client, &mut writer, &config).await?;
// Wait a second because Notchian servers are slow
// See: https://wiki.vg/Protocol#Login_Success
@ -147,7 +149,7 @@ pub async fn serve(
time::sleep(SERVER_WARMUP).await;
// Send respawn packet, initiates teleport to real server world
send_respawn_from_join(&client, &mut writer, join_game).await?;
send_respawn_from_join(client, &mut writer, join_game).await?;
// Drain inbound connection so we don't confuse the server
// TODO: can we void everything? we might need to forward everything to server except
@ -172,11 +174,7 @@ pub async fn serve(
}
// Gracefully close connection
match writer.shutdown().await {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
Err(_) => return Err(()),
}
net::close_tcp_stream(inbound).await.map_err(|_| ())?;
Ok(())
}
@ -192,8 +190,7 @@ async fn respond_set_compression(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::login::CLIENT_SET_COMPRESSION, data).encode(client)?;
let response = RawPacket::new(packets::login::CLIENT_SET_COMPRESSION, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@ -217,8 +214,7 @@ async fn respond_login_success(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::login::CLIENT_LOGIN_SUCCESS, data).encode(client)?;
let response = RawPacket::new(packets::login::CLIENT_LOGIN_SUCCESS, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@ -310,7 +306,7 @@ async fn send_lobby_join_game(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_JOIN_GAME, data).encode(client)?;
let response = RawPacket::new(packets::play::CLIENT_JOIN_GAME, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@ -326,8 +322,7 @@ async fn send_lobby_brand(client: &Client, writer: &mut WriteHalf<'_>) -> Result
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::play::CLIENT_PLUGIN_MESSAGE, data).encode(client)?;
let response = RawPacket::new(packets::play::CLIENT_PLUGIN_MESSAGE, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@ -350,8 +345,7 @@ async fn send_lobby_player_pos(client: &Client, writer: &mut WriteHalf<'_>) -> R
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::play::CLIENT_PLAYER_POS_LOOK, data).encode(client)?;
let response = RawPacket::new(packets::play::CLIENT_PLAYER_POS_LOOK, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@ -370,7 +364,7 @@ async fn send_lobby_time_update(client: &Client, writer: &mut WriteHalf<'_>) ->
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_TIME_UPDATE, data).encode(client)?;
let response = RawPacket::new(packets::play::CLIENT_TIME_UPDATE, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@ -388,7 +382,7 @@ async fn send_keep_alive(client: &Client, writer: &mut WriteHalf<'_>) -> Result<
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_KEEP_ALIVE, data).encode(client)?;
let response = RawPacket::new(packets::play::CLIENT_KEEP_ALIVE, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
// TODO: verify we receive keep alive response with same ID from client
@ -418,8 +412,7 @@ async fn send_lobby_title(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TEXT, data).encode(client)?;
let response = RawPacket::new(packets::play::CLIENT_SET_TITLE_TEXT, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
// Set subtitle
@ -430,8 +423,7 @@ async fn send_lobby_title(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode(client)?;
let response = RawPacket::new(packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
// Set title times
@ -453,8 +445,7 @@ async fn send_lobby_title(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TIMES, data).encode(client)?;
let response = RawPacket::new(packets::play::CLIENT_SET_TITLE_TIMES, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@ -479,8 +470,7 @@ async fn send_lobby_sound_effect(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::play::CLIENT_NAMED_SOUND_EFFECT, data).encode(client)?;
let response = RawPacket::new(packets::play::CLIENT_NAMED_SOUND_EFFECT, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@ -508,7 +498,7 @@ async fn send_respawn_from_join(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_RESPAWN, data).encode(client)?;
let response = RawPacket::new(packets::play::CLIENT_RESPAWN, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@ -654,8 +644,7 @@ async fn connect_to_server_no_timeout(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let request =
RawPacket::new(proto::packets::handshake::SERVER_HANDSHAKE, data).encode(&tmp_client)?;
let request = RawPacket::new(packets::handshake::SERVER_HANDSHAKE, data).encode(&tmp_client)?;
writer.write_all(&request).await.map_err(|_| ())?;
// Request login start
@ -666,8 +655,7 @@ async fn connect_to_server_no_timeout(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let request =
RawPacket::new(proto::packets::login::SERVER_LOGIN_START, data).encode(&tmp_client)?;
let request = RawPacket::new(packets::login::SERVER_LOGIN_START, data).encode(&tmp_client)?;
writer.write_all(&request).await.map_err(|_| ())?;
// Incoming buffer
@ -675,7 +663,7 @@ async fn connect_to_server_no_timeout(
loop {
// Read packet from stream
let (packet, _raw) = match proto::read_packet(&tmp_client, &mut buf, &mut reader).await {
let (packet, _raw) = match packet::read_packet(&tmp_client, &mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
@ -688,8 +676,7 @@ async fn connect_to_server_no_timeout(
let client_state = tmp_client.state();
// Catch set compression
if client_state == ClientState::Login
&& packet.id == proto::packets::login::CLIENT_SET_COMPRESSION
if client_state == ClientState::Login && packet.id == packets::login::CLIENT_SET_COMPRESSION
{
// Decode compression packet
let set_compression =
@ -713,9 +700,7 @@ async fn connect_to_server_no_timeout(
}
// Hijack login success
if client_state == ClientState::Login
&& packet.id == proto::packets::login::CLIENT_LOGIN_SUCCESS
{
if client_state == ClientState::Login && packet.id == packets::login::CLIENT_LOGIN_SUCCESS {
trace!(target: "lazymc::lobby", "Received login success from server connection, change to play mode");
// TODO: parse this packet to ensure it's fine
@ -743,11 +728,7 @@ async fn connect_to_server_no_timeout(
}
// Gracefully close connection
match writer.shutdown().await {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
Err(_) => return Err(()),
}
net::close_tcp_stream(outbound).await.map_err(|_| ())?;
Err(())
}
@ -780,11 +761,11 @@ async fn wait_for_server_join_game_no_timeout(
outbound: &mut TcpStream,
buf: &mut BytesMut,
) -> Result<JoinGame, ()> {
let (mut reader, mut writer) = outbound.split();
let (mut reader, mut _writer) = outbound.split();
loop {
// Read packet from stream
let (packet, _raw) = match proto::read_packet(client, buf, &mut reader).await {
let (packet, _raw) = match packet::read_packet(client, buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
@ -794,7 +775,7 @@ async fn wait_for_server_join_game_no_timeout(
};
// Catch join game
if packet.id == proto::packets::play::CLIENT_JOIN_GAME {
if packet.id == packets::play::CLIENT_JOIN_GAME {
let join_game = JoinGame::decode(&mut packet.data.as_slice()).map_err(|err| {
dbg!(err);
})?;
@ -808,11 +789,7 @@ async fn wait_for_server_join_game_no_timeout(
}
// Gracefully close connection
match writer.shutdown().await {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
Err(_) => return Err(()),
}
net::close_tcp_stream_ref(outbound).await.map_err(|_| ())?;
Err(())
}

View File

@ -10,10 +10,12 @@ extern crate log;
pub(crate) mod action;
pub(crate) mod cli;
pub(crate) mod config;
pub(crate) mod join;
#[cfg(feature = "lobby")]
pub(crate) mod lobby;
pub(crate) mod mc;
pub(crate) mod monitor;
pub(crate) mod net;
pub(crate) mod os;
pub(crate) mod proto;
pub(crate) mod proxy;

View File

@ -16,7 +16,9 @@ use tokio::net::TcpStream;
use tokio::time;
use crate::config::Config;
use crate::proto::{self, Client, ClientState, RawPacket};
use crate::proto::client::{Client, ClientState};
use crate::proto::packet::{self, RawPacket};
use crate::proto::packets;
use crate::server::{Server, State};
/// Monitor ping inverval in seconds.
@ -133,7 +135,7 @@ async fn send_handshake(
let mut packet = Vec::new();
handshake.encode(&mut packet).map_err(|_| ())?;
let raw = RawPacket::new(proto::packets::handshake::SERVER_HANDSHAKE, packet)
let raw = RawPacket::new(packets::handshake::SERVER_HANDSHAKE, packet)
.encode(client)
.map_err(|_| ())?;
stream.write_all(&raw).await.map_err(|_| ())?;
@ -143,7 +145,7 @@ async fn send_handshake(
/// Send status request.
async fn request_status(client: &Client, stream: &mut TcpStream) -> Result<(), ()> {
let raw = RawPacket::new(proto::packets::status::SERVER_STATUS, vec![])
let raw = RawPacket::new(packets::status::SERVER_STATUS, vec![])
.encode(client)
.map_err(|_| ())?;
stream.write_all(&raw).await.map_err(|_| ())?;
@ -158,7 +160,7 @@ async fn send_ping(client: &Client, stream: &mut TcpStream) -> Result<u64, ()> {
let mut packet = Vec::new();
ping.encode(&mut packet).map_err(|_| ())?;
let raw = RawPacket::new(proto::packets::status::SERVER_PING, packet)
let raw = RawPacket::new(packets::status::SERVER_PING, packet)
.encode(client)
.map_err(|_| ())?;
stream.write_all(&raw).await.map_err(|_| ())?;
@ -173,14 +175,14 @@ async fn wait_for_status(client: &Client, stream: &mut TcpStream) -> Result<Serv
loop {
// Read packet from stream
let (packet, _raw) = match proto::read_packet(client, &mut buf, &mut reader).await {
let (packet, _raw) = match packet::read_packet(client, &mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => continue,
};
// Catch status response
if packet.id == proto::packets::status::CLIENT_STATUS {
if packet.id == packets::status::CLIENT_STATUS {
let status = StatusResponse::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
return Ok(status.server_status);
}
@ -209,14 +211,14 @@ async fn wait_for_ping(client: &Client, stream: &mut TcpStream, token: u64) -> R
loop {
// Read packet from stream
let (packet, _raw) = match proto::read_packet(client, &mut buf, &mut reader).await {
let (packet, _raw) = match packet::read_packet(client, &mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => continue,
};
// Catch ping response
if packet.id == proto::packets::status::CLIENT_PING {
if packet.id == packets::status::CLIENT_PING {
let ping = PingResponse::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
// Ping token must match

22
src/net.rs Normal file
View File

@ -0,0 +1,22 @@
use std::error::Error;
use std::io;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
/// Gracefully close given TCP stream.
///
/// Intended as helper to make code less messy. This also succeeds if already closed.
pub async fn close_tcp_stream(mut stream: TcpStream) -> Result<(), Box<dyn Error>> {
close_tcp_stream_ref(&mut stream).await
}
/// Gracefully close given TCP stream.
///
/// Intended as helper to make code less messy. This also succeeds if already closed.
pub async fn close_tcp_stream_ref(stream: &mut TcpStream) -> Result<(), Box<dyn Error>> {
match stream.shutdown().await {
Ok(_) => Ok(()),
Err(err) if err.kind() == io::ErrorKind::NotConnected => Ok(()),
Err(err) => Err(err.into()),
}
}

51
src/proto/action.rs Normal file
View File

@ -0,0 +1,51 @@
use minecraft_protocol::data::chat::{Message, Payload};
use minecraft_protocol::encoder::Encoder;
use minecraft_protocol::version::v1_14_4::game::GameDisconnect;
use minecraft_protocol::version::v1_14_4::login::LoginDisconnect;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::WriteHalf;
use crate::proto::client::{Client, ClientState};
use crate::proto::packet::RawPacket;
use crate::proto::packets;
/// Kick client with a message.
///
/// Should close connection afterwards.
pub async fn kick(client: &Client, msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
match client.state() {
ClientState::Login => login_kick(client, msg, writer).await,
ClientState::Play => play_kick(client, msg, writer).await,
_ => Err(()),
}
}
/// Kick client with a message in login state.
///
/// Should close connection afterwards.
async fn login_kick(client: &Client, msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let packet = LoginDisconnect {
reason: Message::new(Payload::text(msg)),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(packets::login::CLIENT_DISCONNECT, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())
}
/// Kick client with a message in play state.
///
/// Should close connection afterwards.
async fn play_kick(client: &Client, msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let packet = GameDisconnect {
reason: Message::new(Payload::text(msg)),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(packets::play::CLIENT_DISCONNECT, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())
}

118
src/proto/client.rs Normal file
View File

@ -0,0 +1,118 @@
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Mutex;
/// Client state.
///
/// Note: this does not keep track of encryption states.
#[derive(Debug)]
pub struct Client {
/// Current client state.
pub state: Mutex<ClientState>,
/// Compression state.
///
/// 0 or positive if enabled, negative if disabled.
pub compression: AtomicI32,
}
impl Client {
/// Get client state.
pub fn state(&self) -> ClientState {
*self.state.lock().unwrap()
}
/// Set client state.
pub fn set_state(&self, state: ClientState) {
*self.state.lock().unwrap() = state;
}
/// Get compression threshold.
pub fn compressed(&self) -> i32 {
self.compression.load(Ordering::Relaxed)
}
/// Whether compression is used.
pub fn is_compressed(&self) -> bool {
self.compressed() >= 0
}
/// Set compression value.
#[allow(unused)]
pub fn set_compression(&self, threshold: i32) {
trace!(target: "lazymc", "Client now uses compression threshold of {}", threshold);
self.compression.store(threshold, Ordering::Relaxed);
}
}
impl Default for Client {
fn default() -> Self {
Self {
state: Default::default(),
compression: AtomicI32::new(-1),
}
}
}
/// Protocol state a client may be in.
///
/// Note: this does not include the `play` state, because this is never used anymore when a client
/// reaches this state.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ClientState {
/// Initial client state.
Handshake,
/// State to query server status.
Status,
/// State to login to server.
Login,
/// State to play on the server.
#[allow(unused)]
Play,
}
impl ClientState {
/// From state ID.
pub fn from_id(id: i32) -> Option<Self> {
match id {
0 => Some(Self::Handshake),
1 => Some(Self::Status),
2 => Some(Self::Login),
_ => None,
}
}
/// Get state ID.
pub fn to_id(self) -> i32 {
match self {
Self::Handshake => 0,
Self::Status => 1,
Self::Login => 2,
Self::Play => -1,
}
}
}
impl Default for ClientState {
fn default() -> Self {
Self::Handshake
}
}
/// Client info, useful during connection handling.
#[derive(Debug, Clone, Default)]
pub struct ClientInfo {
/// Client protocol version.
pub protocol_version: Option<i32>,
/// Client username.
pub username: Option<String>,
}
impl ClientInfo {
pub fn empty() -> Self {
Self::default()
}
}

27
src/proto/mod.rs Normal file
View File

@ -0,0 +1,27 @@
pub mod action;
pub mod client;
pub mod packet;
pub mod packets;
/// Default minecraft protocol version name.
///
/// Just something to default to when real server version isn't known or when no hint is specified
/// in the configuration.
///
/// Should be kept up-to-date with latest supported Minecraft version by lazymc.
pub const PROTO_DEFAULT_VERSION: &str = "1.17.1";
/// Default minecraft protocol version.
///
/// Just something to default to when real server version isn't known or when no hint is specified
/// in the configuration.
///
/// Should be kept up-to-date with latest supported Minecraft version by lazymc.
pub const PROTO_DEFAULT_PROTOCOL: u32 = 756;
/// Compression threshold to use.
// TODO: read this from server.properties instead
pub const COMPRESSION_THRESHOLD: i32 = 256;
/// Default buffer size when reading packets.
pub(super) const BUF_SIZE: usize = 8 * 1024;

View File

@ -1,6 +1,4 @@
use std::io::prelude::*;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Mutex;
use bytes::BytesMut;
use flate2::read::ZlibDecoder;
@ -10,189 +8,10 @@ use tokio::io;
use tokio::io::AsyncReadExt;
use tokio::net::tcp::ReadHalf;
use crate::proto::client::Client;
use crate::proto::BUF_SIZE;
use crate::types;
/// Default minecraft protocol version name.
///
/// Just something to default to when real server version isn't known or when no hint is specified
/// in the configuration.
///
/// Should be kept up-to-date with latest supported Minecraft version by lazymc.
pub const PROTO_DEFAULT_VERSION: &str = "1.17.1";
/// Default minecraft protocol version.
///
/// Just something to default to when real server version isn't known or when no hint is specified
/// in the configuration.
///
/// Should be kept up-to-date with latest supported Minecraft version by lazymc.
pub const PROTO_DEFAULT_PROTOCOL: u32 = 756;
/// Compression threshold to use.
// TODO: read this from server.properties instead
pub const COMPRESSION_THRESHOLD: i32 = 256;
/// Default buffer size when reading packets.
const BUF_SIZE: usize = 8 * 1024;
/// Minecraft protocol packet IDs.
#[allow(unused)]
pub mod packets {
pub mod handshake {
pub const SERVER_HANDSHAKE: i32 = 0;
}
pub mod status {
pub const CLIENT_STATUS: i32 = 0;
pub const CLIENT_PING: i32 = 1;
pub const SERVER_STATUS: i32 = 0;
pub const SERVER_PING: i32 = 1;
}
pub mod login {
pub const CLIENT_DISCONNECT: i32 = 0x00;
pub const CLIENT_LOGIN_SUCCESS: i32 = 0x02;
pub const CLIENT_SET_COMPRESSION: i32 = 0x03;
pub const SERVER_LOGIN_START: i32 = 0x00;
}
pub mod play {
pub const CLIENT_CHAT_MSG: i32 = 0x0F;
pub const CLIENT_PLUGIN_MESSAGE: i32 = 0x18;
pub const CLIENT_NAMED_SOUND_EFFECT: i32 = 0x19;
pub const CLIENT_DISCONNECT: i32 = 0x1A;
pub const CLIENT_KEEP_ALIVE: i32 = 0x21;
pub const CLIENT_JOIN_GAME: i32 = 0x26;
pub const CLIENT_PLAYER_POS_LOOK: i32 = 0x38;
pub const CLIENT_RESPAWN: i32 = 0x3D;
pub const CLIENT_SPAWN_POS: i32 = 0x4B;
pub const CLIENT_SET_TITLE_SUBTITLE: i32 = 0x57;
pub const CLIENT_TIME_UPDATE: i32 = 0x58;
pub const CLIENT_SET_TITLE_TEXT: i32 = 0x59;
pub const CLIENT_SET_TITLE_TIMES: i32 = 0x5A;
pub const SERVER_CLIENT_SETTINGS: i32 = 0x05;
pub const SERVER_PLUGIN_MESSAGE: i32 = 0x0A;
pub const SERVER_PLAYER_POS: i32 = 0x11;
pub const SERVER_PLAYER_POS_ROT: i32 = 0x12;
}
}
/// Client state.
///
/// Note: this does not keep track of encryption states.
#[derive(Debug)]
pub struct Client {
/// Current client state.
pub state: Mutex<ClientState>,
/// Compression state.
///
/// 0 or positive if enabled, negative if disabled.
pub compression: AtomicI32,
}
impl Client {
/// Get client state.
pub fn state(&self) -> ClientState {
*self.state.lock().unwrap()
}
/// Set client state.
pub fn set_state(&self, state: ClientState) {
*self.state.lock().unwrap() = state;
}
/// Get compression threshold.
pub fn compressed(&self) -> i32 {
self.compression.load(Ordering::Relaxed)
}
/// Whether compression is used.
pub fn is_compressed(&self) -> bool {
self.compressed() >= 0
}
/// Set compression value.
#[allow(unused)]
pub fn set_compression(&self, threshold: i32) {
trace!(target: "lazymc", "Client now uses compression threshold of {}", threshold);
self.compression.store(threshold, Ordering::Relaxed);
}
}
impl Default for Client {
fn default() -> Self {
Self {
state: Default::default(),
compression: AtomicI32::new(-1),
}
}
}
/// Protocol state a client may be in.
///
/// Note: this does not include the `play` state, because this is never used anymore when a client
/// reaches this state.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ClientState {
/// Initial client state.
Handshake,
/// State to query server status.
Status,
/// State to login to server.
Login,
/// State to play on the server.
#[allow(unused)]
Play,
}
impl ClientState {
/// From state ID.
pub fn from_id(id: i32) -> Option<Self> {
match id {
0 => Some(Self::Handshake),
1 => Some(Self::Status),
2 => Some(Self::Login),
_ => None,
}
}
/// Get state ID.
pub fn to_id(self) -> i32 {
match self {
Self::Handshake => 0,
Self::Status => 1,
Self::Login => 2,
Self::Play => -1,
}
}
}
impl Default for ClientState {
fn default() -> Self {
Self::Handshake
}
}
/// Client info, useful during connection handling.
#[derive(Debug, Default)]
pub struct ClientInfo {
/// Client protocol version.
pub protocol_version: Option<i32>,
/// Client username.
pub username: Option<String>,
}
impl ClientInfo {
pub fn empty() -> Self {
Self::default()
}
}
/// Raw Minecraft packet.
///
/// Having a packet ID and a raw data byte array.

41
src/proto/packets.rs Normal file
View File

@ -0,0 +1,41 @@
//! Minecraft protocol packet IDs.
#![allow(unused)]
pub mod handshake {
pub const SERVER_HANDSHAKE: i32 = 0;
}
pub mod status {
pub const CLIENT_STATUS: i32 = 0;
pub const CLIENT_PING: i32 = 1;
pub const SERVER_STATUS: i32 = 0;
pub const SERVER_PING: i32 = 1;
}
pub mod login {
pub const CLIENT_DISCONNECT: i32 = 0x00;
pub const CLIENT_LOGIN_SUCCESS: i32 = 0x02;
pub const CLIENT_SET_COMPRESSION: i32 = 0x03;
pub const SERVER_LOGIN_START: i32 = 0x00;
}
pub mod play {
pub const CLIENT_CHAT_MSG: i32 = 0x0F;
pub const CLIENT_PLUGIN_MESSAGE: i32 = 0x18;
pub const CLIENT_NAMED_SOUND_EFFECT: i32 = 0x19;
pub const CLIENT_DISCONNECT: i32 = 0x1A;
pub const CLIENT_KEEP_ALIVE: i32 = 0x21;
pub const CLIENT_JOIN_GAME: i32 = 0x26;
pub const CLIENT_PLAYER_POS_LOOK: i32 = 0x38;
pub const CLIENT_RESPAWN: i32 = 0x3D;
pub const CLIENT_SPAWN_POS: i32 = 0x4B;
pub const CLIENT_SET_TITLE_SUBTITLE: i32 = 0x57;
pub const CLIENT_TIME_UPDATE: i32 = 0x58;
pub const CLIENT_SET_TITLE_TEXT: i32 = 0x59;
pub const CLIENT_SET_TITLE_TIMES: i32 = 0x5A;
pub const SERVER_CLIENT_SETTINGS: i32 = 0x05;
pub const SERVER_PLUGIN_MESSAGE: i32 = 0x0A;
pub const SERVER_PLAYER_POS: i32 = 0x11;
pub const SERVER_PLAYER_POS_ROT: i32 = 0x12;
}

View File

@ -5,6 +5,8 @@ use tokio::io;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use crate::net;
/// Proxy the inbound stream to a target address.
pub async fn proxy(inbound: TcpStream, addr_target: SocketAddr) -> Result<(), Box<dyn Error>> {
proxy_with_queue(inbound, addr_target, &[]).await
@ -64,5 +66,8 @@ pub async fn proxy_inbound_outbound_with_queue(
tokio::try_join!(client_to_server, server_to_client)?;
// Gracefully close connection if not done already
net::close_tcp_stream(inbound).await?;
Ok(())
}

View File

@ -6,7 +6,7 @@ use futures::FutureExt;
use tokio::net::{TcpListener, TcpStream};
use crate::config::Config;
use crate::proto::Client;
use crate::proto::client::Client;
use crate::proxy;
use crate::server::{self, Server};
use crate::service;

View File

@ -1,6 +1,4 @@
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use bytes::BytesMut;
use minecraft_protocol::data::chat::{Message, Payload};
@ -8,19 +6,18 @@ use minecraft_protocol::data::server_status::*;
use minecraft_protocol::decoder::Decoder;
use minecraft_protocol::encoder::Encoder;
use minecraft_protocol::version::v1_14_4::handshake::Handshake;
use minecraft_protocol::version::v1_14_4::login::{LoginDisconnect, LoginStart};
use minecraft_protocol::version::v1_14_4::login::LoginStart;
use minecraft_protocol::version::v1_14_4::status::StatusResponse;
use tokio::io::{self, AsyncWriteExt};
use tokio::net::tcp::WriteHalf;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::time;
use crate::config::*;
#[cfg(feature = "lobby")]
use crate::lobby;
use crate::proto::{self, Client, ClientInfo, ClientState, RawPacket};
use crate::server::{self, Server, State};
use crate::service;
use crate::join;
use crate::proto::action;
use crate::proto::client::{Client, ClientInfo, ClientState};
use crate::proto::packet::{self, RawPacket};
use crate::proto::packets;
use crate::server::{self, Server};
/// Proxy the given inbound stream to a target address.
// TODO: do not drop error here, return Box<dyn Error>
@ -35,16 +32,13 @@ pub async fn serve(
// Incoming buffer and packet holding queue
let mut buf = BytesMut::new();
// Remember inbound packets, used for client holding and forwarding
let remember_inbound = config.join.methods.contains(&Method::Hold)
|| config.join.methods.contains(&Method::Forward);
// Remember inbound packets, track client info
let mut inbound_history = BytesMut::new();
let mut client_info = ClientInfo::empty();
loop {
// Read packet from stream
let (packet, raw) = match proto::read_packet(&client, &mut buf, &mut reader).await {
let (packet, raw) = match packet::read_packet(&client, &mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
@ -58,7 +52,7 @@ pub async fn serve(
// Hijack handshake
if client_state == ClientState::Handshake
&& packet.id == proto::packets::handshake::SERVER_HANDSHAKE
&& packet.id == packets::handshake::SERVER_HANDSHAKE
{
// Parse handshake
let handshake = match Handshake::decode(&mut packet.data.as_slice()) {
@ -84,8 +78,8 @@ pub async fn serve(
.replace(handshake.protocol_version);
client.set_state(new_state);
// If login handshake and holding is enabled, hold packets
if new_state == ClientState::Login && remember_inbound {
// If loggin in with handshake, remember inbound
if new_state == ClientState::Login {
inbound_history.extend(raw);
}
@ -93,8 +87,7 @@ pub async fn serve(
}
// Hijack server status packet
if client_state == ClientState::Status && packet.id == proto::packets::status::SERVER_STATUS
{
if client_state == ClientState::Status && packet.id == packets::status::SERVER_STATUS {
let server_status = server_status(&config, &server).await;
let packet = StatusResponse { server_status };
@ -108,15 +101,13 @@ pub async fn serve(
}
// Hijack ping packet
if client_state == ClientState::Status && packet.id == proto::packets::status::SERVER_PING {
if client_state == ClientState::Status && packet.id == packets::status::SERVER_PING {
writer.write_all(&raw).await.map_err(|_| ())?;
continue;
}
// Hijack login start
if client_state == ClientState::Login
&& packet.id == proto::packets::login::SERVER_LOGIN_START
{
if client_state == ClientState::Login && packet.id == packets::login::SERVER_LOGIN_START {
// Try to get login username, update client info
// TODO: we should always parse this packet successfully
let username = LoginStart::decode(&mut packet.data.as_slice())
@ -132,100 +123,37 @@ pub async fn serve(
}
None => info!(target: "lazymc", "Kicked player because lockout is enabled"),
}
kick(&client, &config.lockout.message, &mut writer).await?;
action::kick(&client, &config.lockout.message, &mut writer).await?;
break;
}
// Start server if not starting yet
Server::start(config.clone(), server.clone(), username).await;
// Use join occupy methods
for method in &config.join.methods {
match method {
// Kick method, immediately kick client
Method::Kick => {
trace!(target: "lazymc", "Using kick method to occupy joining client");
// Remember inbound packets
inbound_history.extend(&raw);
inbound_history.extend(&buf);
// Select message and kick
let msg = match server.state() {
server::State::Starting
| server::State::Stopped
| server::State::Started => &config.join.kick.starting,
server::State::Stopping => &config.join.kick.stopping,
};
kick(&client, msg, &mut writer).await?;
break;
}
// Build inbound packet queue with everything from login start (including this)
let mut login_queue = BytesMut::with_capacity(raw.len() + buf.len());
login_queue.extend(&raw);
login_queue.extend(&buf);
// Hold method, hold client connection while server starts
Method::Hold => {
trace!(target: "lazymc", "Using hold method to occupy joining client");
// Buf is fully consumed here
buf.clear();
// Server must be starting
if server.state() != State::Starting {
continue;
}
// Hold login packet and remaining read bytes
inbound_history.extend(&raw);
inbound_history.extend(buf.split_off(0));
// Start holding
if hold(&config, &server).await? {
service::server::route_proxy_queue(inbound, config, inbound_history);
return Ok(());
}
}
// Forward method, forward client connection while server starts
Method::Forward => {
trace!(target: "lazymc", "Using forward method to occupy joining client");
// Hold login packet and remaining read bytes
inbound_history.extend(&raw);
inbound_history.extend(buf.split_off(0));
// Forward client
debug!(target: "lazymc", "Forwarding client to {:?}!", config.join.forward.address);
service::server::route_proxy_address_queue(
inbound,
config.join.forward.address,
inbound_history,
);
return Ok(());
// TODO: do not consume client here, allow other join method on fail
}
// Lobby method, keep client in lobby while server starts
#[cfg(feature = "lobby")]
Method::Lobby => {
trace!(target: "lazymc", "Using lobby method to occupy joining client");
// Build queue with login packet and any additionally received
let mut queue = BytesMut::with_capacity(raw.len() + buf.len());
queue.extend(raw);
queue.extend(buf.split_off(0));
// Start lobby
lobby::serve(client, client_info, inbound, config, server, queue).await?;
return Ok(());
// TODO: do not consume client here, allow other join method on fail
}
// Lobby method, keep client in lobby while server starts
#[cfg(not(feature = "lobby"))]
Method::Lobby => {
error!(target: "lazymc", "Lobby join method not supported in this lazymc build");
}
}
}
debug!(target: "lazymc", "No method left to occupy joining client, disconnecting");
// Done occupying client, just disconnect
break;
// Start occupying client
join::occupy(
client,
client_info,
config,
server,
inbound,
inbound_history,
login_queue,
)
.await?;
return Ok(());
}
// Show unhandled packet warning
@ -234,96 +162,9 @@ pub async fn serve(
debug!(target: "lazymc", "- Packet ID: {}", packet.id);
}
// Gracefully close connection
match writer.shutdown().await {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
Err(_) => return Err(()),
}
Ok(())
}
/// Hold a client while server starts.
///
/// Returns holding status. `true` if client is held and it should be proxied, `false` it was held
/// but it timed out.
pub async fn hold<'a>(config: &Config, server: &Server) -> Result<bool, ()> {
trace!(target: "lazymc", "Started holding client");
// A task to wait for suitable server state
// Waits for started state, errors if stopping/stopped state is reached
let task_wait = async {
let mut state = server.state_receiver();
loop {
// Wait for state change
state.changed().await.unwrap();
match state.borrow().deref() {
// Still waiting on server start
State::Starting => {
trace!(target: "lazymc", "Server not ready, holding client for longer");
continue;
}
// Server started, start relaying and proxy
State::Started => {
break true;
}
// Server stopping, this shouldn't happen, kick
State::Stopping => {
warn!(target: "lazymc", "Server stopping for held client, disconnecting");
break false;
}
// Server stopped, this shouldn't happen, disconnect
State::Stopped => {
error!(target: "lazymc", "Server stopped for held client, disconnecting");
break false;
}
}
}
};
// Wait for server state with timeout
let timeout = Duration::from_secs(config.join.hold.timeout as u64);
match time::timeout(timeout, task_wait).await {
// Relay client to proxy
Ok(true) => {
info!(target: "lazymc", "Server ready for held client, relaying to server");
Ok(true)
}
// Server stopping/stopped, this shouldn't happen, kick
Ok(false) => {
warn!(target: "lazymc", "Server stopping for held client");
Ok(false)
}
// Timeout reached, kick with starting message
Err(_) => {
warn!(target: "lazymc", "Held client reached timeout of {}s", config.join.hold.timeout);
Ok(false)
}
}
}
/// Kick client with a message.
///
/// Should close connection afterwards.
async fn kick(client: &Client, msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let packet = LoginDisconnect {
reason: Message::new(Payload::text(msg)),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::login::CLIENT_DISCONNECT, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())
}
/// Build server status object to respond to client with.
async fn server_status(config: &Config, server: &Server) -> ServerStatus {
let status = server.status().await;