Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework bungee injector to allow for packet cancellation #999

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,30 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner;
import net.md_5.bungee.api.connection.ProxiedPlayer;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

// Thanks to ViaVersion for the compression method.
@ChannelHandler.Sharable
public class PacketEventsEncoder extends MessageToMessageEncoder<ByteBuf> {
public class PacketEventsEncoder extends ChannelOutboundHandlerAdapter {

private static final Recycler<OutList> OUT_LIST_RECYCLER = new Recycler<OutList>() {
@Override
protected OutList newObject(Handle<OutList> handle) {
return new OutList(handle);
}
};

public ProxiedPlayer player;
public User user;
public boolean handledCompression;
Expand All @@ -45,7 +59,7 @@ public PacketEventsEncoder(User user) {
this.user = user;
}

public void read(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
public void read(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promise) throws Exception {
boolean doCompression = handleCompressionOrder(ctx, buffer);
int firstReaderIndex = buffer.readerIndex();
PacketSendEvent packetSendEvent = EventCreationUtil.createSendEvent(ctx.channel(), user, player,
Expand All @@ -57,17 +71,16 @@ public void read(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) th
ByteBufHelper.clear(packetSendEvent.getByteBuf());
packetSendEvent.getLastUsedWrapper().writeVarInt(packetSendEvent.getPacketId());
packetSendEvent.getLastUsedWrapper().write();
}
else {
} else {
buffer.readerIndex(firstReaderIndex);
}
if (doCompression) {
recompress(ctx, buffer, out);
this.recompress(ctx, buffer, promise);
} else {
out.add(buffer.retain());
ctx.write(buffer, promise);
}
} else {
ByteBufHelper.clear(packetSendEvent.getByteBuf());
ReferenceCountUtil.release(packetSendEvent.getByteBuf());
}
if (packetSendEvent.hasPostTasks()) {
for (Runnable task : packetSendEvent.getPostTasks()) {
Expand All @@ -77,16 +90,17 @@ public void read(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) th
}

@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
if (!msg.isReadable()) {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (!(msg instanceof ByteBuf)) {
super.write(ctx, msg, promise);
return;
}
read(ctx, msg, out);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
buf.release();
} else {
this.read(ctx, buf, promise);
}
}

private boolean handleCompressionOrder(ChannelHandlerContext ctx, ByteBuf buffer) {
Expand Down Expand Up @@ -125,12 +139,41 @@ private boolean handleCompressionOrder(ChannelHandlerContext ctx, ByteBuf buffer
return false;
}

private void recompress(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
ChannelHandler compressor = ctx.pipeline().get("compress");
private void recompress(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promise) {
OutList outWrapper = OUT_LIST_RECYCLER.get();
List<Object> out = outWrapper.list;
try {
ChannelHandler compressor = ctx.pipeline().get("compress");
CustomPipelineUtil.callPacketEncodeByteBuf(compressor, ctx, buffer, out);
} catch (InvocationTargetException e) {
e.printStackTrace();

int len = out.size();
if (len == 1) {
// should be the only case which
// happens on vanilla bungeecord
ctx.write(out.get(0), promise);
} else {
// copied from MessageToMessageEncoder#writePromiseCombiner
PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
for (int i = 0; i < len; i++) {
combiner.add(ctx.write(out.get(i)));
}
combiner.finish(promise);
}
} catch (InvocationTargetException exception) {
throw new EncoderException("Error while recompressing bytebuf " + buffer.readableBytes(), exception);
} finally {
outWrapper.handle.recycle(outWrapper);
}
}

private static final class OutList {

// the default bungee compressor only produces one output bytebuf
private final List<Object> list = new ArrayList<>(1);
private final Recycler.Handle<OutList> handle;

public OutList(Recycler.Handle<OutList> handle) {
this.handle = handle;
}
}
}