Improve lobby handling, implement teleport and proxy to real server

This commit is contained in:
timvisee 2021-11-13 22:36:46 +01:00
parent e01fd212f7
commit 518fca90eb
No known key found for this signature in database
GPG Key ID: B8DB720BC383E172
5 changed files with 1448 additions and 161 deletions

4
Cargo.lock generated
View File

@ -684,7 +684,7 @@ checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "minecraft-protocol"
version = "0.1.0"
source = "git+https://github.com/timvisee/rust-minecraft-protocol?branch=lazymc-v1_17_1#d26a525c7b29b61d2db64805181fb5471ea4317a"
source = "git+https://github.com/timvisee/rust-minecraft-protocol?branch=lazymc-v1_17_1#a4fc2bcf7bced11fa255c371d8d5c780157ecbb4"
dependencies = [
"byteorder",
"minecraft-protocol-derive",
@ -697,7 +697,7 @@ dependencies = [
[[package]]
name = "minecraft-protocol-derive"
version = "0.0.0"
source = "git+https://github.com/timvisee/rust-minecraft-protocol?branch=lazymc-v1_17_1#d26a525c7b29b61d2db64805181fb5471ea4317a"
source = "git+https://github.com/timvisee/rust-minecraft-protocol?branch=lazymc-v1_17_1#a4fc2bcf7bced11fa255c371d8d5c780157ecbb4"
dependencies = [
"proc-macro2",
"quote",

953
src/lobby.backup.rs Normal file
View File

@ -0,0 +1,953 @@
// TODO: remove this before feature release!
#![allow(unused)]
use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::BytesMut;
use futures::FutureExt;
use minecraft_protocol::data::chat::{Message, Payload};
use minecraft_protocol::data::server_status::*;
use minecraft_protocol::decoder::Decoder;
use minecraft_protocol::encoder::Encoder;
use minecraft_protocol::version::v1_14_4::game::{GameMode, MessagePosition};
use minecraft_protocol::version::v1_14_4::handshake::Handshake;
use minecraft_protocol::version::v1_14_4::login::{LoginDisconnect, LoginStart, LoginSuccess};
use minecraft_protocol::version::v1_14_4::status::StatusResponse;
use minecraft_protocol::version::v1_17_1::game::{
ChunkData, ClientBoundChatMessage, ClientBoundKeepAlive, GameDisconnect, JoinGame,
PlayerPositionAndLook, Respawn, SetTitleSubtitle, SetTitleText, SetTitleTimes, SpawnPosition,
TimeUpdate,
};
use nbt::CompoundTag;
use tokio::io::{self, AsyncWriteExt};
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::time;
use uuid::Uuid;
use crate::config::*;
use crate::proto::{self, Client, ClientState, RawPacket};
use crate::proxy;
use crate::server::{self, Server, State};
use crate::service;
// TODO: remove this before releasing feature
pub const USE_LOBBY: bool = true;
pub const DONT_START_SERVER: bool = false;
const STARTING_BANNER: &str = "§2Server is starting";
const STARTING_BANNER_SUB: &str = "§7⌛ Please wait...";
/// Client holding server state poll interval.
const HOLD_POLL_INTERVAL: Duration = Duration::from_secs(1);
// TODO: do not drop error here, return Box<dyn Error>
pub async fn serve(
client: Client,
mut inbound: TcpStream,
config: Arc<Config>,
server: Arc<Server>,
queue: BytesMut,
) -> Result<(), ()> {
let (mut reader, mut writer) = inbound.split();
// TODO: note this assumes the first receiving packet (over queue) is login start
// TODO: assert client is in login mode!
// Incoming buffer and packet holding queue
let mut buf = queue;
let mut server_queue = BytesMut::new();
loop {
// Read packet from stream
let (packet, raw) = match proto::read_packet(&mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
error!(target: "lazymc", "Closing connection, error occurred");
break;
}
};
// Grab client state
let client_state = client.state();
// Hijack login start
if client_state == ClientState::Login && packet.id == proto::LOGIN_PACKET_ID_LOGIN_START {
// Try to get login username
let login_start = LoginStart::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
// TODO: remove debug message
debug!(target: "LOBBY", "Login {:?}", login_start.name);
// Respond with login success
let packet = LoginSuccess {
// TODO: use correct username here
uuid: Uuid::new_v3(
&Uuid::new_v3(&Uuid::NAMESPACE_OID, b"OfflinePlayer"),
login_start.name.as_bytes(),
),
username: login_start.name,
// uuid: Uuid::parse_str("35ee313b-d89a-41b8-b25e-d32e8aff0389").unwrap(),
// username: "Username".into(),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::LOGIN_PACKET_ID_LOGIN_SUCCESS, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
// Update client state to play
client.set_state(ClientState::Play);
// TODO: remove debug message
debug!(target: "LOBBY", "Sent login success, moving to play state");
// TODO: handle errors here
play_packets(&mut writer).await;
send_keep_alive(&mut writer).await?;
send_title(&mut writer).await?;
// Wait for server to come online
wait_for_server(config.clone(), server.clone(), &mut writer).await?;
// Connect to server
let (mut outbound, client_queue) = connect_to_server(client, &config, server).await?;
// Wait for join game packet
// TODO: do something with this excess buffer
let (join_game, client_buf) = wait_for_server_join_game(&mut outbound).await?;
dbg!(join_game.entity_id);
send_title_reset(&mut writer).await?;
// Send respawn apcket
send_respawn(&mut writer, join_game.clone()).await?;
send_respawn(&mut writer, join_game).await?;
if !client_buf.is_empty() {
// TODO: handle this
// TODO: remove error message
error!(target: "LOBBY", "Got excess data from server for client! ({} bytes)", client_buf.len());
}
// Drain inbound
// TODO: do not drain, send packets to server, except keep-alive
drain_stream(&mut reader).await?;
// TODO: route any following packets to client
// Wait a little because Notchian servers are slow
// See: https://wiki.vg/Protocol#Login_Success
// TODO: improve this
// debug!(target: "LOBBY", "Waiting 2 sec for server");
// time::sleep(Duration::from_secs(2)).await;
debug!(target: "LOBBY", "Moving client to proxy");
route_proxy_queue(inbound, outbound, config, client_queue, server_queue);
return Ok(());
}
if client_state == ClientState::Play
&& packet.id == proto::packets::play::SERVER_CLIENT_SETTINGS
{
debug!(target: "LOBBY", "Ignoring client settings packet");
continue;
}
if client_state == ClientState::Play
&& packet.id == proto::packets::play::SERVER_PLUGIN_MESSAGE
{
debug!(target: "LOBBY", "Ignoring plugin message packet");
continue;
}
if client_state == ClientState::Play
&& packet.id == proto::packets::play::SERVER_PLAYER_POS_ROT
{
debug!(target: "LOBBY", "Ignoring player pos rot packet");
continue;
}
if client_state == ClientState::Play && packet.id == proto::packets::play::SERVER_PLAYER_POS
{
debug!(target: "LOBBY", "Ignoring player pos packet");
continue;
}
// Show unhandled packet warning
debug!(target: "lazymc", "Received unhandled packet:");
debug!(target: "lazymc", "- State: {:?}", client_state);
debug!(target: "lazymc", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id);
}
// Gracefully close connection
match writer.shutdown().await {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
Err(_) => return Err(()),
}
Ok(())
}
/// Kick client with a message.
///
/// Should close connection afterwards.
async fn kick(msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let packet = LoginDisconnect {
reason: Message::new(Payload::text(msg)),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::LOGIN_PACKET_ID_DISCONNECT, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())
}
async fn play_packets(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
debug!(target: "LOBBY", "Send play packets");
// See: https://wiki.vg/Protocol_FAQ#What.27s_the_normal_login_sequence_for_a_client.3F
// Send game join
send_join_game(writer).await?;
// TODO: send brand plugin message
// After this, we receive:
// - PLAY_PAKCET_ID_CLIENT_SETTINGS
// - PLAY_PAKCET_ID_PLUGIN_MESSAGE
// - PLAY_PAKCET_ID_PLAYER_POS_ROT
// - PLAY_PAKCET_ID_PLAYER_POS ...
// TODO: send Update View Position ?
// TODO: send Update View Distance ?
// Send chunk data
// TODO: send_chunk_data(writer).await?;
// TODO: probably not required
send_spawn_pos(writer).await?;
// Send player location, disables download terrain screen
send_player_pos(writer).await?;
// TODO: send Update View Position
// TODO: send Spawn Position
// TODO: send Position and Look (one more time)
// Send time update
send_time_update(writer).await?;
// Keep sending keep alive packets
send_keep_alive(writer).await?;
Ok(())
}
async fn send_join_game(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// // TODO: use proper values here!
// let packet = JoinGame {
// // entity_id: 0,
// // game_mode: GameMode::Spectator,
// entity_id: 27,
// game_mode: GameMode::Hardcore,
// dimension: 23,
// max_players: 100,
// level_type: String::from("default"),
// view_distance: 10,
// reduced_debug_info: true,
// };
// TODO: use proper values here!
let packet = JoinGame {
// entity_id: 0x6d,
// ID must not collide with anything existing
// entity_id: 1337133700,
entity_id: 0,
hardcore: false,
game_mode: 3,
previous_game_mode: -1i8 as u8, // use -1i8 as u8?
world_names: vec![
"minecraft:overworld".into(),
"minecraft:the_nether".into(),
"minecraft:the_end".into(),
],
dimension_codec: snbt_to_compound_tag(include_str!("../res/dimension_codec.snbt")),
dimension: snbt_to_compound_tag(include_str!("../res/dimension.snbt")),
// TODO: is this ok?
// world_name: "minecraft:overworld".into(),
world_name: "lazymc:lobby".into(),
hashed_seed: 0,
max_players: 20,
view_distance: 10,
reduced_debug_info: true,
enable_respawn_screen: false,
is_debug: false,
is_flat: false,
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_JOIN_GAME, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
async fn send_respawn(writer: &mut WriteHalf<'_>, join_game: JoinGame) -> Result<(), ()> {
// TODO: use proper values here!
let packet = Respawn {
dimension: join_game.dimension,
world_name: join_game.world_name,
hashed_seed: join_game.hashed_seed,
game_mode: join_game.game_mode,
previous_game_mode: join_game.previous_game_mode,
is_debug: join_game.is_debug,
is_flat: join_game.is_flat,
copy_metadata: false,
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_RESPAWN, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
// // TODO: this is possibly broken?
// async fn send_chunk_data(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// // Send player location, disables download terrain screen
// let packet = ChunkData {
// x: 0,
// z: 0,
// primary_mask: Vec::new(),
// heightmaps: CompoundTag::named("HeightMaps"),
// biomes: Vec::new(),
// data_size: 0,
// data: Vec::new(),
// block_entities_size: 0,
// block_entities: Vec::new(),
// // primary_mask: 65535,
// // heights: CompoundTag::named("HeightMaps"),
// // data: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
// // tiles: vec![CompoundTag::named("TileEntity")],
// };
// let mut data = Vec::new();
// packet.encode(&mut data).map_err(|_| ())?;
// let response = RawPacket::new(proto::CLIENT_CHUNK_DATA, data).encode()?;
// writer.write_all(&response).await.map_err(|_| ())?;
// Ok(())
// }
async fn send_spawn_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let packet = SpawnPosition {
position: 0,
angle: 0.0,
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_SPAWN_POS, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
async fn send_player_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// Send player location, disables download terrain screen
let packet = PlayerPositionAndLook {
x: 0.0,
y: 0.0,
z: 0.0,
yaw: 0.0,
pitch: 90.0,
flags: 0b00000000,
teleport_id: 0,
dismount_vehicle: true,
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_PLAYER_POS_LOOK, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
async fn send_time_update(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
const MC_TIME_NOON: i64 = 6000;
// Send player location, disables download terrain screen
let packet = TimeUpdate {
world_age: MC_TIME_NOON,
time_of_day: MC_TIME_NOON,
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_TIME_UPDATE, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
async fn send_keep_alive(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// Send player location, disables download terrain screen
// TODO: keep picking random ID!
let packet = ClientBoundKeepAlive { id: 0 };
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_KEEP_ALIVE, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
// TODO: require to receive correct keepalive!
Ok(())
}
async fn send_keep_alive_loop(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// TODO: use interval of 10 sec?
let mut poll_interval = time::interval(Duration::from_secs(10));
loop {
// TODO: wait for start signal over channel instead of polling
poll_interval.tick().await;
debug!(target: "LOBBY", "Sending keep-alive to client");
send_keep_alive(writer).await?;
}
Ok(())
}
async fn send_title(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// Set title
let packet = SetTitleText {
text: Message::new(Payload::text(STARTING_BANNER)),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TEXT, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
// Set subtitle
let packet = SetTitleSubtitle {
text: Message::new(Payload::text(STARTING_BANNER_SUB)),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
// Set times
// TODO: do not make this longer than 2x keep alive packet interval
let packet = SetTitleTimes {
fade_in: 0,
stay: i32::MAX,
fade_out: 0,
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TIMES, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
async fn send_title_reset(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// Set title
let packet = SetTitleText {
text: Message::new(Payload::text("")),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TEXT, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
// Set subtitle
let packet = SetTitleSubtitle {
text: Message::new(Payload::text("")),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response =
RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
// Set times
// TODO: do not make this longer than 2x keep alive packet interval
let packet = SetTitleTimes {
fade_in: 10,
stay: 100,
fade_out: 10,
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_TIMES, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
// TODO: go through this, use proper error messages
pub async fn wait_for_server<'a>(
config: Arc<Config>,
server: Arc<Server>,
writer: &mut WriteHalf<'a>,
) -> Result<(), ()> {
debug!(target: "lazymc", "Waiting on server...");
// Set up polling interval, get timeout
let mut poll_interval = time::interval(HOLD_POLL_INTERVAL);
let since = Instant::now();
let timeout = config.time.hold_client_for as u64;
loop {
// TODO: wait for start signal over channel instead of polling
poll_interval.tick().await;
trace!("Poloutboundng server state for holding client...");
// TODO: shouldn't do this here
send_keep_alive(writer).await?;
match server.state() {
// Still waiting on server start
State::Starting => {
trace!(target: "lazymc", "Server not ready, holding client for longer");
// TODO: timeout
// // If hold timeout is reached, kick client
// if since.elapsed().as_secs() >= timeout {
// warn!(target: "lazymc", "Held client reached timeout of {}s, disconnecting", timeout);
// kick(&config.messages.login_starting, &mut inbound.split().1).await?;
// return Ok(());
// }
continue;
}
// Server started, start relaying and proxy
State::Started => {
// TODO: drop client if already disconnected
// // Relay client to proxy
// info!(target: "lazymc", "Server ready for held client, relaying to server");
// service::server::route_proxy_queue(inbound, config, hold_queue);
info!(target: "lazymc", "Server ready for lobby client, connecting to server");
return Ok(());
}
// Server stopping, this shouldn't happen, kick
State::Stopping => {
// TODO: kick message
// warn!(target: "lazymc", "Server stopping for held client, disconnecting");
// kick(&config.messages.login_stopping, &mut inbound.split().1).await?;
break;
}
// Server stopped, this shouldn't happen, disconnect
State::Stopped => {
error!(target: "lazymc", "Server stopped for held client, disconnecting");
break;
}
}
}
Err(())
}
pub async fn connect_to_server(
real_client: Client,
config: &Config,
server: Arc<Server>,
) -> Result<(TcpStream, BytesMut), ()> {
// Set up connection to server
// TODO: on connect fail, ping server and redirect to serve_status if offline
let mut outbound = TcpStream::connect(config.server.address)
.await
.map_err(|_| ())?;
let (mut reader, mut writer) = outbound.split();
let tmp_client = Client::default();
tmp_client.set_state(ClientState::Login);
// TODO: use client version
let packet = Handshake {
protocol_version: 755,
server_addr: config.server.address.ip().to_string(),
server_port: config.server.address.port(),
next_state: ClientState::Login.to_id(),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let request = RawPacket::new(proto::HANDSHAKE_PACKET_ID_HANDSHAKE, data).encode()?;
writer.write_all(&request).await.map_err(|_| ())?;
// Request login start
// TODO: use client username
let packet = LoginStart {
name: "timvisee".into(),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let request = RawPacket::new(proto::LOGIN_PACKET_ID_LOGIN_START, data).encode()?;
writer.write_all(&request).await.map_err(|_| ())?;
// # Wait for server responses
// Incoming buffer and packet holding queue
let mut buf = BytesMut::new();
let mut client_queue = BytesMut::new();
let mut server_queue = BytesMut::new();
loop {
// Read packet from stream
let (packet, raw) = match proto::read_packet(&mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
error!(target: "lazymc", "Closing connection, error occurred");
break;
}
};
// Grab client state
let client_state = tmp_client.state();
// Hijack login success
if client_state == ClientState::Login && packet.id == proto::LOGIN_PACKET_ID_LOGIN_SUCCESS {
debug!(target: "LOBBY", "Login success received");
// TODO: fix reading this packet
// let login_success =
// LoginSuccess::decode(&mut packet.data.as_slice()).map_err(|err| {
// dbg!(err);
// ()
// })?;
// // TODO: remove debug message
// debug!(target: "LOBBY", "Login success: {:?}", login_success.username);
// Switch to play state
tmp_client.set_state(ClientState::Play);
debug!(target: "LOBBY", "Server connection ready");
// TODO: also return buf!
assert!(
buf.is_empty(),
"server incomming buf not empty, will lose data"
);
return Ok((outbound, client_queue));
}
// // Hijack join game
// if client_state == ClientState::Play && packet.id == proto::packets::play::CLIENT_JOIN_GAME
// {
// // We must receive join game packet
// // TODO: also parse packet!
// // TODO: remove debug message
// debug!(target: "LOBBY", "Received join packet, will relay to client");
// // TODO: send join game packet?
// debug!(target: "LOBBY", "client_queue + {} bytes", raw.len());
// client_queue.extend(raw);
// // TODO: real client and tmp_client must match state
// return Ok((outbound, client_queue));
// }
// TODO: // Hijack join game
// TODO: // TODO: client_state == ClientState::Play &&
// TODO: if packet.id == proto::packets::play::CLIENT_JOIN_GAME {
// TODO: let join_game = JoinGame::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
// TODO: // TODO: remove debug message
// TODO: debug!(target: "LOBBY", "GOT JOIN GAME!");
// TODO: continue;
// TODO: }
// // Hijack disconnect message
// // TODO: remove this?
// if packet.id == proto::packets::play::CLIENT_DISCONNECT {
// let disconnect = GameDisconnect::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
// debug!(target: "LOBBY", "DISCONNECT REASON: {:?}", disconnect.reason);
// continue;
// }
// // Grab client state
// let client_state = client.state();
// // Hijack login start
// if client_state == ClientState::Login && packet.id == proto::LOGIN_PACKET_ID_LOGIN_START {
// // Try to get login username
// let login_start = LoginStart::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
// // TODO: remove debug message
// debug!(target: "LOBBY", "Login {:?}", login_start.name);
// // Respond with login success
// let packet = LoginSuccess {
// // TODO: use correct username here
// uuid: Uuid::new_v3(
// &Uuid::new_v3(&Uuid::NAMESPACE_OID, b"OfflinePlayer"),
// login_start.name.as_bytes(),
// ),
// username: login_start.name,
// // uuid: Uuid::parse_str("35ee313b-d89a-41b8-b25e-d32e8aff0389").unwrap(),
// // username: "Username".into(),
// };
// let mut data = Vec::new();
// packet.encode(&mut data).map_err(|_| ())?;
// let response = RawPacket::new(proto::LOGIN_PACKET_ID_LOGIN_SUCCESS, data).encode()?;
// writer.write_all(&response).await.map_err(|_| ())?;
// // Update client state to play
// client.set_state(ClientState::Play);
// // TODO: remove debug message
// debug!(target: "LOBBY", "Sent login success, moving to play state");
// // TODO: handle errors here
// play_packets(&mut writer).await;
// send_keep_alive(&mut writer).await?;
// // Wait for server to come online
// wait_for_server(config.clone(), server.clone()).await?;
// // Connect to server
// connect_to_server(client, config, server).await?;
// // Keep sending keep alive packets
// debug!(target: "LOBBY", "Keep sending keep-alive now...");
// send_keep_alive_loop(&mut writer).await?;
// debug!(target: "LOBBY", "Done with playing packets, client disconnect?");
// return Ok(());
// }
// if client_state == ClientState::Play
// && packet.id == proto::packets::play::SERVER_CLIENT_SETTINGS
// {
// debug!(target: "LOBBY", "Ignoring client settings packet");
// continue;
// }
// if client_state == ClientState::Play
// && packet.id == proto::packets::play::SERVER_PLUGIN_MESSAGE
// {
// debug!(target: "LOBBY", "Ignoring plugin message packet");
// continue;
// }
// if client_state == ClientState::Play
// && packet.id == proto::packets::play::SERVER_PLAYER_POS_ROT
// {
// debug!(target: "LOBBY", "Ignoring player pos rot packet");
// continue;
// }
// if client_state == ClientState::Play && packet.id == proto::packets::play::SERVER_PLAYER_POS
// {
// debug!(target: "LOBBY", "Ignoring player pos packet");
// continue;
// }
// Show unhandled packet warning
debug!(target: "lazymc", "Received unhandled packet:");
debug!(target: "lazymc", "- State: {:?}", client_state);
debug!(target: "lazymc", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id);
}
// TODO: we should receive login success from server
// TODO: we should receive join game packet, relay this to client
// // Gracefully close connection
// match writer.shutdown().await {
// Ok(_) => {}
// Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
// Err(_) => return Err(()),
// }
// We only reach this on errors
// TODO: do we actually ever reach this?
Err(())
}
// TODO: remove unused fields
// TODO: do not drop error here, return Box<dyn Error>
// TODO: add timeout
pub async fn wait_for_server_join_game(
// client: Client,
mut outbound: &mut TcpStream,
// config: Arc<Config>,
// server: Arc<Server>,
// queue: BytesMut,
) -> Result<(JoinGame, BytesMut), ()> {
let (mut reader, mut writer) = outbound.split();
// TODO: note this assumes the first receiving packet (over queue) is login start
// TODO: assert client is in login mode!
// Incoming buffer and packet holding queue
let mut buf = BytesMut::new();
loop {
// Read packet from stream
let (packet, raw) = match proto::read_packet(&mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
error!(target: "lazymc", "Closing connection, error occurred");
break;
}
};
// Hijack login start
if packet.id == proto::packets::play::CLIENT_JOIN_GAME {
let join_game = JoinGame::decode(&mut packet.data.as_slice()).map_err(|err| {
// TODO: remove this debug
dbg!(err);
()
})?;
// TODO: remove debug message
debug!(target: "LOBBY", "GOT JOIN FROM SERVER");
return Ok((join_game, buf));
}
// Show unhandled packet warning
debug!(target: "lazymc", "Received unhandled packet:");
// debug!(target: "lazymc", "- State: {:?}", client_state);
debug!(target: "lazymc", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id);
}
// Gracefully close connection
match writer.shutdown().await {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
Err(_) => return Err(()),
}
// TODO: will we ever reach this?
Err(())
}
// TODO: update name and description
/// Route inbound TCP stream to proxy with queued data, spawning a new task.
#[inline]
pub fn route_proxy_queue(
inbound: TcpStream,
outbound: TcpStream,
config: Arc<Config>,
client_queue: BytesMut,
server_queue: BytesMut,
) {
// When server is online, proxy all
let service = async move {
proxy::proxy_inbound_outbound_with_queue(inbound, outbound, &client_queue, &server_queue)
.map(|r| {
if let Err(err) = r {
warn!(target: "lazymc", "Failed to proxy: {}", err);
}
// TODO: remove after debug
debug!(target: "LOBBY", "Done with playing packets, client disconnect?");
})
.await
};
tokio::spawn(service);
}
// TODO: go through this, use proper error messages
pub async fn drain_stream<'a>(reader: &mut ReadHalf<'a>) -> Result<(), ()> {
// TODO: remove after debug
debug!(target: "lazymc", "Draining stream...");
// TODO: use other size?
let mut drain_buf = [0; 1024];
loop {
match reader.try_read(&mut drain_buf) {
// TODO: stop if read < drain_buf.len() ?
Ok(read) if read == 0 => return Ok(()),
Ok(read) => continue,
Err(err) => {
// TODO: remove after debug
dbg!("drain err", err);
return Ok(());
}
}
}
}
/// Read NBT CompoundTag from SNBT.
fn snbt_to_compound_tag(data: &str) -> CompoundTag {
use nbt::decode::read_compound_tag;
use quartz_nbt::io::{self, Flavor};
use quartz_nbt::snbt;
use std::io::Cursor;
// Parse SNBT data
let compound = snbt::parse(data).expect("failed to parse SNBT");
// Encode to binary
let mut binary = Vec::new();
io::write_nbt(&mut binary, None, &compound, Flavor::Uncompressed);
// Parse binary with usable NBT create
read_compound_tag(&mut &*binary).unwrap()
}

View File

@ -1,10 +1,12 @@
// TODO: remove this before feature release!
#![allow(unused)]
use std::io::ErrorKind;
use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::BytesMut;
use futures::FutureExt;
use minecraft_protocol::data::chat::{Message, Payload};
use minecraft_protocol::data::server_status::*;
use minecraft_protocol::decoder::Decoder;
@ -14,28 +16,41 @@ use minecraft_protocol::version::v1_14_4::handshake::Handshake;
use minecraft_protocol::version::v1_14_4::login::{LoginDisconnect, LoginStart, LoginSuccess};
use minecraft_protocol::version::v1_14_4::status::StatusResponse;
use minecraft_protocol::version::v1_17_1::game::{
ChunkData, ClientBoundChatMessage, ClientBoundKeepAlive, JoinGame, PlayerPositionAndLook,
SetTitleSubtitle, SetTitleText, SetTitleTimes, SpawnPosition, TimeUpdate,
ChunkData, ClientBoundChatMessage, ClientBoundKeepAlive, GameDisconnect, JoinGame,
PlayerPositionAndLook, Respawn, SetTitleSubtitle, SetTitleText, SetTitleTimes, SpawnPosition,
TimeUpdate,
};
use nbt::CompoundTag;
use tokio::io::{self, AsyncWriteExt};
use tokio::net::tcp::WriteHalf;
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::select;
use tokio::time;
use uuid::Uuid;
use crate::config::*;
use crate::proto::{self, Client, ClientState, RawPacket};
use crate::proxy;
use crate::server::{self, Server, State};
use crate::service;
// TODO: remove this before releasing feature
pub const USE_LOBBY: bool = true;
pub const DONT_START_SERVER: bool = true;
const STARTING_BANNER: &str = "§2 Server is starting...";
const STARTING_BANNER_SUB: &str = "§7⌛ Please wait...";
pub const DONT_START_SERVER: bool = false;
const STARTING_BANNER: &str = "§2Server is starting\n§7⌛ Please wait...";
const HOLD_POLL_INTERVAL: Duration = Duration::from_secs(1);
/// Interval for server state polling when waiting on server to come online.
const SERVER_POLL_INTERVAL: Duration = Duration::from_millis(500);
/// Interval to send keep-alive packets at.
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);
/// Minecraft ticks per second.
const TICKS_PER_SECOND: u32 = 20;
// TODO: do not drop error here, return Box<dyn Error>
// TODO: on error, nicely kick client with message
pub async fn serve(
client: Client,
mut inbound: TcpStream,
@ -49,12 +64,12 @@ pub async fn serve(
// TODO: assert client is in login mode!
// Incoming buffer and packet holding queue
let mut buf = queue;
let mut hold_queue = BytesMut::new();
let mut inbound_buf = queue;
let mut server_queue = BytesMut::new();
loop {
// Read packet from stream
let (packet, raw) = match proto::read_packet(&mut buf, &mut reader).await {
let (packet, raw) = match proto::read_packet(&mut inbound_buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
@ -68,68 +83,88 @@ pub async fn serve(
// Hijack login start
if client_state == ClientState::Login && packet.id == proto::LOGIN_PACKET_ID_LOGIN_START {
// Try to get login username
// Parse login start packet
let login_start = LoginStart::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
// TODO: remove debug message
debug!(target: "LOBBY", "Login {:?}", login_start.name);
debug!(target: "lazymc::lobby", "Login on lobby server (user: {})", login_start.name);
// Respond with login success
let packet = LoginSuccess {
// TODO: use correct username here
uuid: Uuid::new_v3(
&Uuid::new_v3(&Uuid::NAMESPACE_OID, b"OfflinePlayer"),
login_start.name.as_bytes(),
),
username: login_start.name,
// uuid: Uuid::parse_str("35ee313b-d89a-41b8-b25e-d32e8aff0389").unwrap(),
// username: "Username".into(),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::LOGIN_PACKET_ID_LOGIN_SUCCESS, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
// Update client state to play
// Respond with login success, switch to play state
respond_login_success(&mut writer, &login_start).await?;
client.set_state(ClientState::Play);
// TODO: remove debug message
debug!(target: "LOBBY", "Sent login success, moving to play state");
trace!(target: "lazymc::lobby", "Client login success, sending required play packets for lobby world");
// TODO: handle errors here
play_packets(&mut writer).await;
// Send packets to client required to get into workable play state for lobby world
send_lobby_play_packets(&mut writer).await;
debug!(target: "LOBBY", "Done with playing packets, client disconnect?");
// Wait for server to come online, then set up new connection to it
stage_wait(config.clone(), server.clone(), &mut writer).await?;
let (mut outbound, mut server_buf) = connect_to_server(client, &config, server).await?;
break;
// Grab join game packet from server
let join_game = wait_for_server_join_game(&mut outbound, &mut server_buf).await?;
// TODO: we might have excess server_buf data here, do something with it!
if !server_buf.is_empty() {
error!(target: "lazymc::lobby", "Got excess data from server for client, throwing it away ({} bytes)", server_buf.len());
// TODO: remove after debug
dbg!(server_buf);
}
// Reset our lobby title
send_lobby_title(&mut writer, "").await?;
// Send respawn packet, initiates teleport to real server world
// TODO: should we just send one packet?
send_respawn_from_join(&mut writer, join_game.clone()).await?;
send_respawn_from_join(&mut writer, join_game).await?;
// Drain inbound connection so we don't confuse the server
// TODO: can we drain everything? we might need to forward everything to server except
// for some blacklisted ones
drain_stream(&mut reader).await?;
// TODO: should we wait a little?
// Wait a little because Notchian servers are slow
// See: https://wiki.vg/Protocol#Login_Success
// trace!(target: "lazymc::lobby", "Waiting a second before relaying client connection...");
// time::sleep(Duration::from_secs(1)).await;
// Client and server connection ready now, move client to proxy
debug!(target: "lazymc::lobby", "Server connection ready, moving client to proxy");
route_proxy(inbound, outbound, config);
return Ok(());
}
// TODO: is this ever called?
if client_state == ClientState::Play
&& packet.id == proto::packets::play::SERVER_CLIENT_SETTINGS
{
debug!(target: "LOBBY", "Ignoring client settings packet");
debug!(target: "lazymc::lobby", "Ignoring client settings packet");
continue;
}
// TODO: is this ever called?
if client_state == ClientState::Play
&& packet.id == proto::packets::play::SERVER_PLUGIN_MESSAGE
{
debug!(target: "LOBBY", "Ignoring plugin message packet");
debug!(target: "lazymc::lobby", "Ignoring plugin message packet");
continue;
}
// TODO: is this ever called?
if client_state == ClientState::Play
&& packet.id == proto::packets::play::SERVER_PLAYER_POS_ROT
{
debug!(target: "LOBBY", "Ignoring player pos rot packet");
debug!(target: "lazymc::lobby", "Ignoring player pos rot packet");
continue;
}
// TODO: is this ever called?
if client_state == ClientState::Play && packet.id == proto::packets::play::SERVER_PLAYER_POS
{
debug!(target: "LOBBY", "Ignoring player pos packet");
debug!(target: "lazymc::lobby", "Ignoring player pos packet");
continue;
}
@ -164,67 +199,64 @@ async fn kick(msg: &str, writer: &mut WriteHalf<'_>) -> Result<(), ()> {
writer.write_all(&response).await.map_err(|_| ())
}
async fn play_packets(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
debug!(target: "LOBBY", "Send play packets");
/// Respond to client with login success packet
// TODO: support online mode here
async fn respond_login_success(
writer: &mut WriteHalf<'_>,
login_start: &LoginStart,
) -> Result<(), ()> {
let packet = LoginSuccess {
uuid: Uuid::new_v3(
// TODO: use Uuid::null() here as namespace?
&Uuid::new_v3(&Uuid::NAMESPACE_OID, b"OfflinePlayer"),
login_start.name.as_bytes(),
),
username: login_start.name.clone(),
// uuid: Uuid::parse_str("35ee313b-d89a-41b8-b25e-d32e8aff0389").unwrap(),
// username: "Username".into(),
};
// See: https://wiki.vg/Protocol_FAQ#What.27s_the_normal_login_sequence_for_a_client.3F
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
// Send game join
send_join_game(writer).await?;
// TODO: send brand plugin message
// After this, we receive:
// - PLAY_PAKCET_ID_CLIENT_SETTINGS
// - PLAY_PAKCET_ID_PLUGIN_MESSAGE
// - PLAY_PAKCET_ID_PLAYER_POS_ROT
// - PLAY_PAKCET_ID_PLAYER_POS ...
// TODO: send Update View Position ?
// TODO: send Update View Distance ?
// Send chunk data
// TODO: send_chunk_data(writer).await?;
// TODO: probably not required
send_spawn_pos(writer).await?;
// Send player location, disables download terrain screen
send_player_pos(writer).await?;
// TODO: send Update View Position
// TODO: send Spawn Position
// TODO: send Position and Look (one more time)
// Send time update
send_time_update(writer).await?;
// // Keep sending keep alive packets
send_keep_alive_loop(writer).await?;
let response = RawPacket::new(proto::LOGIN_PACKET_ID_LOGIN_SUCCESS, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
async fn send_join_game(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// // TODO: use proper values here!
// let packet = JoinGame {
// // entity_id: 0,
// // game_mode: GameMode::Spectator,
// entity_id: 27,
// game_mode: GameMode::Hardcore,
// dimension: 23,
// max_players: 100,
// level_type: String::from("default"),
// view_distance: 10,
// reduced_debug_info: true,
// };
/// Send packets to client to get workable play state for lobby world.
async fn send_lobby_play_packets(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// See: https://wiki.vg/Protocol_FAQ#What.27s_the_normal_login_sequence_for_a_client.3F
// TODO: use proper values here!
// Send initial game join
send_lobby_join_game(writer).await?;
// TODO: send plugin message for brand
// Send spawn and player position, disables 'download terrain' screen
// TODO: is sending spawn this required?
send_lobby_spawn_pos(writer).await?;
send_lobby_player_pos(writer).await?;
// Notify client of world time, required once before keep-alive packets
send_lobby_time_update(writer).await?;
// TODO: we might need to send player_pos one more time
Ok(())
}
/// Send initial join game packet to client for lobby.
async fn send_lobby_join_game(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// Send Minecrafts default states, slightly customised for lobby world
// TODO: use values from real server here!
let packet = JoinGame {
entity_id: 0x6d,
// TODO: ID must not collide with any other entity, possibly send huge number
entity_id: 0,
hardcore: false,
game_mode: 3,
previous_game_mode: -1i8 as u8, // use -1i8 as u8?
previous_game_mode: -1i8 as u8,
world_names: vec![
"minecraft:overworld".into(),
"minecraft:the_nether".into(),
@ -232,13 +264,16 @@ async fn send_join_game(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
],
dimension_codec: snbt_to_compound_tag(include_str!("../res/dimension_codec.snbt")),
dimension: snbt_to_compound_tag(include_str!("../res/dimension.snbt")),
world_name: "minecraft:overworld".into(),
// TODO: test whether using minecraft:overworld breaks?
world_name: "lazymc:lobby".into(),
hashed_seed: 0,
max_players: 20,
// TODO: try very low view distance?
view_distance: 10,
reduced_debug_info: true,
// TODO: set to true!
reduced_debug_info: false,
enable_respawn_screen: false,
is_debug: false,
is_debug: true,
is_flat: false,
};
@ -251,35 +286,8 @@ async fn send_join_game(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
Ok(())
}
// // TODO: this is possibly broken?
// async fn send_chunk_data(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// // Send player location, disables download terrain screen
// let packet = ChunkData {
// x: 0,
// z: 0,
// primary_mask: Vec::new(),
// heightmaps: CompoundTag::named("HeightMaps"),
// biomes: Vec::new(),
// data_size: 0,
// data: Vec::new(),
// block_entities_size: 0,
// block_entities: Vec::new(),
// // primary_mask: 65535,
// // heights: CompoundTag::named("HeightMaps"),
// // data: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
// // tiles: vec![CompoundTag::named("TileEntity")],
// };
// let mut data = Vec::new();
// packet.encode(&mut data).map_err(|_| ())?;
// let response = RawPacket::new(proto::CLIENT_CHUNK_DATA, data).encode()?;
// writer.write_all(&response).await.map_err(|_| ())?;
// Ok(())
// }
async fn send_spawn_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
/// Send lobby spawn position to client.
async fn send_lobby_spawn_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let packet = SpawnPosition {
position: 0,
angle: 0.0,
@ -294,7 +302,8 @@ async fn send_spawn_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
Ok(())
}
async fn send_player_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
/// Send lobby player position to client.
async fn send_lobby_player_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// Send player location, disables download terrain screen
let packet = PlayerPositionAndLook {
x: 0.0,
@ -316,10 +325,11 @@ async fn send_player_pos(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
Ok(())
}
async fn send_time_update(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
/// Send lobby time update to client.
async fn send_lobby_time_update(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
const MC_TIME_NOON: i64 = 6000;
// Send player location, disables download terrain screen
// Send time update, required once for keep-alive packets
let packet = TimeUpdate {
world_age: MC_TIME_NOON,
time_of_day: MC_TIME_NOON,
@ -334,8 +344,10 @@ async fn send_time_update(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
Ok(())
}
/// Send keep alive packet to client.
///
/// Required periodically in play mode to prevent client timeout.
async fn send_keep_alive(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// Send player location, disables download terrain screen
// TODO: keep picking random ID!
let packet = ClientBoundKeepAlive { id: 0 };
@ -345,32 +357,24 @@ async fn send_keep_alive(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let response = RawPacket::new(proto::packets::play::CLIENT_KEEP_ALIVE, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
// TODO: require to receive correct keepalive!
// TODO: verify we receive keep alive response with same ID from client
Ok(())
}
async fn send_keep_alive_loop(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// TODO: use interval of 10 sec?
let mut poll_interval = time::interval(Duration::from_secs(10));
/// Send lobby title packets to client.
///
/// This will show the given text for two keep-alive periods. Use a newline for the subtitle.
///
/// If an empty string is given, the title times will be reset to default.
async fn send_lobby_title(writer: &mut WriteHalf<'_>, text: &str) -> Result<(), ()> {
// Grab title and subtitle bits
let title = text.lines().next().unwrap_or("");
let subtitle = text.lines().skip(1).collect::<Vec<_>>().join("\n");
loop {
// TODO: wait for start signal over channel instead of polling
poll_interval.tick().await;
debug!(target: "LOBBY", "Sending keep-alive to client");
send_keep_alive(writer).await?;
send_title(writer).await?;
}
Ok(())
}
async fn send_title(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// Set title
let packet = SetTitleText {
text: Message::new(Payload::text(STARTING_BANNER)),
text: Message::new(Payload::text(title)),
};
let mut data = Vec::new();
@ -381,7 +385,7 @@ async fn send_title(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
// Set subtitle
let packet = SetTitleSubtitle {
text: Message::new(Payload::text(STARTING_BANNER_SUB)),
text: Message::new(Payload::text(&subtitle)),
};
let mut data = Vec::new();
@ -391,11 +395,20 @@ async fn send_title(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
RawPacket::new(proto::packets::play::CLIENT_SET_TITLE_SUBTITLE, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
// Set times
let packet = SetTitleTimes {
fade_in: 0,
stay: i32::MAX,
fade_out: 0,
// Set title times
let packet = if title.is_empty() && subtitle.is_empty() {
// TODO: figure out real default values here
SetTitleTimes {
fade_in: 10,
stay: 100,
fade_out: 10,
}
} else {
SetTitleTimes {
fade_in: 0,
stay: KEEP_ALIVE_INTERVAL.as_secs() as i32 * TICKS_PER_SECOND as i32 * 2,
fade_out: 0,
}
};
let mut data = Vec::new();
@ -407,6 +420,305 @@ async fn send_title(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
Ok(())
}
/// Send respawn packet to client to jump from lobby into now loaded server.
///
/// The required details will be fetched from the `join_game` packet as provided by the server.
async fn send_respawn_from_join(writer: &mut WriteHalf<'_>, join_game: JoinGame) -> Result<(), ()> {
let packet = Respawn {
dimension: join_game.dimension,
world_name: join_game.world_name,
hashed_seed: join_game.hashed_seed,
game_mode: join_game.game_mode,
previous_game_mode: join_game.previous_game_mode,
is_debug: join_game.is_debug,
is_flat: join_game.is_flat,
copy_metadata: false,
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(proto::packets::play::CLIENT_RESPAWN, data).encode()?;
writer.write_all(&response).await.map_err(|_| ())?;
Ok(())
}
/// An infinite keep-alive loop.
///
/// This will keep sending keep-alive and title packets to the client until it is dropped.
async fn keep_alive_loop(writer: &mut WriteHalf<'_>) -> Result<(), ()> {
let mut interval = time::interval(KEEP_ALIVE_INTERVAL);
loop {
interval.tick().await;
trace!(target: "lazymc::lobby", "Sending keep-alive sequence to lobby client");
// Send keep alive and title packets
send_keep_alive(writer).await?;
send_lobby_title(writer, STARTING_BANNER).await?;
}
}
/// Waiting stage.
///
/// In this stage we wait for the server to come online.
///
/// During this stage we keep sending keep-alive and title packets to the client to keep it active.
// TODO: should we use some timeout in here, could be large?
async fn stage_wait<'a>(
config: Arc<Config>,
server: Arc<Server>,
writer: &mut WriteHalf<'a>,
) -> Result<(), ()> {
// Ensure server poll interval is less than keep alive interval
// We need to wait on the smallest interval in the following loop
assert!(
SERVER_POLL_INTERVAL <= KEEP_ALIVE_INTERVAL,
"SERVER_POLL_INTERVAL should be <= KEEP_ALIVE_INTERVAL"
);
select! {
a = keep_alive_loop(writer) => a,
b = wait_for_server(config, server) => b,
}
}
/// Wait for the server to come online.
///
/// Returns `Ok(())` once the server is online, returns `Err(())` if waiting failed.
// TODO: go through this, use proper error messages
async fn wait_for_server<'a>(config: Arc<Config>, server: Arc<Server>) -> Result<(), ()> {
debug!(target: "lazymc::lobby", "Waiting on server...");
// Set up polling interval, get timeout
let mut poll_interval = time::interval(HOLD_POLL_INTERVAL);
let since = Instant::now();
let timeout = config.time.hold_client_for as u64;
loop {
// TODO: wait for start signal over channel instead of polling
poll_interval.tick().await;
trace!(target: "lazymc::lobby", "Polling outbound server state for lobby client...");
match server.state() {
// Still waiting on server start
State::Starting => {
trace!(target: "lazymc::lobby", "Server not ready, holding client for longer");
// TODO: add timeout here?
continue;
}
// Server started, start relaying and proxy
State::Started => {
// TODO: drop client if already disconnected
debug!(target: "lazymc::lobby", "Server ready for lobby client!");
return Ok(());
}
// Server stopping or stopped, this shouldn't happen
State::Stopping | State::Stopped => {
break;
}
}
}
Err(())
}
/// Create connection to the server.
///
/// This will initialize the connection to the play state. Client details are used.
// TODO: clean this up
async fn connect_to_server(
real_client: Client,
config: &Config,
server: Arc<Server>,
) -> Result<(TcpStream, BytesMut), ()> {
// Open connection
// TODO: on connect fail, ping server and redirect to serve_status if offline
let mut outbound = TcpStream::connect(config.server.address)
.await
.map_err(|_| ())?;
let (mut reader, mut writer) = outbound.split();
let tmp_client = Client::default();
tmp_client.set_state(ClientState::Login);
// TODO: use client version
let packet = Handshake {
protocol_version: 755,
server_addr: config.server.address.ip().to_string(),
server_port: config.server.address.port(),
next_state: ClientState::Login.to_id(),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let request = RawPacket::new(proto::HANDSHAKE_PACKET_ID_HANDSHAKE, data).encode()?;
writer.write_all(&request).await.map_err(|_| ())?;
// Request login start
// TODO: use client username
let packet = LoginStart {
name: "timvisee".into(),
};
let mut data = Vec::new();
packet.encode(&mut data).map_err(|_| ())?;
let request = RawPacket::new(proto::LOGIN_PACKET_ID_LOGIN_START, data).encode()?;
writer.write_all(&request).await.map_err(|_| ())?;
// Incoming buffer
let mut buf = BytesMut::new();
loop {
// Read packet from stream
let (packet, raw) = match proto::read_packet(&mut buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
error!(target: "lazymc::lobby", "Closing connection, error occurred");
break;
}
};
// Grab client state
let client_state = tmp_client.state();
// Hijack login success
if client_state == ClientState::Login && packet.id == proto::LOGIN_PACKET_ID_LOGIN_SUCCESS {
trace!(target: "lazymc::lobby", "Received login success from server connection, change to play mode");
// TODO: parse this packet to ensure it's fine
// let login_success =
// LoginSuccess::decode(&mut packet.data.as_slice()).map_err(|err| {
// dbg!(err);
// ()
// })?;
// Switch to play state
tmp_client.set_state(ClientState::Play);
return Ok((outbound, buf));
}
// Show unhandled packet warning
debug!(target: "lazymc::lobby", "Received unhandled packet from server in connect_to_server:");
debug!(target: "lazymc::lobby", "- State: {:?}", client_state);
debug!(target: "lazymc::lobby", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id);
}
// // Gracefully close connection
// match writer.shutdown().await {
// Ok(_) => {}
// Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
// Err(_) => return Err(()),
// }
// TODO: do we ever reach this?
Err(())
}
/// Wait for join game packet on server connection.
///
/// This parses, consumes and returns the packet.
// TODO: clean this up
// TODO: do not drop error here, return Box<dyn Error>
// TODO: add timeout
async fn wait_for_server_join_game(
mut outbound: &mut TcpStream,
buf: &mut BytesMut,
) -> Result<JoinGame, ()> {
let (mut reader, mut writer) = outbound.split();
loop {
// Read packet from stream
let (packet, raw) = match proto::read_packet(buf, &mut reader).await {
Ok(Some(packet)) => packet,
Ok(None) => break,
Err(_) => {
error!(target: "lazymc::lobby", "Closing connection, error occurred");
break;
}
};
// Catch join game
if packet.id == proto::packets::play::CLIENT_JOIN_GAME {
let join_game = JoinGame::decode(&mut packet.data.as_slice()).map_err(|err| {
// TODO: remove this debug
dbg!(err);
()
})?;
return Ok(join_game);
}
// Show unhandled packet warning
debug!(target: "lazymc::lobby", "Received unhandled packet from server in wait_for_server_join_game:");
debug!(target: "lazymc::lobby", "- Packet ID: 0x{:02X} ({})", packet.id, packet.id);
}
// Gracefully close connection
match writer.shutdown().await {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::NotConnected => {}
Err(_) => return Err(()),
}
// TODO: will we ever reach this?
Err(())
}
/// Route our lobby client through the proxy to the real server, spawning a new task.
#[inline]
pub fn route_proxy(inbound: TcpStream, outbound: TcpStream, config: Arc<Config>) {
// When server is online, proxy all
let service = async move {
proxy::proxy_inbound_outbound_with_queue(inbound, outbound, &[], &[])
.map(|r| {
if let Err(err) = r {
warn!(target: "lazymc", "Failed to proxy: {}", err);
}
})
.await
};
tokio::spawn(service);
}
/// Drain given reader until nothing is left.
// TODO: go through this, use proper error messages
async fn drain_stream<'a>(reader: &mut ReadHalf<'a>) -> Result<(), ()> {
// TODO: remove after debug
trace!(target: "lazymc::lobby", "Draining stream...");
// TODO: use other size, look at default std::io size?
let mut drain_buf = [0; 1024];
loop {
match reader.try_read(&mut drain_buf) {
// TODO: stop if read < drain_buf.len() ?
Ok(read) if read == 0 => return Ok(()),
Err(err) if err.kind() == ErrorKind::WouldBlock => return Ok(()),
Ok(read) => continue,
Err(err) => {
// TODO: remove after debug
dbg!("drain err", err);
return Ok(());
}
}
}
}
/// Read NBT CompoundTag from SNBT.
fn snbt_to_compound_tag(data: &str) -> CompoundTag {
use nbt::decode::read_compound_tag;

View File

@ -44,14 +44,15 @@ pub mod packets {
pub const SERVER_PLAYER_POS_ROT: i32 = 0x12;
pub const SERVER_PLAYER_POS: i32 = 0x11;
pub const CLIENT_KEEP_ALIVE: i32 = 0x21;
pub const CLIENT_CHUNK_DATA: i32 = 0x22;
pub const CLIENT_PLAYER_POS_LOOK: i32 = 0x38;
pub const CLIENT_RESPAWN: i32 = 0x3D;
pub const CLIENT_SET_TITLE_TEXT: i32 = 0x59;
pub const CLIENT_SET_TITLE_SUBTITLE: i32 = 0x57;
pub const CLIENT_SET_TITLE_TIMES: i32 = 0x5A;
pub const CLIENT_TIME_UPDATE: i32 = 0x58;
pub const CLIENT_CHAT_MSG: i32 = 0x0F;
pub const CLIENT_SPAWN_POS: i32 = 0x4B;
pub const CLIENT_DISCONNECT: i32 = 0x1A;
}
}

View File

@ -14,22 +14,43 @@ pub async fn proxy(inbound: TcpStream, addr_target: SocketAddr) -> Result<(), Bo
///
/// Send the queue to the target server before proxying.
pub async fn proxy_with_queue(
mut inbound: TcpStream,
inbound: TcpStream,
addr_target: SocketAddr,
queue: &[u8],
) -> Result<(), Box<dyn Error>> {
// Set up connection to server
// TODO: on connect fail, ping server and redirect to serve_status if offline
let mut outbound = TcpStream::connect(addr_target).await?;
let outbound = TcpStream::connect(addr_target).await?;
// Start proxy on both streams
proxy_inbound_outbound_with_queue(inbound, outbound, &[], queue).await
}
/// Proxy the inbound stream to a target address.
///
/// Send the queue to the target server before proxying.
// TODO: find better name for this
pub async fn proxy_inbound_outbound_with_queue(
mut inbound: TcpStream,
mut outbound: TcpStream,
inbound_queue: &[u8],
outbound_queue: &[u8],
) -> Result<(), Box<dyn Error>> {
let (mut ri, mut wi) = inbound.split();
let (mut ro, mut wo) = outbound.split();
// Forward queued bytes to client once writable
if !inbound_queue.is_empty() {
wi.writable().await?;
trace!(target: "lazymc", "Relaying {} queued bytes to client", inbound_queue.len());
wi.write_all(inbound_queue).await?;
}
// Forward queued bytes to server once writable
if !queue.is_empty() {
if !outbound_queue.is_empty() {
wo.writable().await?;
trace!(target: "lazymc", "Relaying {} queued bytes to server", queue.len());
wo.write_all(queue).await?;
trace!(target: "lazymc", "Relaying {} queued bytes to server", outbound_queue.len());
wo.write_all(outbound_queue).await?;
}
let client_to_server = async {