Basic handshake handling, many fixes

This commit is contained in:
timvisee 2021-11-07 17:20:06 +01:00
parent efae87af7d
commit 2d62384b9e
No known key found for this signature in database
GPG Key ID: B8DB720BC383E172
4 changed files with 232 additions and 102 deletions

4
Cargo.lock generated
View File

@ -247,7 +247,7 @@ checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]] [[package]]
name = "minecraft-protocol" name = "minecraft-protocol"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/eihwaz/minecraft-protocol?rev=09f95625eff272794590e1ef73b2a1b673f47f50#09f95625eff272794590e1ef73b2a1b673f47f50" source = "git+https://github.com/timvisee/minecraft-protocol?rev=c578492#c57849246166add5ad45ef36b3bdebd8b744883d"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"minecraft-protocol-derive", "minecraft-protocol-derive",
@ -260,7 +260,7 @@ dependencies = [
[[package]] [[package]]
name = "minecraft-protocol-derive" name = "minecraft-protocol-derive"
version = "0.0.0" version = "0.0.0"
source = "git+https://github.com/eihwaz/minecraft-protocol?rev=09f95625eff272794590e1ef73b2a1b673f47f50#09f95625eff272794590e1ef73b2a1b673f47f50" source = "git+https://github.com/timvisee/minecraft-protocol?rev=c578492#c57849246166add5ad45ef36b3bdebd8b744883d"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@ -6,5 +6,5 @@ edition = "2018"
[dependencies] [dependencies]
bytes = "1.1" bytes = "1.1"
futures = "0.3" futures = "0.3"
minecraft-protocol = { git = "https://github.com/eihwaz/minecraft-protocol", rev = "09f95625eff272794590e1ef73b2a1b673f47f50" } minecraft-protocol = { git = "https://github.com/timvisee/minecraft-protocol", rev = "c578492" }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }

View File

