mirror of
https://github.com/PaperMC/Paper.git
synced 2025-07-26 09:42:06 -07:00
This implements a solution that preemptively exits the tick loop if we have already marked the connection as disconnecting. This avoids changing the result of Connection#isConnected, in order to avoid other possibly unintentional changes. Fundamentally, it should be investigated whether closing the connection async is really still needed. This also removes the login disconnecting logic for server stopping: the same problem could occur in the config stage, and it should not be an issue in practice because connections are closed very early on server stop. Additionally, do not check for isConnecting() on VERIFYING, as that seemed to be an old diff originally trying to resolve this problem; however, isConnected is not updated at this point, so the check was pretty much useless.
401 lines
20 KiB
Diff
401 lines
20 KiB
Diff
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
|
From: Aikar <aikar@aikar.co>
|
|
Date: Wed, 6 May 2020 04:53:35 -0400
|
|
Subject: [PATCH] Optimize Network Manager and add advanced packet support
|
|
|
|
Adds the ability for one packet to bundle other packets that follow it
|
|
Adds ability for a packet to delay sending more packets until a state is ready.
|
|
|
|
Removes synchronization from sending packets
|
|
Removes processing of the packet queue off of the main thread
|
|
- for the few cases where it is allowed, order is not necessary nor
|
|
should it even be happening concurrently in the first place (handshaking/login/status)
|
|
|
|
Ensures packets sent asynchronously are dispatched on main thread
|
|
|
|
This helps ensure safety for ProtocolLib as packet listeners
|
|
are commonly accessing world state. This will allow you to schedule
|
|
a packet to be sent async, but it'll be dispatched sync for packet
|
|
listeners to process.
|
|
|
|
This should solve some deadlock risks
|
|
|
|
Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
|
|
|
|
Also avoids spamming closed channel exception by rechecking closed state in dispatch
|
|
and then catching exceptions and closing if they fire.
|
|
|
|
Part of this commit was authored by: Spottedleaf, sandtechnology
|
|
|
|
diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java
|
|
index 14a25f4d9c6da9b9d7646cf88b80a04f5b4d0823..682fb4cbf9975b21171ae210defd6e338de3b718 100644
|
|
--- a/net/minecraft/network/Connection.java
|
|
+++ b/net/minecraft/network/Connection.java
|
|
@@ -85,7 +85,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
private static final ProtocolInfo<ServerHandshakePacketListener> INITIAL_PROTOCOL = HandshakeProtocols.SERVERBOUND;
|
|
private final PacketFlow receiving;
|
|
private volatile boolean sendLoginDisconnect = true;
|
|
- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
|
|
+ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue(); // Paper - Optimize network
|
|
public Channel channel;
|
|
public SocketAddress address;
|
|
// Spigot start
|
|
@@ -146,6 +146,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
@Nullable public SocketAddress haProxyAddress; // Paper - Add API to get player's proxy address
|
|
public @Nullable java.util.Optional<net.minecraft.network.chat.Component> legacySavedLoginEventResultOverride; // Paper - playerloginevent
|
|
public @Nullable net.minecraft.server.level.ServerPlayer savedPlayerForLoginEventLegacy; // Paper - playerloginevent
|
|
+ // Paper start - Optimize network
|
|
+ public boolean isPending = true;
|
|
+ public boolean queueImmunity;
|
|
+ // Paper end - Optimize network
|
|
|
|
public Connection(PacketFlow receiving) {
|
|
this.receiving = receiving;
|
|
@@ -420,11 +424,38 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
}
|
|
|
|
public void send(Packet<?> packet, @Nullable ChannelFutureListener channelFutureListener, boolean flag) {
|
|
- if (this.isConnected()) {
|
|
- this.flushQueue();
|
|
+ // Paper start - Optimize network: Handle oversized packets better
|
|
+ final boolean connected = this.isConnected();
|
|
+ if (!connected && !this.preparing) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ packet.onPacketDispatch(this.getPlayer());
|
|
+ if (connected && (InnerUtil.canSendImmediate(this, packet)
|
|
+ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty()
|
|
+ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) {
|
|
this.sendPacket(packet, channelFutureListener, flag);
|
|
} else {
|
|
- this.pendingActions.add(connection -> connection.sendPacket(packet, channelFutureListener, flag));
|
|
+ // Write the packets to the queue, then flush - antixray hooks there already
|
|
+ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet);
|
|
+ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
|
|
+ if (!hasExtraPackets) {
|
|
+ this.pendingActions.add(new PacketSendAction(packet, channelFutureListener, flag));
|
|
+ } else {
|
|
+ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size());
|
|
+ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets
|
|
+
|
|
+ for (int i = 0, len = extraPackets.size(); i < len;) {
|
|
+ final Packet<?> extraPacket = extraPackets.get(i);
|
|
+ final boolean end = ++i == len;
|
|
+ actions.add(new PacketSendAction(extraPacket, end ? channelFutureListener : null, end)); // Append listener to the end
|
|
+ }
|
|
+
|
|
+ this.pendingActions.addAll(actions);
|
|
+ }
|
|
+
|
|
+ this.flushQueue();
|
|
+ // Paper end - Optimize network
|
|
}
|
|
}
|
|
|
|
@@ -433,7 +464,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
this.flushQueue();
|
|
action.accept(this);
|
|
} else {
|
|
- this.pendingActions.add(action);
|
|
+ this.pendingActions.add(new WrappedConsumer(action)); // Paper - Optimize network
|
|
}
|
|
}
|
|
|
|
@@ -447,21 +478,41 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
}
|
|
|
|
private void doSendPacket(Packet<?> packet, @Nullable ChannelFutureListener channelFutureListener, boolean flag) {
|
|
+ // Paper start - Optimize network
|
|
+ final net.minecraft.server.level.ServerPlayer player = this.getPlayer();
|
|
+ if (!this.isConnected()) {
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ return;
|
|
+ }
|
|
+ try {
|
|
+ final ChannelFuture channelFuture;
|
|
+ // Paper end - Optimize network
|
|
if (channelFutureListener != null) {
|
|
- ChannelFuture channelFuture = flag ? this.channel.writeAndFlush(packet) : this.channel.write(packet);
|
|
+ channelFuture = flag ? this.channel.writeAndFlush(packet) : this.channel.write(packet); // Paper - Optimize network
|
|
channelFuture.addListener(channelFutureListener);
|
|
} else if (flag) {
|
|
- this.channel.writeAndFlush(packet, this.channel.voidPromise());
|
|
+ channelFuture = this.channel.writeAndFlush(packet, this.channel.voidPromise()); // Paper - Optimize network
|
|
} else {
|
|
- this.channel.write(packet, this.channel.voidPromise());
|
|
+ channelFuture = this.channel.write(packet, this.channel.voidPromise()); // Paper - Optimize network
|
|
+ }
|
|
+
|
|
+ // Paper start - Optimize network
|
|
+ if (packet.hasFinishListener()) {
|
|
+ channelFuture.addListener((ChannelFutureListener) future -> packet.onPacketDispatchFinish(player, future));
|
|
+ }
|
|
+ } catch (final Exception e) {
|
|
+ LOGGER.error("NetworkException: {}", player, e);
|
|
+ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
}
|
|
+ // Paper end - Optimize network
|
|
}
|
|
|
|
public void flushChannel() {
|
|
if (this.isConnected()) {
|
|
this.flush();
|
|
} else {
|
|
- this.pendingActions.add(Connection::flush);
|
|
+ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network
|
|
}
|
|
}
|
|
|
|
@@ -473,16 +524,57 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
}
|
|
}
|
|
|
|
- private void flushQueue() {
|
|
- if (this.channel != null && this.channel.isOpen()) {
|
|
+ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread
|
|
+ private boolean flushQueue() {
|
|
+ if (!this.isConnected()) {
|
|
+ return true;
|
|
+ }
|
|
+ if (io.papermc.paper.util.MCUtil.isMainThread()) {
|
|
+ return this.processQueue();
|
|
+ } else if (this.isPending) {
|
|
+ // Should only happen during login/status stages
|
|
synchronized (this.pendingActions) {
|
|
- Consumer<Connection> consumer;
|
|
- while ((consumer = this.pendingActions.poll()) != null) {
|
|
- consumer.accept(this);
|
|
+ return this.processQueue();
|
|
+ }
|
|
+ }
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ private boolean processQueue() {
|
|
+ if (this.pendingActions.isEmpty()) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
|
|
+ // But if we are not on main due to login/status, the parent is synchronized on pendingActions
|
|
+ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
|
|
+ while (iterator.hasNext()) {
|
|
+ final WrappedConsumer queued = iterator.next(); // poll -> peek
|
|
+
|
|
+ // Fix NPE (Spigot bug caused by handleDisconnection())
|
|
+ if (queued == null) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ if (queued.isConsumed()) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ if (queued instanceof PacketSendAction packetSendAction) {
|
|
+ final Packet<?> packet = packetSendAction.packet;
|
|
+ if (!packet.isReady()) {
|
|
+ return false;
|
|
}
|
|
}
|
|
+
|
|
+ iterator.remove();
|
|
+ if (queued.tryMarkConsumed()) {
|
|
+ queued.accept(this);
|
|
+ }
|
|
}
|
|
+ return true;
|
|
}
|
|
+ // Paper end - Optimize network
|
|
|
|
private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world
|
|
private static int joinAttemptsThisTick; // Paper - Buffer joins to world
|
|
@@ -552,6 +644,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
|
|
public void disconnect(DisconnectionDetails disconnectionDetails) {
|
|
this.preparing = false; // Spigot
|
|
+ this.clearPacketQueue(); // Paper - Optimize network
|
|
if (this.channel == null) {
|
|
this.delayedDisconnect = disconnectionDetails;
|
|
}
|
|
@@ -740,7 +833,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
public void handleDisconnection() {
|
|
if (this.channel != null && !this.channel.isOpen()) {
|
|
if (this.disconnectionHandled) {
|
|
- LOGGER.warn("handleDisconnection() called twice");
|
|
+ // LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message
|
|
} else {
|
|
this.disconnectionHandled = true;
|
|
PacketListener packetListener = this.getPacketListener();
|
|
@@ -751,7 +844,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
);
|
|
packetListener1.onDisconnect(disconnectionDetails);
|
|
}
|
|
- this.pendingActions.clear(); // Free up packet queue.
|
|
+ this.clearPacketQueue(); // Paper - Optimize network
|
|
// Paper start - Add PlayerConnectionCloseEvent
|
|
if (packetListener instanceof net.minecraft.server.network.ServerCommonPacketListenerImpl commonPacketListener) {
|
|
/* Player was logged in, either game listener or configuration listener */
|
|
@@ -786,4 +879,97 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
public void setBandwidthLogger(LocalSampleLogger bandwithLogger) {
|
|
this.bandwidthDebugMonitor = new BandwidthDebugMonitor(bandwithLogger);
|
|
}
|
|
+
|
|
+ // Paper start - Optimize network
|
|
+ public void clearPacketQueue() {
|
|
+ final net.minecraft.server.level.ServerPlayer player = getPlayer();
|
|
+ for (final Consumer<Connection> queuedAction : this.pendingActions) {
|
|
+ if (queuedAction instanceof PacketSendAction packetSendAction) {
|
|
+ final Packet<?> packet = packetSendAction.packet;
|
|
+ if (packet.hasFinishListener()) {
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ this.pendingActions.clear();
|
|
+ }
|
|
+
|
|
+ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidentally pick them up.
|
|
+
|
|
+ @Nullable
|
|
+ private static java.util.List<Packet<?>> buildExtraPackets(final Packet<?> packet) {
|
|
+ final java.util.List<Packet<?>> extra = packet.getExtraPackets();
|
|
+ if (extra == null || extra.isEmpty()) {
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ final java.util.List<Packet<?>> ret = new java.util.ArrayList<>(1 + extra.size());
|
|
+ buildExtraPackets0(extra, ret);
|
|
+ return ret;
|
|
+ }
|
|
+
|
|
+ private static void buildExtraPackets0(final java.util.List<Packet<?>> extraPackets, final java.util.List<Packet<?>> into) {
|
|
+ for (final Packet<?> extra : extraPackets) {
|
|
+ into.add(extra);
|
|
+ final java.util.List<Packet<?>> extraExtra = extra.getExtraPackets();
|
|
+ if (extraExtra != null && !extraExtra.isEmpty()) {
|
|
+ buildExtraPackets0(extraExtra, into);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet<?> packet) {
|
|
+ return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY ||
|
|
+ packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundDisguisedChatPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundEntityPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundStopSoundPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundLevelParticlesPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerInfoUpdatePacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerInfoRemovePacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.ping.ClientboundPongResponsePacket;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static class WrappedConsumer implements Consumer<Connection> {
|
|
+ private final Consumer<Connection> delegate;
|
|
+ private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false);
|
|
+
|
|
+ private WrappedConsumer(final Consumer<Connection> delegate) {
|
|
+ this.delegate = delegate;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void accept(final Connection connection) {
|
|
+ this.delegate.accept(connection);
|
|
+ }
|
|
+
|
|
+ public boolean tryMarkConsumed() {
|
|
+ return consumed.compareAndSet(false, true);
|
|
+ }
|
|
+
|
|
+ public boolean isConsumed() {
|
|
+ return consumed.get();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static final class PacketSendAction extends WrappedConsumer {
|
|
+ private final Packet<?> packet;
|
|
+
|
|
+ private PacketSendAction(final Packet<?> packet, @Nullable final ChannelFutureListener channelFutureListener, final boolean flush) {
|
|
+ super(connection -> connection.sendPacket(packet, channelFutureListener, flush));
|
|
+ this.packet = packet;
|
|
+ }
|
|
+ }
|
|
+ // Paper end - Optimize network
|
|
}
|
|
diff --git a/net/minecraft/network/protocol/Packet.java b/net/minecraft/network/protocol/Packet.java
|
|
index 65ff8b9112ec76eeac48c679044fc02ae7d4ffeb..e4789584cbe43959681a8522c66eab58369bebd0 100644
|
|
--- a/net/minecraft/network/protocol/Packet.java
|
|
+++ b/net/minecraft/network/protocol/Packet.java
|
|
@@ -35,4 +35,32 @@ public interface Packet<T extends PacketListener> {
|
|
static <B extends ByteBuf, T extends Packet<?>> StreamCodec<B, T> codec(StreamMemberEncoder<B, T> encoder, StreamDecoder<B, T> decoder) {
|
|
return StreamCodec.ofMember(encoder, decoder);
|
|
}
|
|
+
|
|
+ // Paper start
|
|
+ /**
|
|
+ * @param player Null if not at PLAY stage yet
|
|
+ */
|
|
+ default void onPacketDispatch(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player) {
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * @param player Null if not at PLAY stage yet
|
|
+ * @param future Can be null if packet was cancelled
|
|
+ */
|
|
+ default void onPacketDispatchFinish(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player, @org.jetbrains.annotations.Nullable io.netty.channel.ChannelFuture future) {
|
|
+ }
|
|
+
|
|
+ default boolean hasFinishListener() {
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ default boolean isReady() {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ @org.jetbrains.annotations.Nullable
|
|
+ default java.util.List<Packet<?>> getExtraPackets() {
|
|
+ return null;
|
|
+ }
|
|
+ // Paper end
|
|
}
|
|
diff --git a/net/minecraft/server/network/ServerConnectionListener.java b/net/minecraft/server/network/ServerConnectionListener.java
|
|
index b873af7183d9b1aabc87e63d254a4326f17b21c9..7de11ba404f0b60e7f7b7c16954811a343688219 100644
|
|
--- a/net/minecraft/server/network/ServerConnectionListener.java
|
|
+++ b/net/minecraft/server/network/ServerConnectionListener.java
|
|
@@ -66,11 +66,13 @@ public class ServerConnectionListener {
|
|
|
|
// Paper start - prevent blocking on adding a new connection while the server is ticking
|
|
private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
|
|
+ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network
|
|
|
|
private final void addPending() {
|
|
Connection connection;
|
|
while ((connection = this.pending.poll()) != null) {
|
|
this.connections.add(connection);
|
|
+ connection.isPending = false; // Paper - Optimize network
|
|
}
|
|
}
|
|
// Paper end - prevent blocking on adding a new connection while the server is ticking
|
|
@@ -120,6 +122,7 @@ public class ServerConnectionListener {
|
|
} catch (ChannelException var5) {
|
|
}
|
|
|
|
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network
|
|
ChannelPipeline channelPipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30));
|
|
if (ServerConnectionListener.this.server.repliesToStatus()) {
|
|
channelPipeline.addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this.getServer()));
|