From 01888faac66be749290af538e5f968fb3056a234 Mon Sep 17 00:00:00 2001 From: Collin Smith Date: Sat, 20 Jun 2020 14:03:27 -0700 Subject: [PATCH] Implemented ReliableUtil and composite ByteBuf into ReliableChannelHandler Implemented ReliableUtil and composite ByteBuf into ReliableChannelHandler ReliableChannelHandler automatically replaces outbound message with composite Removed size prefixing contract from Packet factory Added ReliableUtil#createHeader to write data and set writer index passed header --- .../src/com/riiablo/server/netty/Packet.java | 5 +-- .../server/netty/ReliableChannelHandler.java | 45 ++++++++++++------- .../riiablo/server/netty/ReliableUtil.java | 17 +++++++ .../server/netty/ReliableUtilTest.java | 20 +++++---- 4 files changed, 59 insertions(+), 28 deletions(-) diff --git a/server/netty/src/com/riiablo/server/netty/Packet.java b/server/netty/src/com/riiablo/server/netty/Packet.java index 2732b57c..ac6df4e6 100644 --- a/server/netty/src/com/riiablo/server/netty/Packet.java +++ b/server/netty/src/com/riiablo/server/netty/Packet.java @@ -1,6 +1,5 @@ package com.riiablo.server.netty; -import com.google.flatbuffers.ByteBufferUtil; import java.nio.ByteBuffer; import com.badlogic.gdx.utils.TimeUtils; @@ -11,14 +10,14 @@ public class Packet { public int id; public long time; public ByteBuffer buffer; - public Netty data; + public Netty data; public static Packet obtain(int id, ByteBuffer buffer) { Packet packet = new Packet(); packet.id = id; packet.time = TimeUtils.millis(); packet.buffer = buffer; - packet.data = Netty.getRootAsNetty(ByteBufferUtil.removeSizePrefix(buffer)); + packet.data = Netty.getRootAsNetty(buffer); return packet; } } diff --git a/server/netty/src/com/riiablo/server/netty/ReliableChannelHandler.java b/server/netty/src/com/riiablo/server/netty/ReliableChannelHandler.java index 9a046fd4..974e1778 100644 --- a/server/netty/src/com/riiablo/server/netty/ReliableChannelHandler.java +++ b/server/netty/src/com/riiablo/server/netty/ReliableChannelHandler.java @@ -1,8 +1,8 @@ package com.riiablo.server.netty; -import com.google.flatbuffers.ByteBufferUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; @@ -25,6 +25,7 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan private static final boolean DEBUG = true; private static final boolean DEBUG_SEQ = DEBUG && true; private static final boolean DEBUG_OUTBOUND = DEBUG && true; + private static final boolean DEBUG_INBOUND = DEBUG && true; private final TypeParameterMatcher matcher; @@ -46,9 +47,13 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan InetSocketAddress sender = msg.sender(); Gdx.app.log(TAG, "channelRead0 Packet from " + sender.getHostName() + ":" + sender.getPort()); ByteBuf in = msg.content(); + if (DEBUG_INBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(in)); try { boolean valid = processHeader(ctx, in); - ByteBuffer buffer = in.nioBuffer(); + if (!valid) return; + ByteBuf content = ReliableUtil.getContent(in); + if (DEBUG_INBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(content)); + ByteBuffer buffer = content.nioBuffer(); Packet packet = Packet.obtain(0, buffer); processPacket(ctx, packet.data); } finally { @@ -57,6 +62,7 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan } protected boolean processHeader(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + if (DEBUG_INBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(ReliableUtil.getHeader(in))); int remoteProtocol = ReliableUtil.getProtocol(in); if (remoteProtocol != PROTOCOL) { Gdx.app.log(TAG, String.format(" rejected incoming PROTO:%d", remoteProtocol)); @@ -66,8 +72,9 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan int remoteSeq = ReliableUtil.getSEQ(in); int remoteAck = ReliableUtil.getACK(in); int remoteAckBits = ReliableUtil.getACK_BITS(in); + int csize = ReliableUtil.getContentSize(in); - Gdx.app.log(TAG, " accepted incoming " + String.format("PROTO:%d SEQ:%d ACK:%d ACK_BITS:%08x", remoteProtocol, remoteSeq, remoteAck, remoteAckBits)); + Gdx.app.log(TAG, " accepted incoming " + ReliableUtil.toString(in)); if (ack < 0) { ack = remoteSeq; Gdx.app.log(TAG, " init ack=" + ack); @@ -111,20 +118,26 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan return seq = (seq + 1) & 0xFFFF; } - protected void channelWrite0(ChannelHandlerContext ctx, Object msg) throws Exception { + protected Object channelWrite0(ChannelHandlerContext ctx, Object msg) throws Exception { InetSocketAddress receiver = (InetSocketAddress) ctx.channel().remoteAddress(); Gdx.app.log(TAG, "channelWrite0 Packet to " + receiver.getHostName() + ":" + receiver.getPort()); - ByteBuf out = (ByteBuf) msg; - try { - Gdx.app.debug(TAG, "out.nioBufferCount()=" + out.nioBufferCount()); - ByteBuffer nioBuffer = out.nioBufferCount() > 0 - ? out.nioBuffer() - : ByteBuffer.wrap(ByteBufUtil.getBytes(out)); - Gdx.app.debug(TAG, "nioBuffer=" + nioBuffer); - nioBuffer = ByteBufferUtil.removeSizePrefix(nioBuffer); - Gdx.app.log(TAG, " " + String.format("PROTO:%d SEQ:%d ACK:%d ACK_BITS:%08x", 0, 0, 0, 0)); - } finally { - } + + ByteBuf header = ctx.alloc().buffer(); // TODO: worth sizing this correctly? + ReliableUtil.createHeader(header, PROTOCOL, nextSequence(), ack, ack_bits); + if (DEBUG_SEQ) Gdx.app.log(TAG, " " + ReliableUtil.toString(header)); + if (DEBUG_OUTBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(header)); + + ByteBuf content = (ByteBuf) msg; + ReliableUtil.setContentSize(header, content.readableBytes()); + + CompositeByteBuf composite = ctx.alloc().compositeBuffer(2) + .addComponent(true, header) + .addComponent(true, content); + + if (DEBUG_OUTBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(content)); + if (DEBUG_OUTBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(composite)); + + return composite; } @Override @@ -225,7 +238,7 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { Gdx.app.debug(TAG, "write"); - if (DEBUG_OUTBOUND) channelWrite0(ctx, msg); + msg = channelWrite0(ctx, msg); ctx.write(msg, promise); } diff --git a/server/netty/src/com/riiablo/server/netty/ReliableUtil.java b/server/netty/src/com/riiablo/server/netty/ReliableUtil.java index 7756a54e..8c10dd90 100644 --- a/server/netty/src/com/riiablo/server/netty/ReliableUtil.java +++ b/server/netty/src/com/riiablo/server/netty/ReliableUtil.java @@ -44,6 +44,10 @@ public class ReliableUtil { return bb.getUnsignedShort(CONTENT_SIZE_OFFSET); } + public static ByteBuf getHeader(ByteBuf bb) { + return bb.slice(0, CONTENT_OFFSET); + } + public static ByteBuf getContent(ByteBuf bb) { return bb.slice(CONTENT_OFFSET, getContentSize(bb)); } @@ -75,4 +79,17 @@ public class ReliableUtil { bb.setBytes(CONTENT_OFFSET, src); src.reset(); } + + static void createHeader(ByteBuf bb, int protocol, int seq, int ack, int ack_bits) { + bb.writerIndex(CONTENT_OFFSET); + setProtocol(bb, protocol); + setSEQ(bb, seq); + setACK(bb, ack); + setACK_BITS(bb, ack_bits); + } + + static String toString(ByteBuf bb) { + return String.format("PROTO:%d SEQ:%d ACK:%d ACK_BITS:%08x CSIZE:%d", + getProtocol(bb), getSEQ(bb), getACK(bb), getACK_BITS(bb), getContentSize(bb)); + } } diff --git a/server/netty/src/com/riiablo/server/netty/ReliableUtilTest.java b/server/netty/src/com/riiablo/server/netty/ReliableUtilTest.java index 4216c57a..159020a1 100644 --- a/server/netty/src/com/riiablo/server/netty/ReliableUtilTest.java +++ b/server/netty/src/com/riiablo/server/netty/ReliableUtilTest.java @@ -6,11 +6,13 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; +import java.nio.ByteBuffer; import org.apache.commons.lang3.StringUtils; import com.riiablo.net.packet.netty.Connection; import com.riiablo.net.packet.netty.Netty; import com.riiablo.net.packet.netty.NettyData; +import com.riiablo.util.BufferUtils; public class ReliableUtilTest { public static void main(String[] args) { @@ -72,13 +74,7 @@ public class ReliableUtilTest { final int ACK_BITS = 0xFF0000FF; ByteBuf bbHeader = bb.alloc().buffer(); - - ReliableUtil.setProtocol(bbHeader, PROTOCOL); - ReliableUtil.setSEQ(bbHeader, SEQ); - ReliableUtil.setACK(bbHeader, ACK); - ReliableUtil.setACK_BITS(bbHeader, ACK_BITS); - - bbHeader.writerIndex(ReliableUtil.CONTENT_OFFSET); // hack to force writer position passed header + ReliableUtil.createHeader(bbHeader, PROTOCOL, SEQ, ACK, ACK_BITS); System.out.println("HEADER: " + ByteBufUtil.hexDump(bbHeader)); // note: hexDump requires writerIndex FlatBufferBuilder builder = new FlatBufferBuilder(); @@ -88,14 +84,16 @@ public class ReliableUtilTest { Netty.finishNettyBuffer(builder, offset); ByteBuf bbContent = bb.alloc().buffer(); + ByteBuffer dataBuffer = builder.dataBuffer(); + dataBuffer.mark(); bbContent.writeBytes(builder.dataBuffer()); + dataBuffer.reset(); System.out.println("CONTENT: " + ByteBufUtil.hexDump(bbContent)); // note: hexDump requires writerIndex - bb.addComponents(bbHeader, bbContent); + bb.addComponents(true, bbHeader, bbContent); ReliableUtil.setContentSize(bb, bbContent.readableBytes()); - bb.writerIndex(ReliableUtil.CONTENT_OFFSET + ReliableUtil.getContentSize(bb)); // hack to force writer position passed content System.out.println(ByteBufUtil.hexDump(bb)); // note: hexDump requires writerIndex System.out.printf("%-8s %-5s %02x%n", "PROTOCOL", PROTOCOL == ReliableUtil.getProtocol(bb), ReliableUtil.getProtocol(bb)); @@ -104,5 +102,9 @@ public class ReliableUtilTest { System.out.printf("%-8s %-5s %08x%n", "ACK_BITS", ACK_BITS == ReliableUtil.getACK_BITS(bb), ReliableUtil.getACK_BITS(bb)); System.out.printf("%-8s %-5s %04x%n", "CSIZE", builder.dataBuffer().remaining() == ReliableUtil.getContentSize(bb), ReliableUtil.getContentSize(bb)); System.out.printf("%-8s %-5s %s%n", "CONTENT", StringUtils.equals(ByteBufUtil.hexDump(builder.sizedByteArray()), ByteBufUtil.hexDump(ReliableUtil.getContent(bb))), ByteBufUtil.hexDump(ReliableUtil.getContent(bb))); + + ByteBuf content = ReliableUtil.getContent(bb); + ByteBuffer nioContent = content.nioBuffer(); + System.out.println(ByteBufUtil.hexDump(BufferUtils.readRemaining(nioContent))); } }