@ -9,19 +9,24 @@ use std::error::Error;
use bytes::BytesMut; use bytes::BytesMut;
use futures::future::poll_fn; use futures::future::poll_fn;
use futures::FutureExt; use futures::FutureExt;
use futures::TryFutureExt;
use minecraft_protocol::data::chat::{Message, Payload}; use minecraft_protocol::data::chat::{Message, Payload};
use minecraft_protocol::data::server_status::*; use minecraft_protocol::data::server_status::*;
use minecraft_protocol::decoder::Decoder; use minecraft_protocol::decoder::Decoder;
use minecraft_protocol::encoder::Encoder; use minecraft_protocol::encoder::Encoder;
use minecraft_protocol::version::v1_14_4::handshake::Handshake;
use minecraft_protocol::version::v1_14_4::status::{PingRequest, PingResponse, StatusResponse}; use minecraft_protocol::version::v1_14_4::status::{PingRequest, PingResponse, StatusResponse};
use tokio::io; use tokio::io;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::io::ReadBuf; use tokio::io::ReadBuf;
use tokio::net::tcp::ReadHalf;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::unbounded_channel;
use config::*; use config::*;
use protocol::{Client, RawPacket}; use protocol::{Client, ClientState, RawPacket};
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), ()> { async fn main() -> Result<(), ()> {
@ -51,6 +56,57 @@ async fn main() -> Result<(), ()> {
Ok(()) Ok(())
} }
/// Read raw packet from stream.
async fn read_packet<'a>(
buf: &mut BytesMut,
stream: &mut ReadHalf<'a>,
) -> Result<Option<(RawPacket, Vec<u8>)>, ()> {
// // Wait until socket is readable
// if stream.readable().await.is_err() {
// eprintln!("Socket not readable!");
// return Ok(None);
// }
// Keep reading until we have at least 2 bytes
while buf.len() < 2 {
// Read packet from socket
let mut tmp = Vec::with_capacity(64);
stream.read_buf(&mut tmp).await;
if tmp.is_empty() {
return Ok(None);
}
buf.extend(tmp);
}
// Attempt to read packet length
let (consumed, len) = match types::read_var_int(&buf) {
Ok(result) => result,
Err(err) => {
eprintln!("Failed to read packet length, should retry!");
eprintln!("{:?}", (&buf).as_ref());
return Err(err);
}
};
// Keep reading until we have all packet bytes
while buf.len() < consumed + len as usize {
// Read packet from socket
let mut tmp = Vec::with_capacity(64);
stream.read_buf(&mut tmp).await;
if tmp.is_empty() {
return Ok(None);
}
buf.extend(tmp);
}
// Parse packet
let raw = buf.split_to(consumed + len as usize);
let packet = RawPacket::decode(&raw)?;
Ok(Some((packet, raw.to_vec())))
}
/// Proxy the given inbound stream to a target address. /// Proxy the given inbound stream to a target address.
// TODO: do not drop error here, return Box<dyn Error> // TODO: do not drop error here, return Box<dyn Error>
async fn proxy(mut client: Client, mut inbound: TcpStream, addr_target: String) -> Result<(), ()> { async fn proxy(mut client: Client, mut inbound: TcpStream, addr_target: String) -> Result<(), ()> {
@ -62,131 +118,186 @@ async fn proxy(mut client: Client, mut inbound: TcpStream, addr_target: String)
let (client_send_queue, mut client_to_send) = unbounded_channel::<Vec<u8>>(); let (client_send_queue, mut client_to_send) = unbounded_channel::<Vec<u8>>();
let client_to_server = async { let client_to_server = async {
// Wait for readable state // Incoming buffer
while ri.readable().await.is_ok() { let mut buf = BytesMut::new();
// Poll until we have data available
let mut poll_buf = [0; 10]; loop {
let mut poll_buf = ReadBuf::new(&mut poll_buf); // In login state, proxy raw data
let read = poll_fn(|cx| ri.poll_peek(cx, &mut poll_buf)) if client.state() == ClientState::Login {
.await eprintln!("STARTED FULL PROXY");
.map_err(|_| ())?;
if read == 0 { wo.writable().await;
continue;
// Forward remaining buffer
wo.write_all(&buf).await.map_err(|_| ())?;
buf.clear();
// Forward rest of data
io::copy(&mut ri, &mut wo).await.map_err(|_| ())?;
break;
} }
// Read packet from socket // Read packet from stream
let mut buf = Vec::with_capacity(64); let (packet, raw) = match read_packet(&mut buf, &mut ri).await {
let read = ri.try_read_buf(&mut buf).map_err(|_| ())?; Ok(Some(packet)) => packet,
if read == 0 { Ok(None) => {
continue; eprintln!("Closing connection, could not read more");
} break;
eprintln!("PACKET {:?}", buf.as_slice());
match RawPacket::decode(buf.as_mut_slice()) {
Ok(packet) => {
eprintln!("PACKET ID: {}", packet.id);
eprintln!("PACKET DATA: {:?}", packet.data);
// Hijack server status packet
if packet.id == protocol::STATUS_PACKET_ID_STATUS {
// Build status response
let server_status = ServerStatus {
version: ServerVersion {
name: String::from("1.16.5"),
protocol: 754,
},
description: Message::new(Payload::text(LABEL_SERVER_SLEEPING)),
players: OnlinePlayers {
online: 0,
max: 0,
sample: vec![],
},
};
let server_status = StatusResponse { server_status };
let mut data = Vec::new();
server_status.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(0, data).encode()?;
client_send_queue
.send(response)
.expect("failed to queue status response");
continue;
}
// Hijack ping packet
if packet.id == protocol::STATUS_PACKET_ID_PING {
let ping =
PingRequest::decode(&mut packet.data.as_slice()).map_err(|_| ())?;
let response = packet.encode()?;
client_send_queue
.send(response)
.expect("failed to queue ping response");
continue;
}
} }
Err(err) => { Err(_) => {
eprintln!("Failed to parse packet: {:?}", err); // Forward raw packet to server
return Err(err); wo.write_all(&buf).await.expect("failed to write to server");
buf.clear();
continue;
}
};
// Show packet details
eprintln!("PACKET {:?}", raw.as_slice());
eprintln!("PACKET ID: {}", packet.id);
eprintln!("PACKET DATA: {:?}", packet.data);
// Hijack handshake
if client.state() == ClientState::Handshake
&& packet.id == protocol::STATUS_PACKET_ID_STATUS
{
if let Ok(handshake) = Handshake::decode(&mut packet.data.as_slice()) {
eprintln!("# PACKET HANDSHAKE");
eprintln!("SWITCHING CLIENT STATE: {}", handshake.next_state);
// TODO: do not panic here
client.set_state(
ClientState::from_id(handshake.next_state)
.expect("unknown next client state"),
);
} else {
eprintln!("HANDSHAKE ERROR");
} }
} }
// Forward data to server // Hijack server status packet
wo.write_all(&buf).await.expect("failed to write to server"); if client.state() == ClientState::Status
&& packet.id == protocol::STATUS_PACKET_ID_STATUS
{
eprintln!("# PACKET STATUS");
// Build status response
let server_status = ServerStatus {
version: ServerVersion {
name: String::from("1.16.5"),
protocol: 754,
},
description: Message::new(Payload::text(LABEL_SERVER_SLEEPING)),
players: OnlinePlayers {
online: 0,
max: 0,
sample: vec![],
},
};
let server_status = StatusResponse { server_status };
let mut data = Vec::new();
server_status.encode(&mut data).map_err(|_| ())?;
let response = RawPacket::new(0, data).encode()?;
client_send_queue
.send(response)
.expect("failed to queue status response");
continue;
}
// Hijack ping packet
if client.state() == ClientState::Status && packet.id == protocol::STATUS_PACKET_ID_PING
{
eprintln!("# PACKET PING");
client_send_queue
.send(raw)
.expect("failed to queue ping response");
continue;
}
// Forward raw packet to server
wo.write_all(&raw).await.expect("failed to write to server");
} }
// io::copy(&mut ri, &mut wo).await?;
wo.shutdown().await.map_err(|_| ()) wo.shutdown().await.map_err(|_| ())
}; };
let server_to_client = async { let server_to_client = async {
// Server packts to send to client, add to client sending queue // Server packts to send to client, add to client sending queue
let proxy = async { let proxy = async {
// Wait for readable state // Incoming buffer
while ro.readable().await.is_ok() { let mut buf = BytesMut::new();
// Poll until we have data available
let mut poll_buf = [0; 10]; loop {
let mut poll_buf = ReadBuf::new(&mut poll_buf); // In login state, simply proxy all
let read = poll_fn(|cx| ro.poll_peek(cx, &mut poll_buf)) if client.state() == ClientState::Login {
.await // if true {
.map_err(|_| ())?; // if true {
if read == 0 { eprintln!("STARTED FULL PROXY");
continue;
// // Wait until socket is readable
// if ro.readable().await.is_err() {
// eprintln!("Socket not readable!");
// break;
// }
// Forward remaining data
client_send_queue.send(buf.to_vec());
buf.clear();
// Keep reading until we have at least 2 bytes
loop {
// Read packet from socket
let mut tmp = Vec::new();
ro.read_buf(&mut tmp).await;
if tmp.is_empty() {
break;
}
client_send_queue.send(tmp);
}
// Forward raw packet to server
// wi.writable().await;
// io::copy(&mut ro, &mut wi).await.map_err(|_| ())?;
break;
} }
// Read packet from socket // Read packet from stream
let mut buf = Vec::with_capacity(64); let (packet, raw) = match read_packet(&mut buf, &mut ro).await {
let read = ro.try_read_buf(&mut buf).map_err(|_| ())?; Ok(Some(packet)) => packet,
if read == 0 { Ok(None) => {
continue; eprintln!("Closing connection, could not read more");
} break;
}
Err(_) => {
// Forward raw packet to server
client_send_queue.send(buf.to_vec());
continue;
}
};
client_send_queue.send(buf); client_send_queue.send(raw);
} }
// io::copy(&mut ri, &mut wo).await?;
Ok(()) Ok(())
}; };
// Push client sending queue to client // Push client sending queue to client
let other = async { let send_queue = async {
loop { wi.writable().await.map_err(|_| ())?;
let msg = poll_fn(|cx| client_to_send.poll_recv(cx))
.await
.expect("failed to poll_fn");
wi.write_all(msg.as_ref()) while let Some(msg) = client_to_send.recv().await {
.await // eprintln!("TO CLIENT: {:?}", &msg);
.expect("failed to write to client"); wi.write_all(msg.as_ref()).await.map_err(|_| ())?;
} }
Ok(()) Ok(())
}; };
tokio::try_join!(proxy, other)?; tokio::try_join!(proxy, send_queue)?;
io::copy(&mut ro, &mut wi).await.map_err(|_| ())?;
wi.shutdown().await.map_err(|_| ()) wi.shutdown().await.map_err(|_| ())
}; };

View File

@ -1,3 +1,5 @@
use std::sync::Mutex;
use crate::types; use crate::types;
pub const STATUS_PACKET_ID_STATUS: i32 = 0; pub const STATUS_PACKET_ID_STATUS: i32 = 0;
@ -8,10 +10,22 @@ pub const STATUS_PACKET_ID_PING: i32 = 1;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct Client { pub struct Client {
/// Current client state. /// Current client state.
pub state: ClientState, pub state: Mutex<ClientState>,
} }
#[derive(Debug, Copy, Clone)] impl Client {
/// Get client state.
pub fn state(&self) -> ClientState {
*self.state.lock().unwrap()
}
/// Set client state.
pub fn set_state(&self, state: ClientState) {
*self.state.lock().unwrap() = state;
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ClientState { pub enum ClientState {
/// Initial client state. /// Initial client state.
Handshake, Handshake,
@ -78,6 +92,11 @@ impl RawPacket {
let (read, len) = types::read_var_int(buf)?; let (read, len) = types::read_var_int(buf)?;
buf = &buf[read..][..len as usize]; buf = &buf[read..][..len as usize];
Self::decode_data(len, buf)
}
/// Decode packet from raw buffer without the length header.
pub fn decode_data(len: i32, mut buf: &[u8]) -> Result<Self, ()> {
// Read packet ID, select buf // Read packet ID, select buf
let (read, packet_id) = types::read_var_int(buf)?; let (read, packet_id) = types::read_var_int(buf)?;
buf = &buf[read..]; buf = &buf[read..];