Probe server on lazymc start to get up-to-date server details for Forge

This commit is contained in:
timvisee
2021-11-22 22:40:37 +01:00
parent 8b09faae3d
commit 9b1f2a7011
14 changed files with 762 additions and 29 deletions

View File

@@ -39,6 +39,9 @@ command = "java -Xmx1G -Xms1G -jar server.jar --nogui"
# Immediately wake server after crash.
#wake_on_crash = false
# Set to true if this server runs Forge.
#forge = false
# Server start/stop timeout in seconds. Force kill server process if it takes too long.
#start_timeout = 300
#stop_timeout = 150

View File

@@ -182,6 +182,10 @@ pub struct Server {
#[serde(default)]
pub wake_on_crash: bool,
/// Whether this server runs forge.
#[serde(default)]
pub forge: bool,
/// Server starting timeout. Force kill server process if it takes longer.
#[serde(default = "u32_300")]
pub start_timeout: u32,

239
src/forge.rs Normal file
View File

@@ -0,0 +1,239 @@
use std::sync::Arc;
use std::time::Duration;
use bytes::BytesMut;
use minecraft_protocol::decoder::Decoder;
use minecraft_protocol::encoder::Encoder;
use minecraft_protocol::version::forge_v1_13::login::{Acknowledgement, LoginWrapper, ModList};
use minecraft_protocol::version::v1_14_4::login::{LoginPluginRequest, LoginPluginResponse};
use minecraft_protocol::version::PacketId;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::WriteHalf;
use tokio::net::TcpStream;
use tokio::time;
use crate::forge;
use crate::proto::client::{Client, ClientState};
use crate::proto::packet::RawPacket;
use crate::proto::{packet, packets};
use crate::server::Server;
/// Forge status magic.
pub const STATUS_MAGIC: &str = "\0FML2\0";
/// Forge plugin wrapper login plugin request channel.
pub const CHANNEL_LOGIN_WRAPPER: &str = "fml:loginwrapper";
/// Forge handshake channel.
pub const CHANNEL_HANDSHAKE: &str = "fml:handshake";
/// Respond with Forge login wrapper packet.
pub async fn respond_forge_login_packet(
client: &Client,
writer: &mut WriteHalf<'_>,
message_id: i32,
forge_channel: String,
forge_packet: impl PacketId + Encoder,
) -> Result<(), ()> {
// Encode Forge packet to data
let mut forge_data = Vec::new();
forge_packet.encode(&mut forge_data).map_err(|_| ())?;
// Encode Forge payload
let forge_payload =
RawPacket::new(forge_packet.packet_id(), forge_data).encode_without_len(client)?;
// Wrap Forge payload in login wrapper
let mut payload = Vec::new();
let packet = LoginWrapper {
channel: forge_channel,
packet: forge_payload,
};
packet.encode(&mut payload).map_err(|_| ())?;
// Write login plugin request with forge payload
packet::write_packet(
LoginPluginResponse {
message_id,
successful: true,
data: payload,
},
client,
writer,
)
.await
}
/// Respond to a Forge login plugin request.
pub async fn respond_login_plugin_request(
client: &Client,
packet: LoginPluginRequest,
writer: &mut WriteHalf<'_>,
) -> Result<(), ()> {
// Decode Forge login wrapper packet
let (message_id, login_wrapper, packet) =
forge::decode_forge_login_packet(&client, packet).await?;
// Determine whether we received the mod list
let is_unknown_header = login_wrapper.channel != forge::CHANNEL_HANDSHAKE;
let is_mod_list = !is_unknown_header && packet.id == packets::forge::login::CLIENT_MOD_LIST;
// If not the mod list, just acknowledge
if !is_mod_list {
trace!(target: "lazymc::forge", "Acknowledging login plugin request");
forge::respond_forge_login_packet(
client,
writer,
message_id,
login_wrapper.channel,
Acknowledgement {},
)
.await
.map_err(|_| {
error!(target: "lazymc::forge", "Failed to send Forge login plugin request acknowledgement");
})?;
return Ok(());
}
trace!(target: "lazymc::forge", "Sending mod list reply to server with same contents");
// Parse mod list, transform into reply
let mod_list = ModList::decode(&mut packet.data.as_slice()).map_err(|err| {
error!(target: "lazymc::forge", "Failed to decode Forge mod list: {:?}", err);
})?;
let mod_list_reply = mod_list.into_reply();
// We got mod list, respond with reply
forge::respond_forge_login_packet(
client,
writer,
message_id,
login_wrapper.channel,
mod_list_reply,
)
.await
.map_err(|_| {
error!(target: "lazymc::forge", "Failed to send Forge login plugin mod list reply");
})?;
Ok(())
}
/// Decode a Forge login wrapper packet from login plugin request.
///
/// Returns (`message_id`, `login_wrapper`, `packet`).
pub async fn decode_forge_login_packet(
client: &Client,
plugin_request: LoginPluginRequest,
) -> Result<(i32, LoginWrapper, RawPacket), ()> {
// Validate channel
assert_eq!(plugin_request.channel, CHANNEL_LOGIN_WRAPPER);
// Decode login wrapped packet
let login_wrapper =
LoginWrapper::decode(&mut plugin_request.data.as_slice()).map_err(|err| {
error!(target: "lazymc::forge", "Failed to decode Forge LoginWrapper packet: {:?}", err);
})?;
// Parse packet
let packet = RawPacket::decode_without_len(client, &login_wrapper.packet).map_err(|err| {
error!(target: "lazymc::forge", "Failed to decode Forge LoginWrapper packet contents: {:?}", err);
})?;
Ok((plugin_request.message_id, login_wrapper, packet))
}
/// Replay the Forge login payload for a client.
pub async fn replay_login_payload(
client: &Client,
inbound: &mut TcpStream,
server: Arc<Server>,
inbound_buf: &mut BytesMut,
) -> Result<(), ()> {
debug!(target: "lazymc::lobby", "Replaying Forge login procedure for lobby client...");
// Replay each Forge packet
for packet in server.forge_payload.lock().await.as_slice() {
inbound.write_all(&packet).await.map_err(|err| {
error!(target: "lazymc::lobby", "Failed to send Forge join payload to lobby client, will likely cause issues: {}", err);
})?;
}
// Drain all responses
let count = server.forge_payload.lock().await.len();
drain_forge_responses(client, inbound, inbound_buf, count).await?;
trace!(target: "lazymc::lobby", "Forge join payload replayed");
Ok(())
}
/// Drain Forge login plugin response packets from stream.
async fn drain_forge_responses(
client: &Client,
inbound: &mut TcpStream,
buf: &mut BytesMut,
mut count: usize,
) -> Result<(), ()> {
let (mut reader, mut _writer) = inbound.split();
loop {
// We're done if count is zero
if count == 0 {
trace!(target: "lazymc::forge", "Drained all plugin responses from client");
return Ok(());
}
// TODO: move timeout into constant
let read_packet_task = packet::read_packet(&client, buf, &mut reader);
let timeout = time::timeout(Duration::from_secs(5), read_packet_task).await;
let read_packet_task = match timeout {
Ok(result) => result,
Err(_) => {
error!(target: "lazymc::forge", "Expected more plugin responses from client, but didn't receive anything in a while, may be problematic");
return Ok(());
}
};
// Read packet from stream
let (packet, _raw) = match read_packet_task {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
error!(target: "lazymc::forge", "Closing connection, error occurred");
break;
}
};
// Grab client state
let client_state = client.state();
// Catch login plugin resposne
if client_state == ClientState::Login
&& packet.id == packets::login::SERVER_LOGIN_PLUGIN_RESPONSE
{
trace!(target: "lazymc::forge", "Voiding plugin response from client");
count -= 1;
continue;
}
// TODO: instantly return on this packet?
// // Hijack login success
// if client_state == ClientState::Login && packet.id == packets::login::CLIENT_LOGIN_SUCCESS {
// trace!(target: "lazymc::forge", "Received login success from server connection, change to play mode");
// // Switch to play state
// tmp_client.set_state(ClientState::Play);
// return Ok(forge_payload);
// }
// Show unhandled packet warning
debug!(target: "lazymc::forge", "Received unhandled packet from server in record_forge_response:");
debug!(target: "lazymc::forge", "- State: {:?}", client_state);
debug!(target: "lazymc::forge", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id);
}
Err(())
}

