Add support for packet compression

This commit is contained in:
timvisee
2021-11-15 20:18:52 +01:00
parent ae6e877f17
commit 90e64297c0
7 changed files with 334 additions and 95 deletions

1
Cargo.lock generated
View File

@@ -628,6 +628,7 @@ dependencies = [
"colored",
"derive_builder",
"dotenv",
"flate2",
"futures",
"libc",
"log",

View File

@@ -29,6 +29,7 @@ clap = { version = "3.0.0-beta.5", default-features = false, features = [ "std",
colored = "2.0"
derive_builder = "0.10"
dotenv = "0.15"
flate2 = { version = "1.0", default-features = false, features = ["default"] }
futures = { version = "0.3", default-features = false }
log = "0.4"
minecraft-protocol = { git = "https://github.com/timvisee/rust-minecraft-protocol", branch = "lazymc-v1_17_1" }

View File

@@ -5,6 +5,7 @@ use clap::ArgMatches;
use crate::config::{self, Config};
use crate::mc::server_properties;
use crate::proto;
use crate::service;
/// RCON randomized password length.
@@ -141,6 +142,14 @@ fn rewrite_server_properties(config: &Config) {
changes.extend([("prevent-proxy-connections", "false".into())]);
}
// Update network compression threshold for lobby mode
if config.join.methods.contains(&config::Method::Lobby) {
changes.extend([(
"network-compression-threshold",
proto::COMPRESSION_THRESHOLD.to_string(),
)]);
}
// Add RCON configuration
#[cfg(feature = "rcon")]
if config.rcon.enabled {

View File

@@ -10,7 +10,7 @@ use minecraft_protocol::data::chat::{Message, Payload};
use minecraft_protocol::decoder::Decoder;
use minecraft_protocol::encoder::Encoder;
use minecraft_protocol::version::v1_14_4::handshake::Handshake;
use minecraft_protocol::version::v1_14_4::login::{LoginStart, LoginSuccess};
use minecraft_protocol::version::v1_14_4::login::{LoginStart, LoginSuccess, SetCompression};
use minecraft_protocol::version::v1_17_1::game::{
ClientBoundKeepAlive, JoinGame, NamedSoundEffect, PlayerPositionAndLook, PluginMessage,
Respawn, SetTitleSubtitle, SetTitleText, SetTitleTimes, TimeUpdate,
@@ -91,7 +91,8 @@ pub async fn serve(
loop {
// Read packet from stream
let (packet, _raw) = match proto::read_packet(&mut inbound_buf, &mut reader).await {
let (packet, _raw) = match proto::read_packet(&client, &mut inbound_buf, &mut reader).await
{
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
@@ -112,27 +113,36 @@ pub async fn serve(
debug!(target: "lazymc::lobby", "Login on lobby server (user: {})", login_start.name);
// Respond with set compression if compression is enabled based on threshold
if proto::COMPRESSION_THRESHOLD >= 0 {
trace!(target: "lazymc::lobby", "Enabling compression for lobby client because server has it enabled (threshold: {})", proto::COMPRESSION_THRESHOLD);
respond_set_compression(&client, &mut writer, proto::COMPRESSION_THRESHOLD).await?;
client.set_compression(proto::COMPRESSION_THRESHOLD);
}
// Respond with login success, switch to play state
respond_login_success(&mut writer, &login_start).await?;
respond_login_success(&client, &mut writer, &login_start).await?;
client.set_state(ClientState::Play);
trace!(target: "lazymc::lobby", "Client login success, sending required play packets for lobby world");
// Send packets to client required to get into workable play state for lobby world
send_lobby_play_packets(&mut writer, &server).await?;
send_lobby_play_packets(&client, &mut writer, &server).await?;
// Wait for server to come online, then set up new connection to it
stage_wait(&server, &config, &mut writer).await?;
let (mut outbound, mut server_buf) = connect_to_server(client_info, &config).await?;
stage_wait(&client, &server, &config, &mut writer).await?;
let (server_client, mut outbound, mut server_buf) =
connect_to_server(client_info, &config).await?;
// Grab join game packet from server
let join_game = wait_for_server_join_game(&mut outbound, &mut server_buf).await?;
let join_game =
wait_for_server_join_game(&server_client, &mut outbound, &mut server_buf).await?;
// Reset lobby title
send_lobby_title(&mut writer, "").await?;
send_lobby_title(&client, &mut writer, "").await?;
// Play ready sound if configured
play_lobby_ready_sound(&mut writer, &config).await?;
play_lobby_ready_sound(&client, &mut writer, &config).await?;
// Wait a second because Notchian servers are slow
// See: https://wiki.vg/Protocol#Login_Success
@@ -140,7 +150,7 @@ pub async fn serve(
time::sleep(SERVER_WARMUP).await;
// Send respawn packet, initiates teleport to real server world
send_respawn_from_join(&mut writer, join_game).await?;
send_respawn_from_join(&client, &mut writer, join_game).await?;
// Drain inbound connection so we don't confuse the server
// TODO: can we void everything? we might need to forward everything to server except
@@ -174,9 +184,28 @@ pub async fn serve(
Ok(())
}
/// Respond to client with a set compression packet.
async fn respond_set_compression(
client: &Client,
writer: &mut WriteHalf<'_>,
threshold: i32,
) -> Result<(), ()> {
let packet = SetCompression { threshold };
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::login::CLIENT_SET_COMPRESSION, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
/// Respond to client with login success packet
// TODO: support online mode here
async fn respond_login_success(
client: &Client,
writer: &mut WriteHalf<'_>,
login_start: &LoginStart,
) -> Result<(), ()> {
@@ -191,14 +220,19 @@ async fn respond_login_success(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::login::CLIENT_LOGIN_SUCCESS, data).encode()?;
let response =
RawPacket::new(proto::packets::login::CLIENT_LOGIN_SUCCESS, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
/// Play lobby ready sound effect if configured.
async fn play_lobby_ready_sound(writer: &mut WriteHalf<'_>, config: &Config) -> Result<(), ()> {
async fn play_lobby_ready_sound(
client: &Client,
writer: &mut WriteHalf<'_>,
config: &Config,
) -> Result<(), ()> {
if let Some(sound_name) = config.join.lobby.ready_sound.as_ref() {
// Must not be empty string
if sound_name.trim().is_empty() {
@@ -207,34 +241,42 @@ async fn play_lobby_ready_sound(writer: &mut WriteHalf<'_>, config: &Config) ->
}
// Play sound effect
send_lobby_player_pos(writer).await?;
send_lobby_sound_effect(writer, sound_name).await?;
send_lobby_player_pos(client, writer).await?;
send_lobby_sound_effect(client, writer, sound_name).await?;
}
Ok(())
}
/// Send packets to client to get workable play state for lobby world.
async fn send_lobby_play_packets(writer: &mut WriteHalf<'_>, server: &Server) -> Result<(), ()> {
async fn send_lobby_play_packets(
client: &Client,
writer: &mut WriteHalf<'_>,
server: &Server,
) -> Result<(), ()> {
// See: https://wiki.vg/Protocol_FAQ#What.27s_the_normal_login_sequence_for_a_client.3F
// Send initial game join
send_lobby_join_game(writer, server).await?;
send_lobby_join_game(client, writer, server).await?;
// Send server brand
send_lobby_brand(writer).await?;
send_lobby_brand(client, writer).await?;
// Send spawn and player position, disables 'download terrain' screen
send_lobby_player_pos(writer).await?;
send_lobby_player_pos(client, writer).await?;
// Notify client of world time, required once before keep-alive packets
send_lobby_time_update(writer).await?;
send_lobby_time_update(client, writer).await?;
Ok(())
}
/// Send initial join game packet to client for lobby.
async fn send_lobby_join_game(writer: &mut WriteHalf<'_>, server: &Server) -> Result<(), ()> {
async fn send_lobby_join_game(
client: &Client,
writer: &mut WriteHalf<'_>,
server: &Server,
) -> Result<(), ()> {
// Send Minecrafts default states, slightly customised for lobby world
let packet = {
let status = server.status().await;
@@ -271,14 +313,14 @@ async fn send_lobby_join_game(writer: &mut WriteHalf<'_>, server: &Server) -> Re
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_JOIN_GAME, data).encode()?;
let response = RawPacket::new(proto::packets::play::CLIENT_JOIN_GAME, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
/// Send lobby brand to client.
async fn send_lobby_brand(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
async fn send_lobby_brand(client: &Client, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let packet = PluginMessage {
channel: "minecraft:brand".into(),
data: SERVER_BRAND.into(),
@@ -287,14 +329,15 @@ async fn send_lobby_brand(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_PLUGIN_MESSAGE, data).encode()?;
let response =
RawPacket::new(proto::packets::play::CLIENT_PLUGIN_MESSAGE, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
/// Send lobby player position to client.
async fn send_lobby_player_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
async fn send_lobby_player_pos(client: &Client, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// Send player location, disables download terrain screen
let packet = PlayerPositionAndLook {
x: 0.0,
@@ -310,14 +353,15 @@ async fn send_lobby_player_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_PLAYER_POS_LOOK, data).encode()?;
let response =
RawPacket::new(proto::packets::play::CLIENT_PLAYER_POS_LOOK, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
/// Send lobby time update to client.
async fn send_lobby_time_update(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
async fn send_lobby_time_update(client: &Client, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
const MC_TIME_NOON: i64 = 6000;
// Send time update, required once for keep-alive packets
@@ -329,7 +373,7 @@ async fn send_lobby_time_update(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_TIME_UPDATE, data).encode()?;
let response = RawPacket::new(proto::packets::play::CLIENT_TIME_UPDATE, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@@ -338,7 +382,7 @@ async fn send_lobby_time_update(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
/// Send keep alive packet to client.
///
/// Required periodically in play mode to prevent client timeout.
async fn send_keep_alive(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
async fn send_keep_alive(client: &Client, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let packet = ClientBoundKeepAlive {
// Keep sending new IDs
id: KEEP_ALIVE_ID.fetch_add(1, Ordering::Relaxed),
@@ -347,7 +391,7 @@ async fn send_keep_alive(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_KEEP_ALIVE, data).encode()?;
let response = RawPacket::new(proto::packets::play::CLIENT_KEEP_ALIVE, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
// TODO: verify we receive keep alive response with same ID from client
@@ -360,7 +404,11 @@ async fn send_keep_alive(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
/// This will show the given text for two keep-alive periods. Use a newline for the subtitle.
///
/// If an empty string is given, the title times will be reset to default.
async fn send_lobby_title(writer: &mut WriteHalf<'_>, text: &str) -> Result<(), ()> {
async fn send_lobby_title(
client: &Client,
writer: &mut WriteHalf<'_>,
text: &str,
) -> Result<(), ()> {
// Grab title and subtitle bits
let title = text.lines().next().unwrap_or("");
let subtitle = text.lines().skip(1).collect::<Vec<_>>().join("\n");
@@ -373,7 +421,8 @@ async fn send_lobby_title(writer: &mut WriteHalf<'_>, text: &str) -> Result<(),
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TEXT, data).encode()?;
let response =
RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TEXT, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
// Set subtitle
@@ -385,7 +434,7 @@ async fn send_lobby_title(writer: &mut WriteHalf<'_>, text: &str) -> Result<(),
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode()?;
RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
// Set title times
@@ -407,14 +456,19 @@ async fn send_lobby_title(writer: &mut WriteHalf<'_>, text: &str) -> Result<(),
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TIMES, data).encode()?;
let response =
RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TIMES, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
/// Send lobby ready sound effect to client.
async fn send_lobby_sound_effect(writer: &mut WriteHalf<'_>, sound_name: &str) -> Result<(), ()> {
async fn send_lobby_sound_effect(
client: &Client,
writer: &mut WriteHalf<'_>,
sound_name: &str,
) -> Result<(), ()> {
let packet = NamedSoundEffect {
sound_name: sound_name.into(),
sound_category: 0,
@@ -429,7 +483,7 @@ async fn send_lobby_sound_effect(writer: &mut WriteHalf<'_>, sound_name: &str) -
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::play::CLIENT_NAMED_SOUND_EFFECT, data).encode()?;
RawPacket::new(proto::packets::play::CLIENT_NAMED_SOUND_EFFECT, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@@ -438,7 +492,11 @@ async fn send_lobby_sound_effect(writer: &mut WriteHalf<'_>, sound_name: &str) -
/// Send respawn packet to client to jump from lobby into now loaded server.
///
/// The required details will be fetched from the `join_game` packet as provided by the server.
async fn send_respawn_from_join(writer: &mut WriteHalf<'_>, join_game: JoinGame) -> Result<(), ()> {
async fn send_respawn_from_join(
client: &Client,
writer: &mut WriteHalf<'_>,
join_game: JoinGame,
) -> Result<(), ()> {
let packet = Respawn {
dimension: join_game.dimension,
world_name: join_game.world_name,
@@ -453,7 +511,7 @@ async fn send_respawn_from_join(writer: &mut WriteHalf<'_>, join_game: JoinGame)
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_RESPAWN, data).encode()?;
let response = RawPacket::new(proto::packets::play::CLIENT_RESPAWN, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
@@ -462,7 +520,11 @@ async fn send_respawn_from_join(writer: &mut WriteHalf<'_>, join_game: JoinGame)
/// An infinite keep-alive loop.
///
/// This will keep sending keep-alive and title packets to the client until it is dropped.
async fn keep_alive_loop(writer: &mut WriteHalf<'_>, config: &Config) -> Result<(), ()> {
async fn keep_alive_loop(
client: &Client,
writer: &mut WriteHalf<'_>,
config: &Config,
) -> Result<(), ()> {
let mut interval = time::interval(KEEP_ALIVE_INTERVAL);
loop {
@@ -471,8 +533,8 @@ async fn keep_alive_loop(writer: &mut WriteHalf<'_>, config: &Config) -> Result<
trace!(target: "lazymc::lobby", "Sending keep-alive sequence to lobby client");
// Send keep alive and title packets
send_keep_alive(writer).await?;
send_lobby_title(writer, &config.join.lobby.message).await?;
send_keep_alive(client, writer).await?;
send_lobby_title(client, writer, &config.join.lobby.message).await?;
}
}
@@ -482,12 +544,13 @@ async fn keep_alive_loop(writer: &mut WriteHalf<'_>, config: &Config) -> Result<
///
/// During this stage we keep sending keep-alive and title packets to the client to keep it active.
async fn stage_wait<'a>(
client: &Client,
server: &Server,
config: &Config,
writer: &mut WriteHalf<'a>,
) -> Result<(), ()> {
select! {
a = keep_alive_loop(writer, config) => a,
a = keep_alive_loop(client, writer, config) => a,
b = wait_for_server(server, config) => b,
}
}
@@ -553,7 +616,7 @@ async fn wait_for_server<'a>(server: &Server, config: &Config) -> Result<(), ()>
async fn connect_to_server(
client_info: ClientInfo,
config: &Config,
) -> Result<(TcpStream, BytesMut), ()> {
) -> Result<(Client, TcpStream, BytesMut), ()> {
time::timeout(
SERVER_CONNECT_TIMEOUT,
connect_to_server_no_timeout(client_info, config),
@@ -561,7 +624,6 @@ async fn connect_to_server(
.await
.map_err(|_| {
error!(target: "lazymc::lobby", "Creating new server connection for lobby client timed out after {}s", SERVER_CONNECT_TIMEOUT.as_secs());
()
})?
}
@@ -572,7 +634,7 @@ async fn connect_to_server(
async fn connect_to_server_no_timeout(
client_info: ClientInfo,
config: &Config,
) -> Result<(TcpStream, BytesMut), ()> {
) -> Result<(Client, TcpStream, BytesMut), ()> {
// Open connection
// TODO: on connect fail, ping server and redirect to serve_status if offline
let mut outbound = TcpStream::connect(config.server.address)
@@ -595,7 +657,8 @@ async fn connect_to_server_no_timeout(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let request = RawPacket::new(proto::packets::handshake::SERVER_HANDSHAKE, data).encode()?;
let request =
RawPacket::new(proto::packets::handshake::SERVER_HANDSHAKE, data).encode(&tmp_client)?;
writer.write_all(&request).await.map_err(|_| ())?;
// Request login start
@@ -606,7 +669,8 @@ async fn connect_to_server_no_timeout(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let request = RawPacket::new(proto::packets::login::SERVER_LOGIN_START, data).encode()?;
let request =
RawPacket::new(proto::packets::login::SERVER_LOGIN_START, data).encode(&tmp_client)?;
writer.write_all(&request).await.map_err(|_| ())?;
// Incoming buffer
@@ -614,7 +678,7 @@ async fn connect_to_server_no_timeout(
loop {
// Read packet from stream
let (packet, _raw) = match proto::read_packet(&mut buf, &mut reader).await {
let (packet, _raw) = match proto::read_packet(&tmp_client, &mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
@@ -626,6 +690,32 @@ async fn connect_to_server_no_timeout(
// Grab client state
let client_state = tmp_client.state();
// Catch set compression
if client_state == ClientState::Login
&& packet.id == proto::packets::login::CLIENT_SET_COMPRESSION
{
// Decode compression packet
let set_compression =
SetCompression::decode(&mut packet.data.as_slice()).map_err(|err| {
dbg!(err);
()
})?;
// Client and server compression threshold should match, show warning if not
if set_compression.threshold != proto::COMPRESSION_THRESHOLD {
error!(
target: "lazymc::lobby",
"Compression threshold sent to lobby client does not match threshold from server, this may cause errors (client: {}, server: {})",
proto::COMPRESSION_THRESHOLD,
set_compression.threshold
);
}
// Set client compression
tmp_client.set_compression(set_compression.threshold);
continue;
}
// Hijack login success
if client_state == ClientState::Login
&& packet.id == proto::packets::login::CLIENT_LOGIN_SUCCESS
@@ -642,7 +732,12 @@ async fn connect_to_server_no_timeout(
// Switch to play state
tmp_client.set_state(ClientState::Play);
return Ok((outbound, buf));
// Server must enable compression if enabled for client, show warning otherwise
if tmp_client.is_compressed() != (proto::COMPRESSION_THRESHOLD >= 0) {
error!(target: "lazymc::lobby", "Compression enabled for lobby client while the server did not, this will cause errors");
}
return Ok((tmp_client, outbound, buf));
}
// Show unhandled packet warning
@@ -665,17 +760,17 @@ async fn connect_to_server_no_timeout(
///
/// This parses, consumes and returns the packet.
async fn wait_for_server_join_game(
client: &Client,
outbound: &mut TcpStream,
buf: &mut BytesMut,
) -> Result<JoinGame, ()> {
time::timeout(
SERVER_JOIN_GAME_TIMEOUT,
wait_for_server_join_game_no_timeout(outbound, buf),
wait_for_server_join_game_no_timeout(client, outbound, buf),
)
.await
.map_err(|_| {
error!(target: "lazymc::lobby", "Waiting for for game data from server for lobby client timed out after {}s", SERVER_JOIN_GAME_TIMEOUT.as_secs());
()
})?
}
@@ -685,6 +780,7 @@ async fn wait_for_server_join_game(
// TODO: clean this up
// TODO: do not drop error here, return Box<dyn Error>
async fn wait_for_server_join_game_no_timeout(
client: &Client,
outbound: &mut TcpStream,
buf: &mut BytesMut,
) -> Result<JoinGame, ()> {
@@ -692,7 +788,7 @@ async fn wait_for_server_join_game_no_timeout(
loop {
// Read packet from stream
let (packet, _raw) = match proto::read_packet(buf, &mut reader).await {
let (packet, _raw) = match proto::read_packet(&client, buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
@@ -705,7 +801,6 @@ async fn wait_for_server_join_game_no_timeout(
if packet.id == proto::packets::play::CLIENT_JOIN_GAME {
let join_game = JoinGame::decode(&mut packet.data.as_slice()).map_err(|err| {
dbg!(err);
()
})?;
return Ok(join_game);

View File

@@ -16,7 +16,7 @@ use tokio::net::TcpStream;
use tokio::time;
use crate::config::Config;
use crate::proto::{self, ClientState, RawPacket};
use crate::proto::{self, Client, ClientState, RawPacket};
use crate::server::{Server, State};
/// Monitor ping inverval in seconds.
@@ -96,22 +96,29 @@ pub async fn poll_server(
async fn fetch_status(config: &Config, addr: SocketAddr) -> Result<ServerStatus, ()> {
let mut stream = TcpStream::connect(addr).await.map_err(|_| ())?;
send_handshake(&mut stream, config, addr).await?;
request_status(&mut stream).await?;
wait_for_status_timeout(&mut stream).await
// Dummy client
let client = Client::default();
send_handshake(&client, &mut stream, config, addr).await?;
request_status(&client, &mut stream).await?;
wait_for_status_timeout(&client, &mut stream).await
}
/// Attemp to ping server.
async fn do_ping(config: &Config, addr: SocketAddr) -> Result<(), ()> {
let mut stream = TcpStream::connect(addr).await.map_err(|_| ())?;
send_handshake(&mut stream, config, addr).await?;
let token = send_ping(&mut stream).await?;
wait_for_ping_timeout(&mut stream, token).await
// Dummy client
let client = Client::default();
send_handshake(&client, &mut stream, config, addr).await?;
let token = send_ping(&client, &mut stream).await?;
wait_for_ping_timeout(&client, &mut stream, token).await
}
/// Send handshake.
async fn send_handshake(
client: &Client,
stream: &mut TcpStream,
config: &Config,
addr: SocketAddr,
@@ -127,7 +134,7 @@ async fn send_handshake(
handshake.encode(&mut packet).map_err(|_| ())?;
let raw = RawPacket::new(proto::packets::handshake::SERVER_HANDSHAKE, packet)
.encode()
.encode(&client)
.map_err(|_| ())?;
stream.write_all(&raw).await.map_err(|_| ())?;
@@ -135,16 +142,16 @@ async fn send_handshake(
}
/// Send status request.
async fn request_status(stream: &mut TcpStream) -> Result<(), ()> {
async fn request_status(client: &Client, stream: &mut TcpStream) -> Result<(), ()> {
let raw = RawPacket::new(proto::packets::status::SERVER_STATUS, vec![])
.encode()
.encode(client)
.map_err(|_| ())?;
stream.write_all(&raw).await.map_err(|_| ())?;
Ok(())
}
/// Send status request.
async fn send_ping(stream: &mut TcpStream) -> Result<u64, ()> {
async fn send_ping(client: &Client, stream: &mut TcpStream) -> Result<u64, ()> {
let token = rand::thread_rng().gen();
let ping = PingRequest { time: token };
@@ -152,21 +159,21 @@ async fn send_ping(stream: &mut TcpStream) -> Result<u64, ()> {
ping.encode(&mut packet).map_err(|_| ())?;
let raw = RawPacket::new(proto::packets::status::SERVER_PING, packet)
.encode()
.encode(client)
.map_err(|_| ())?;
stream.write_all(&raw).await.map_err(|_| ())?;
Ok(token)
}
/// Wait for a status response.
async fn wait_for_status(stream: &mut TcpStream) -> Result<ServerStatus, ()> {
async fn wait_for_status(client: &Client, stream: &mut TcpStream) -> Result<ServerStatus, ()> {
// Get stream reader, set up buffer
let (mut reader, mut _writer) = stream.split();
let mut buf = BytesMut::new();
loop {
// Read packet from stream
let (packet, _raw) = match proto::read_packet(&mut buf, &mut reader).await {
let (packet, _raw) = match proto::read_packet(client, &mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => continue,
@@ -184,22 +191,25 @@ async fn wait_for_status(stream: &mut TcpStream) -> Result<ServerStatus, ()> {
}
/// Wait for a status response.
async fn wait_for_status_timeout(stream: &mut TcpStream) -> Result<ServerStatus, ()> {
let status = wait_for_status(stream);
async fn wait_for_status_timeout(
client: &Client,
stream: &mut TcpStream,
) -> Result<ServerStatus, ()> {
let status = wait_for_status(client, stream);
tokio::time::timeout(Duration::from_secs(STATUS_TIMEOUT), status)
.await
.map_err(|_| ())?
}
/// Wait for a status response.
async fn wait_for_ping(stream: &mut TcpStream, token: u64) -> Result<(), ()> {
async fn wait_for_ping(client: &Client, stream: &mut TcpStream, token: u64) -> Result<(), ()> {
// Get stream reader, set up buffer
let (mut reader, mut _writer) = stream.split();
let mut buf = BytesMut::new();
loop {
// Read packet from stream
let (packet, _raw) = match proto::read_packet(&mut buf, &mut reader).await {
let (packet, _raw) = match proto::read_packet(client, &mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => continue,
@@ -223,8 +233,12 @@ async fn wait_for_ping(stream: &mut TcpStream, token: u64) -> Result<(), ()> {
}
/// Wait for a status response.
async fn wait_for_ping_timeout(stream: &mut TcpStream, token: u64) -> Result<(), ()> {
let status = wait_for_ping(stream, token);
async fn wait_for_ping_timeout(
client: &Client,
stream: &mut TcpStream,
token: u64,
) -> Result<(), ()> {
let status = wait_for_ping(client, stream, token);
tokio::time::timeout(Duration::from_secs(PING_TIMEOUT), status)
.await
.map_err(|_| ())?

View File

@@ -1,6 +1,11 @@
use std::io::prelude::*;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Mutex;
use bytes::BytesMut;
use flate2::read::ZlibDecoder;
use flate2::write::ZlibEncoder;
use flate2::Compression;
use tokio::io;
use tokio::io::AsyncReadExt;
use tokio::net::tcp::ReadHalf;
@@ -23,6 +28,10 @@ pub const PROTO_DEFAULT_VERSION: &str = "1.17.1";
/// Should be kept up-to-date with latest supported Minecraft version by lazymc.
pub const PROTO_DEFAULT_PROTOCOL: u32 = 756;
/// Compression threshold to use.
// TODO: read this from server.properties instead
pub const COMPRESSION_THRESHOLD: i32 = 256;
/// Minecraft protocol packet IDs.
#[allow(unused)]
pub mod packets {
@@ -38,9 +47,10 @@ pub mod packets {
}
pub mod login {
pub const CLIENT_DISCONNECT: i32 = 0;
pub const CLIENT_LOGIN_SUCCESS: i32 = 2;
pub const SERVER_LOGIN_START: i32 = 0;
pub const CLIENT_DISCONNECT: i32 = 0x00;
pub const CLIENT_LOGIN_SUCCESS: i32 = 0x02;
pub const CLIENT_SET_COMPRESSION: i32 = 0x03;
pub const SERVER_LOGIN_START: i32 = 0x00;
}
pub mod play {
@@ -68,10 +78,15 @@ pub mod packets {
///
/// Note: this does not keep track of compression/encryption states because packets are never
/// inspected when these modes are enabled.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct Client {
/// Current client state.
pub state: Mutex<ClientState>,
/// Compression state.
///
/// 0 or positive if enabled, negative if disabled.
pub compression: AtomicI32,
}
impl Client {
@@ -84,6 +99,31 @@ impl Client {
pub fn set_state(&self, state: ClientState) {
*self.state.lock().unwrap() = state;
}
/// Get compression threshold.
pub fn compressed(&self) -> i32 {
self.compression.load(Ordering::Relaxed)
}
/// Whether compression is used.
pub fn is_compressed(&self) -> bool {
self.compressed() >= 0
}
/// Set compression value.
pub fn set_compression(&self, threshold: i32) {
trace!(target: "lazymc", "Client now uses compression threshold of {}", threshold);
self.compression.store(threshold, Ordering::Relaxed);
}
}
impl Default for Client {
fn default() -> Self {
Self {
state: Default::default(),
compression: AtomicI32::new(-1),
}
}
}
/// Protocol state a client may be in.
@@ -166,12 +206,8 @@ impl RawPacket {
Self { id, data }
}
/// Decode packet from raw buffer.
pub fn decode(mut buf: &[u8]) -> Result<Self, ()> {
// Read length
let (read, len) = types::read_var_int(buf)?;
buf = &buf[read..][..len as usize];
/// Read packet ID from buffer, use remaining buffer as data.
fn read_packet_id_data(mut buf: &[u8]) -> Result<Self, ()> {
// Read packet ID, select buf
let (read, packet_id) = types::read_var_int(buf)?;
buf = &buf[read..];
@@ -179,8 +215,94 @@ impl RawPacket {
Ok(Self::new(packet_id, buf.to_vec()))
}
/// Decode packet from raw buffer.
///
/// This decodes both compressed and uncompressed packets based on the client threshold
/// preference.
pub fn decode(client: &Client, mut buf: &[u8]) -> Result<Self, ()> {
// Read length
let (read, len) = types::read_var_int(buf)?;
buf = &buf[read..][..len as usize];
// If no compression is used, read remaining packet ID and data
if !client.is_compressed() {
// Read packet ID and data
return Self::read_packet_id_data(buf);
}
// Read data length
let (read, data_len) = types::read_var_int(buf)?;
buf = &buf[read..];
// If data length is zero, the rest is not compressed
if data_len == 0 {
return Self::read_packet_id_data(buf);
}
// Decompress packet ID and data section
let mut decompressed = Vec::with_capacity(data_len as usize);
ZlibDecoder::new(buf)
.read_to_end(&mut decompressed)
.map_err(|err| {
error!(target: "lazymc", "Packet decompression error: {}", err);
})?;
// Decompressed data must match length
if decompressed.len() != data_len as usize {
error!(target: "lazymc", "Decompressed packet has different length than expected ({}b != {}b)", decompressed.len(), data_len);
return Err(());
}
// Read decompressed packet ID
return Self::read_packet_id_data(&decompressed);
}
/// Encode packet to raw buffer.
pub fn encode(&self) -> Result<Vec<u8>, ()> {
///
/// This compresses packets based on the client threshold preference.
pub fn encode(&self, client: &Client) -> Result<Vec<u8>, ()> {
let threshold = client.compressed();
if threshold >= 0 {
self.encode_compressed(threshold)
} else {
self.encode_uncompressed()
}
}
/// Encode compressed packet to raw buffer.
fn encode_compressed(&self, threshold: i32) -> Result<Vec<u8>, ()> {
// Packet payload: packet ID and data buffer
let mut payload = types::encode_var_int(self.id)?;
payload.extend_from_slice(&self.data);
// Determine whether to compress, encode data length bytes
let data_len = payload.len() as i32;
let compress = data_len > threshold;
let mut data_len_bytes =
types::encode_var_int(if compress { data_len } else { 0 }).unwrap();
// Compress payload
if compress {
let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&payload).map_err(|err| {
error!(target: "lazymc", "Failed to compress packet: {}", err);
})?;
payload = encoder.finish().map_err(|err| {
error!(target: "lazymc", "Failed to compress packet: {}", err);
})?;
}
// Encapsulate payload with packet and data length
let len = data_len_bytes.len() as i32 + payload.len() as i32;
let mut packet = types::encode_var_int(len)?;
packet.append(&mut data_len_bytes);
packet.append(&mut payload);
Ok(packet)
}
/// Encode uncompressed packet to raw buffer.
fn encode_uncompressed(&self) -> Result<Vec<u8>, ()> {
let mut data = types::encode_var_int(self.id)?;
data.extend_from_slice(&self.data);
@@ -193,11 +315,8 @@ impl RawPacket {
}
/// Read raw packet from stream.
///
/// Note: this does not support reading compressed/encrypted packets.
/// We should never need this though, as we're done reading user packets before any of this is
/// enabled. See: https://wiki.vg/Protocol#Packet_format
pub async fn read_packet(
client: &Client,
buf: &mut BytesMut,
stream: &mut ReadHalf<'_>,
) -> Result<Option<(RawPacket, Vec<u8>)>, ()> {
@@ -249,9 +368,9 @@ pub async fn read_packet(
buf.extend(tmp);
}
// Parse packet
// Parse packet, use full buffer since we'll read the packet length again
let raw = buf.split_to(consumed + len as usize);
let packet = RawPacket::decode(&raw)?;
let packet = RawPacket::decode(client, &raw)?;
Ok(Some((packet, raw.to_vec())))
}

View File

@@ -43,7 +43,7 @@ pub async fn serve(
loop {
// Read packet from stream
let (packet, raw) = match proto::read_packet(&mut buf, &mut reader).await {
let (packet, raw) = match proto::read_packet(&client, &mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
@@ -100,7 +100,7 @@ pub async fn serve(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(0, data).encode()?;
let response = RawPacket::new(0, data).encode(&client)?;
writer.write_all(&response).await.map_err(|_| ())?;
continue;
@@ -131,7 +131,7 @@ pub async fn serve(
}
None => info!(target: "lazymc", "Kicked player because lockout is enabled"),
}
kick(&config.lockout.message, &mut writer).await?;
kick(&client, &config.lockout.message, &mut writer).await?;
break;
}
@@ -154,7 +154,7 @@ pub async fn serve(
| server::State::Started => &config.join.kick.starting,
server::State::Stopping => &config.join.kick.stopping,
};
kick(msg, &mut writer).await?;
kick(&client, msg, &mut writer).await?;
break;
}
@@ -308,7 +308,7 @@ pub async fn hold<'a>(config: &Config, server: &Server) -> Result<bool, ()> {
/// Kick client with a message.
///
/// Should close connection afterwards.
async fn kick(msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
async fn kick(client: &Client, msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let packet = LoginDisconnect {
reason: Message::new(Payload::text(msg)),
};
@@ -316,7 +316,7 @@ async fn kick(msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::login::CLIENT_DISCONNECT, data).encode()?;
let response = RawPacket::new(proto::packets::login::CLIENT_DISCONNECT, data).encode(client)?;
writer.write_all(&response).await.map_err(|_| ())
}