View File

@@ -8,12 +8,20 @@ use bytes::BytesMut;
use futures::FutureExt;
use minecraft_protocol::data::chat::{Message, Payload};
use minecraft_protocol::decoder::Decoder;
use minecraft_protocol::version::v1_14_4::login::{LoginStart, LoginSuccess, SetCompression};
use minecraft_protocol::version::forge_v1_13::login::LoginWrapper;
use minecraft_protocol::version::v1_14_4::login::{
LoginDisconnect, LoginPluginRequest, LoginPluginResponse, LoginStart, LoginSuccess,
SetCompression,
};
use minecraft_protocol::version::v1_16_5::game::{Title, TitleAction};
use minecraft_protocol::version::v1_17_1::game::{
ClientBoundKeepAlive, ClientBoundPluginMessage, JoinGame, NamedSoundEffect,
PlayerPositionAndLook, Respawn, SetTitleSubtitle, SetTitleText, SetTitleTimes, TimeUpdate,
};
use minecraft_protocol::version::PacketId;
use nbt::CompoundTag;
use proto::packet::RawPacket;
use rand::Rng;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
@@ -21,6 +29,7 @@ use tokio::select;
use tokio::time;
use crate::config::*;
use crate::forge;
use crate::mc::{self, uuid};
use crate::net;
use crate::proto;
@@ -108,6 +117,15 @@ pub async fn serve(
debug!(target: "lazymc::lobby", "Login on lobby server (user: {})", login_start.name);
// Replay Forge payload
if config.server.forge {
forge::replay_login_payload(client, &mut inbound, server.clone(), &mut inbound_buf)
.await?;
let (returned_reader, returned_writer) = inbound.split();
reader = returned_reader;
writer = returned_writer;
}
// Respond with set compression if compression is enabled based on threshold
if proto::COMPRESSION_THRESHOLD >= 0 {
trace!(target: "lazymc::lobby", "Enabling compression for lobby client because server has it enabled (threshold: {})", proto::COMPRESSION_THRESHOLD);
@@ -255,11 +273,18 @@ async fn send_lobby_join_game(
writer: &mut WriteHalf<'_>,
server: &Server,
) -> Result<(), ()> {
// Grab probed dimension codec
let dimension_codec: CompoundTag =
if let Some(ref join_game) = server.probed_join_game.lock().await.as_ref() {
join_game.dimension_codec.clone()
} else {
snbt_to_compound_tag(include_str!("../res/dimension_codec.snbt"))
};
// Send Minecrafts default states, slightly customised for lobby world
packet::write_packet(
{
let status = server.status().await;
JoinGame {
// Player ID must be unique, if it collides with another server entity ID the player gets
// in a weird state and cannot move
@@ -273,7 +298,7 @@ async fn send_lobby_join_game(
"minecraft:the_nether".into(),
"minecraft:the_end".into(),
],
dimension_codec: snbt_to_compound_tag(include_str!("../res/dimension_codec.snbt")),
dimension_codec,
dimension: snbt_to_compound_tag(include_str!("../res/dimension.snbt")),
world_name: "lazymc:lobby".into(),
hashed_seed: 0,
@@ -647,9 +672,7 @@ async fn connect_to_server_no_timeout(
{
// Decode compression packet
let set_compression =
SetCompression::decode(&mut packet.data.as_slice()).map_err(|err| {
dbg!(err);
})?;
SetCompression::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
// Client and server compression threshold should match, show warning if not
if set_compression.threshold != proto::COMPRESSION_THRESHOLD {
@@ -666,6 +689,27 @@ async fn connect_to_server_no_timeout(
continue;
}
// Hijack login plugin request
if client_state == ClientState::Login
&& packet.id == packets::login::CLIENT_LOGIN_PLUGIN_REQUEST
{
// TODO: update message if not on forge
trace!(target: "lazymc::lobby", "Received login plugin request from server, assuming we should send Forge payload now");
// Decode login plugin request
let plugin_request =
LoginPluginRequest::decode(&mut packet.data.as_slice()).map_err(|err| {
dbg!(err);
})?;
// Respond to Forge login plugin request
forge::respond_login_plugin_request(&tmp_client, plugin_request, &mut writer).await?;
// TODO: if not on forge, respond with empty payload because we don't understand
continue;
}
// Hijack login success
if client_state == ClientState::Login && packet.id == packets::login::CLIENT_LOGIN_SUCCESS {
trace!(target: "lazymc::lobby", "Received login success from server connection, change to play mode");
@@ -688,6 +732,24 @@ async fn connect_to_server_no_timeout(
return Ok((tmp_client, outbound, buf));
}
// Hijack disconnect
if client_state == ClientState::Login && packet.id == packets::login::CLIENT_DISCONNECT {
error!(target: "lazymc::lobby", "Received disconnect from server connection");
// // Decode disconnect packet
// let login_disconnect =
// LoginDisconnect::decode(&mut packet.data.as_slice()).map_err(|err| {
// dbg!(err);
// })?;
// TODO: report/forward error to client
break;
}
// TODO: if receiving encryption request, disconnect with error because we don't support
// online mode!
// Show unhandled packet warning
debug!(target: "lazymc::lobby", "Received unhandled packet from server in connect_to_server:");
debug!(target: "lazymc::lobby", "- State: {:?}", client_state);
@@ -799,7 +861,6 @@ async fn drain_stream(reader: &mut ReadHalf<'_>) -> Result<(), ()> {
/// Read NBT CompoundTag from SNBT.
fn snbt_to_compound_tag(data: &str) -> CompoundTag {
use nbt::decode::read_compound_tag;
use quartz_nbt::io::{write_nbt, Flavor};
use quartz_nbt::snbt;
@@ -812,5 +873,11 @@ fn snbt_to_compound_tag(data: &str) -> CompoundTag {
.expect("failed to encode NBT CompoundTag as binary");
// Parse binary with usable NBT create
read_compound_tag(&mut &*binary).unwrap()
bin_to_compound_tag(&mut &*binary)
}
/// Read NBT CompoundTag from SNBT.
fn bin_to_compound_tag(data: &[u8]) -> CompoundTag {
use nbt::decode::read_compound_tag;
read_compound_tag(&mut &*data).unwrap()
}

View File

@@ -10,6 +10,7 @@ extern crate log;
pub(crate) mod action;
pub(crate) mod cli;
pub(crate) mod config;
pub(crate) mod forge;
pub(crate) mod join;
#[cfg(feature = "lobby")]
pub(crate) mod lobby;
@@ -17,6 +18,7 @@ pub(crate) mod mc;
pub(crate) mod monitor;
pub(crate) mod net;
pub(crate) mod os;
pub(crate) mod probe;
pub(crate) mod proto;
pub(crate) mod proxy;
pub(crate) mod server;

View File

@@ -5,7 +5,7 @@ use uuid::Uuid;
const OFFLINE_PLAYER_NAMESPACE: &str = "OfflinePlayer:";
/// Get UUID for given player username.
pub fn player_uuid(username: &str) -> Uuid {
fn player_uuid(username: &str) -> Uuid {
java_name_uuid_from_bytes(username.as_bytes())
}

353
src/probe.rs Normal file
View File

@@ -0,0 +1,353 @@
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use bytes::BytesMut;
use minecraft_protocol::decoder::Decoder;
use minecraft_protocol::version::v1_14_4::handshake::Handshake;
use minecraft_protocol::version::v1_14_4::login::{
LoginPluginRequest, LoginPluginResponse, LoginStart, SetCompression,
};
use minecraft_protocol::version::v1_17_1::game::JoinGame;
use tokio::net::TcpStream;
use tokio::time;
use crate::config::Config;
use crate::forge;
use crate::net;
use crate::proto::client::{Client, ClientState};
use crate::proto::{self, packet, packets};
use crate::server::{Server, State};
/// Minecraft username to use for probing the server.
const PROBE_USER: &str = "_lazymc_probe";
/// Timeout for probe user connecting to the server.
const PROBE_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Maximum time the probe may wait for the server to come online.
const PROBE_ONLINE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Timeout for receiving join game packet.
///
/// When the play state is reached, the server should immeditely respond with a join game packet.
/// This defines the maximum timeout for waiting on it.
const PROBE_JOIN_GAME_TIMEOUT: Duration = Duration::from_secs(20);
/// Connect to the Minecraft server and probe useful details from it.
pub async fn probe(config: Arc<Config>, server: Arc<Server>) -> Result<(), ()> {
debug!(target: "lazymc::probe", "Starting server probe...");
// Start server if not starting already
if Server::start(config.clone(), server.clone(), None).await {
warn!(target: "lazymc::probe", "Starting server to probe required details...");
}
// Wait for server to come online
if !wait_until_online(&server).await? {
warn!(target: "lazymc::probe", "Couldn't probe server, failed to wait for server to come online");
return Err(());
}
debug!(target: "lazymc::probe", "Connecting to server to probe details...");
// Connect to server, record Forge payload
let forge_payload = connect_to_server(&config, &server).await?;
*server.forge_payload.lock().await = forge_payload.into();
Ok(())
}
/// Wait for the server to come online.
///
/// Returns `true` when it is online.
async fn wait_until_online<'a>(server: &Server) -> Result<bool, ()> {
trace!(target: "lazymc::probe", "Waiting for server to come online...");
// A task to wait for suitable server state
// Waits for started state, errors if stopping/stopped state is reached
let task_wait = async {
let mut state = server.state_receiver();
loop {
// Wait for state change
state.changed().await.unwrap();
match state.borrow().deref() {
// Still waiting on server start
State::Starting => {
continue;
}
// Server started, start relaying and proxy
State::Started => {
break true;
}
// Server stopping, this shouldn't happen, skip
State::Stopping => {
warn!(target: "lazymc::probe", "Server stopping while trying to probe, skipping");
break false;
}
// Server stopped, this shouldn't happen, skip
State::Stopped => {
error!(target: "lazymc::probe", "Server stopped while trying to probe, skipping");
break false;
}
}
}
};
// Wait for server state with timeout
match time::timeout(PROBE_ONLINE_TIMEOUT, task_wait).await {
Ok(online) => Ok(online),
// Timeout reached, kick with starting message
Err(_) => {
warn!(target: "lazymc::probe", "Probe waited for server to come online but timed out after {}s", PROBE_ONLINE_TIMEOUT.as_secs());
Ok(false)
}
}
}
/// Create connection to the server, with timeout.
///
/// This will initialize the connection to the play state. Client details are used.
///
/// Returns recorded Forge login payload if any.
async fn connect_to_server(config: &Config, server: &Server) -> Result<Vec<Vec<u8>>, ()> {
time::timeout(
PROBE_CONNECT_TIMEOUT,
connect_to_server_no_timeout(config, server),
)
.await
.map_err(|_| {
error!(target: "lazymc::probe", "Probe tried to connect to server but timed out after {}s", PROBE_CONNECT_TIMEOUT.as_secs());
})?
}
/// Create connection to the server, with no timeout.
///
/// This will initialize the connection to the play state. Client details are used.
///
/// Returns recorded Forge login payload if any.
// TODO: clean this up
async fn connect_to_server_no_timeout(
config: &Config,
server: &Server,
) -> Result<Vec<Vec<u8>>, ()> {
// Open connection
// TODO: on connect fail, ping server and redirect to serve_status if offline
let mut outbound = TcpStream::connect(config.server.address)
.await
.map_err(|_| ())?;
// Construct temporary server client
let tmp_client = match outbound.local_addr() {
Ok(addr) => Client::new(addr),
Err(_) => Client::dummy(),
};
tmp_client.set_state(ClientState::Login);
let (mut reader, mut writer) = outbound.split();
// Select server address to use, add magic if Forge
let server_addr = if config.server.forge {
format!(
"{}{}",
config.server.address.ip().to_string(),
forge::STATUS_MAGIC,
)
} else {
config.server.address.ip().to_string()
};
// Send handshake packet
packet::write_packet(
Handshake {
protocol_version: config.public.protocol as i32,
server_addr,
server_port: config.server.address.port(),
next_state: ClientState::Login.to_id(),
},
&tmp_client,
&mut writer,
)
.await?;
// Request login start
packet::write_packet(
LoginStart {
name: PROBE_USER.into(),
},
&tmp_client,
&mut writer,
)
.await?;
// Incoming buffer, record Forge plugin request payload
let mut buf = BytesMut::new();
let mut forge_payload = Vec::new();
loop {
// Read packet from stream
let (packet, raw) = match packet::read_packet(&tmp_client, &mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
error!(target: "lazymc::forge", "Closing connection, error occurred");
break;
}
};
// Grab client state
let client_state = tmp_client.state();
// Catch set compression
if client_state == ClientState::Login && packet.id == packets::login::CLIENT_SET_COMPRESSION
{
// Decode compression packet
let set_compression =
SetCompression::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
// Client and server compression threshold should match, show warning if not
if set_compression.threshold != proto::COMPRESSION_THRESHOLD {
error!(
target: "lazymc::forge",
"Compression threshold sent to lobby client does not match threshold from server, this may cause errors (client: {}, server: {})",
proto::COMPRESSION_THRESHOLD,
set_compression.threshold
);
}
// Set client compression
tmp_client.set_compression(set_compression.threshold);
continue;
}
// Catch login plugin request
if client_state == ClientState::Login
&& packet.id == packets::login::CLIENT_LOGIN_PLUGIN_REQUEST
{
// Decode login plugin request packet
let plugin_request = LoginPluginRequest::decode(&mut packet.data.as_slice()).map_err(|err| {
error!(target: "lazymc::probe", "Failed to decode login plugin request from server, cannot respond properly: {:?}", err);
})?;
// Handle plugin requests for Forge
if config.server.forge {
// Record Forge login payload
forge_payload.push(raw);
// Respond to Forge login plugin request
forge::respond_login_plugin_request(&tmp_client, plugin_request, &mut writer)
.await?;
continue;
}
warn!(target: "lazymc::probe", "Received unexpected login plugin request, responding with error");
// Respond with plugin response failure
packet::write_packet(
LoginPluginResponse {
message_id: plugin_request.message_id,
successful: false,
data: vec![],
},
&tmp_client,
&mut writer,
)
.await?;
continue;
}
// Hijack login success
if client_state == ClientState::Login && packet.id == packets::login::CLIENT_LOGIN_SUCCESS {
trace!(target: "lazymc::probe", "Received login success from server connection, change to play mode");
// Switch to play state
tmp_client.set_state(ClientState::Play);
// Wait to catch join game packet
let join_game = wait_for_server_join_game(&tmp_client, &mut outbound, &mut buf).await?;
server.probed_join_game.lock().await.replace(join_game);
// Gracefully close connection
let _ = net::close_tcp_stream(outbound).await;
return Ok(forge_payload);
}
// Show unhandled packet warning
debug!(target: "lazymc::forge", "Received unhandled packet from server in connect_to_server:");
debug!(target: "lazymc::forge", "- State: {:?}", client_state);
debug!(target: "lazymc::forge", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id);
}
// Gracefully close connection
net::close_tcp_stream(outbound).await.map_err(|_| ())?;
Err(())
}
/// Wait for join game packet on server connection, with timeout.
///
/// This parses, consumes and returns the packet.
async fn wait_for_server_join_game(
client: &Client,
outbound: &mut TcpStream,
buf: &mut BytesMut,
) -> Result<JoinGame, ()> {
time::timeout(
PROBE_JOIN_GAME_TIMEOUT,
wait_for_server_join_game_no_timeout(client, outbound, buf),
)
.await
.map_err(|_| {
error!(target: "lazymc::probe", "Waiting for for game data from server for probe client timed out after {}s", PROBE_JOIN_GAME_TIMEOUT.as_secs());
})?
}
/// Wait for join game packet on server connection, with no timeout.
///
/// This parses, consumes and returns the packet.
// TODO: clean this up
// TODO: do not drop error here, return Box<dyn Error>
async fn wait_for_server_join_game_no_timeout(
client: &Client,
outbound: &mut TcpStream,
buf: &mut BytesMut,
) -> Result<JoinGame, ()> {
let (mut reader, mut _writer) = outbound.split();
loop {
// Read packet from stream
let (packet, _raw) = match packet::read_packet(client, buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
error!(target: "lazymc::lobby", "Closing connection, error occurred");
break;
}
};
// Catch join game
if packet.id == packets::play::CLIENT_JOIN_GAME {
let join_game = JoinGame::decode(&mut packet.data.as_slice()).map_err(|err| {
error!(target: "lazymc::probe", "Failed to decode join game packet, ignoring: {:?}", err);
})?;
return Ok(join_game);
}
// Show unhandled packet warning
debug!(target: "lazymc::lobby", "Received unhandled packet from server in wait_for_server_join_game:");
debug!(target: "lazymc::lobby", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id);
}
// Gracefully close connection
net::close_tcp_stream_ref(outbound).await.map_err(|_| ())?;
Err(())
}

View File

@@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::io::prelude::*;
use bytes::BytesMut;
@@ -44,11 +45,22 @@ impl RawPacket {
///
/// This decodes both compressed and uncompressed packets based on the client threshold
/// preference.
pub fn decode(client: &Client, mut buf: &[u8]) -> Result<Self, ()> {
pub fn decode_with_len(client: &Client, mut buf: &[u8]) -> Result<Self, ()> {
// Read length
let (read, len) = types::read_var_int(buf)?;
buf = &buf[read..][..len as usize];
// TODO: assert buffer length!
Self::decode_without_len(client, buf)
}
/// Decode packet from raw buffer without packet length.
///
/// This decodes both compressed and uncompressed packets based on the client threshold
/// preference.
/// The length is given, and not included in the buffer itself.
pub fn decode_without_len(client: &Client, mut buf: &[u8]) -> Result<Self, ()> {
// If no compression is used, read remaining packet ID and data
if !client.is_compressed() {
// Read packet ID and data
@@ -85,7 +97,20 @@ impl RawPacket {
/// Encode packet to raw buffer.
///
/// This compresses packets based on the client threshold preference.
pub fn encode(&self, client: &Client) -> Result<Vec<u8>, ()> {
pub fn encode_with_len(&self, client: &Client) -> Result<Vec<u8>, ()> {
// Encode packet without length
let mut payload = self.encode_without_len(client)?;
// Add length header
let mut packet = types::encode_var_int(payload.len() as i32)?;
packet.append(&mut payload);
Ok(packet)
}
/// Encode packet to raw buffer without length header.
///
/// This compresses packets based on the client threshold preference.
pub fn encode_without_len(&self, client: &Client) -> Result<Vec<u8>, ()> {
let threshold = client.compressed();
if threshold >= 0 {
self.encode_compressed(threshold)
@@ -103,8 +128,7 @@ impl RawPacket {
// Determine whether to compress, encode data length bytes
let data_len = payload.len() as i32;
let compress = data_len > threshold;
let mut data_len_bytes =
types::encode_var_int(if compress { data_len } else { 0 }).unwrap();
let data_len_header = if compress { data_len } else { 0 };
// Compress payload
if compress {
@@ -117,10 +141,8 @@ impl RawPacket {
})?;
}
// Encapsulate payload with packet and data length
let len = data_len_bytes.len() as i32 + payload.len() as i32;
let mut packet = types::encode_var_int(len)?;
packet.append(&mut data_len_bytes);
// Add data length header
let mut packet = types::encode_var_int(data_len_header).unwrap();
packet.append(&mut payload);
Ok(packet)
@@ -128,12 +150,8 @@ impl RawPacket {
/// Encode uncompressed packet to raw buffer.
fn encode_uncompressed(&self) -> Result<Vec<u8>, ()> {
let mut data = types::encode_var_int(self.id as i32)?;
data.extend_from_slice(&self.data);
let len = data.len() as i32;
let mut packet = types::encode_var_int(len)?;
packet.append(&mut data);
let mut packet = types::encode_var_int(self.id as i32)?;
packet.extend_from_slice(&self.data);
Ok(packet)
}
@@ -194,22 +212,23 @@ pub async fn read_packet(
}
// Parse packet, use full buffer since we'll read the packet length again
// TODO: use decode_without_len, strip len from buffer
let raw = buf.split_to(consumed + len as usize);
let packet = RawPacket::decode(client, &raw)?;
let packet = RawPacket::decode_with_len(client, &raw)?;
Ok(Some((packet, raw.to_vec())))
}
/// Write packet to stream writer.
pub async fn write_packet(
packet: impl PacketId + Encoder,
packet: impl PacketId + Encoder + Debug,
client: &Client,
writer: &mut WriteHalf<'_>,
) -> Result<(), ()> {
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(packet.packet_id(), data).encode(client)?;
let response = RawPacket::new(packet.packet_id(), data).encode_with_len(client)?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())

View File

@@ -17,7 +17,9 @@ pub mod login {
pub const CLIENT_DISCONNECT: u8 = 0x00;
pub const CLIENT_LOGIN_SUCCESS: u8 = 0x02;
pub const CLIENT_SET_COMPRESSION: u8 = 0x03;
pub const CLIENT_LOGIN_PLUGIN_REQUEST: u8 = 0x04;
pub const SERVER_LOGIN_START: u8 = 0x00;
pub const SERVER_LOGIN_PLUGIN_RESPONSE: u8 = 0x02;
}
pub mod play {
@@ -35,7 +37,18 @@ pub mod play {
pub const CLIENT_SET_TITLE_TEXT: u8 = 0x59;
pub const CLIENT_SET_TITLE_TIMES: u8 = 0x5A;
pub const SERVER_CLIENT_SETTINGS: u8 = 0x05;
pub const SERVER_PLUGIN_MESSAGE: u8 = 0x0A;
// TODO: update
pub const SERVER_PLUGIN_MESSAGE: u8 = 0x0B; //0A
pub const SERVER_PLAYER_POS: u8 = 0x11;
pub const SERVER_PLAYER_POS_ROT: u8 = 0x12;
}
pub mod forge {
pub mod login {
pub const CLIENT_MOD_LIST: u8 = 1;
pub const CLIENT_SERVER_REGISTRY: u8 = 3;
pub const CLIENT_CONFIG_DATA: u8 = 4;
pub const SERVER_MOD_LIST_REPLY: u8 = 2;
pub const SERVER_ACKNOWLEDGEMENT: u8 = 99;
}
}

View File

@@ -5,6 +5,7 @@ use std::time::{Duration, Instant};
use futures::FutureExt;
use minecraft_protocol::data::server_status::ServerStatus;
use minecraft_protocol::version::v1_17_1::game::JoinGame;
use tokio::process::Command;
use tokio::sync::watch;
#[cfg(feature = "rcon")]
@@ -79,6 +80,11 @@ pub struct Server {
/// Last time server was stopped over RCON.
#[cfg(feature = "rcon")]
rcon_last_stop: Mutex<Option<Instant>>,
// TODO: dont use mutex, do not make public, dont use bytesmut
pub forge_payload: Mutex<Vec<Vec<u8>>>,
// TODO: dont use mutex, do not make public, dont use bytesmut
pub probed_join_game: Mutex<Option<JoinGame>>,
}
impl Server {
@@ -366,6 +372,8 @@ impl Default for Server {
rcon_lock: Semaphore::new(1),
#[cfg(feature = "rcon")]
rcon_last_stop: Default::default(),
forge_payload: Default::default(),
probed_join_game: Default::default(),
}
}
}

View File

@@ -1,4 +1,5 @@
pub mod ban_reload;
pub mod monitor;
pub mod probe;
pub mod server;
pub mod signal;

23
src/service/probe.rs Normal file
View File

@@ -0,0 +1,23 @@
use std::sync::Arc;
use crate::config::Config;
use crate::probe;
use crate::server::Server;
/// Probe server.
pub async fn service(config: Arc<Config>, state: Arc<Server>) {
// Only probe if Forge is enabled
// TODO: do more comprehensive check for probe, only with forge and lobby?
// TODO: add config option to probe on start
if !config.server.forge {
return;
}
// Probe
match probe::probe(config, state).await {
Ok(_) => info!(target: "lazymc::probe", "Succesfully probed server"),
Err(_) => {
error!(target: "lazymc::probe", "Failed to probe server, this may limit lazymc features")
}
}
}

View File

@@ -46,9 +46,10 @@ pub async fn service(config: Arc<Config>) -> Result<(), ()> {
);
}
// Spawn server monitor and signal handler services
// Spawn server monitor, signal handler, probe and ban services
tokio::spawn(service::monitor::service(config.clone(), server.clone()));
tokio::spawn(service::signal::service(config.clone(), server.clone()));
tokio::spawn(service::probe::service(config.clone(), server.clone()));
tokio::task::spawn_blocking({
let (config, server) = (config.clone(), server.clone());
|| service::ban_reload::service(config, server)

View File

@@ -103,7 +103,7 @@ pub async fn serve(
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(0, data).encode(&client)?;
let response = RawPacket::new(0, data).encode_with_len(&client)?;
writer.write_all(&response).await.map_err(|_| ())?;
continue;