Implemented ReliableUtil and composite ByteBuf into ReliableChannelHandler

Implemented ReliableUtil and composite ByteBuf into ReliableChannelHandler
ReliableChannelHandler automatically replaces outbound message with composite
Removed size prefixing contract from Packet factory
Added ReliableUtil#createHeader to write data and set writer index passed header
This commit is contained in:
Collin Smith 2020-06-20 14:03:27 -07:00
parent d672768495
commit 01888faac6
4 changed files with 59 additions and 28 deletions

View File

@ -1,6 +1,5 @@
package com.riiablo.server.netty;
import com.google.flatbuffers.ByteBufferUtil;
import java.nio.ByteBuffer;
import com.badlogic.gdx.utils.TimeUtils;
@ -11,14 +10,14 @@ public class Packet {
public int id;
public long time;
public ByteBuffer buffer;
public Netty data;
public Netty data;
public static Packet obtain(int id, ByteBuffer buffer) {
Packet packet = new Packet();
packet.id = id;
packet.time = TimeUtils.millis();
packet.buffer = buffer;
packet.data = Netty.getRootAsNetty(ByteBufferUtil.removeSizePrefix(buffer));
packet.data = Netty.getRootAsNetty(buffer);
return packet;
}
}

View File

@ -1,8 +1,8 @@
package com.riiablo.server.netty;
import com.google.flatbuffers.ByteBufferUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
@ -25,6 +25,7 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
private static final boolean DEBUG = true;
private static final boolean DEBUG_SEQ = DEBUG && true;
private static final boolean DEBUG_OUTBOUND = DEBUG && true;
private static final boolean DEBUG_INBOUND = DEBUG && true;
private final TypeParameterMatcher matcher;
@ -46,9 +47,13 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
InetSocketAddress sender = msg.sender();
Gdx.app.log(TAG, "channelRead0 Packet from " + sender.getHostName() + ":" + sender.getPort());
ByteBuf in = msg.content();
if (DEBUG_INBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(in));
try {
boolean valid = processHeader(ctx, in);
ByteBuffer buffer = in.nioBuffer();
if (!valid) return;
ByteBuf content = ReliableUtil.getContent(in);
if (DEBUG_INBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(content));
ByteBuffer buffer = content.nioBuffer();
Packet packet = Packet.obtain(0, buffer);
processPacket(ctx, packet.data);
} finally {
@ -57,6 +62,7 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
}
protected boolean processHeader(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (DEBUG_INBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(ReliableUtil.getHeader(in)));
int remoteProtocol = ReliableUtil.getProtocol(in);
if (remoteProtocol != PROTOCOL) {
Gdx.app.log(TAG, String.format(" rejected incoming PROTO:%d", remoteProtocol));
@ -66,8 +72,9 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
int remoteSeq = ReliableUtil.getSEQ(in);
int remoteAck = ReliableUtil.getACK(in);
int remoteAckBits = ReliableUtil.getACK_BITS(in);
int csize = ReliableUtil.getContentSize(in);
Gdx.app.log(TAG, " accepted incoming " + String.format("PROTO:%d SEQ:%d ACK:%d ACK_BITS:%08x", remoteProtocol, remoteSeq, remoteAck, remoteAckBits));
Gdx.app.log(TAG, " accepted incoming " + ReliableUtil.toString(in));
if (ack < 0) {
ack = remoteSeq;
Gdx.app.log(TAG, " init ack=" + ack);
@ -111,20 +118,26 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
return seq = (seq + 1) & 0xFFFF;
}
protected void channelWrite0(ChannelHandlerContext ctx, Object msg) throws Exception {
protected Object channelWrite0(ChannelHandlerContext ctx, Object msg) throws Exception {
InetSocketAddress receiver = (InetSocketAddress) ctx.channel().remoteAddress();
Gdx.app.log(TAG, "channelWrite0 Packet to " + receiver.getHostName() + ":" + receiver.getPort());
ByteBuf out = (ByteBuf) msg;
try {
Gdx.app.debug(TAG, "out.nioBufferCount()=" + out.nioBufferCount());
ByteBuffer nioBuffer = out.nioBufferCount() > 0
? out.nioBuffer()
: ByteBuffer.wrap(ByteBufUtil.getBytes(out));
Gdx.app.debug(TAG, "nioBuffer=" + nioBuffer);
nioBuffer = ByteBufferUtil.removeSizePrefix(nioBuffer);
Gdx.app.log(TAG, " " + String.format("PROTO:%d SEQ:%d ACK:%d ACK_BITS:%08x", 0, 0, 0, 0));
} finally {
}
ByteBuf header = ctx.alloc().buffer(); // TODO: worth sizing this correctly?
ReliableUtil.createHeader(header, PROTOCOL, nextSequence(), ack, ack_bits);
if (DEBUG_SEQ) Gdx.app.log(TAG, " " + ReliableUtil.toString(header));
if (DEBUG_OUTBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(header));
ByteBuf content = (ByteBuf) msg;
ReliableUtil.setContentSize(header, content.readableBytes());
CompositeByteBuf composite = ctx.alloc().compositeBuffer(2)
.addComponent(true, header)
.addComponent(true, content);
if (DEBUG_OUTBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(content));
if (DEBUG_OUTBOUND) Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(composite));
return composite;
}
@Override
@ -225,7 +238,7 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "write");
if (DEBUG_OUTBOUND) channelWrite0(ctx, msg);
msg = channelWrite0(ctx, msg);
ctx.write(msg, promise);
}

View File

@ -44,6 +44,10 @@ public class ReliableUtil {
return bb.getUnsignedShort(CONTENT_SIZE_OFFSET);
}
public static ByteBuf getHeader(ByteBuf bb) {
return bb.slice(0, CONTENT_OFFSET);
}
public static ByteBuf getContent(ByteBuf bb) {
return bb.slice(CONTENT_OFFSET, getContentSize(bb));
}
@ -75,4 +79,17 @@ public class ReliableUtil {
bb.setBytes(CONTENT_OFFSET, src);
src.reset();
}
static void createHeader(ByteBuf bb, int protocol, int seq, int ack, int ack_bits) {
bb.writerIndex(CONTENT_OFFSET);
setProtocol(bb, protocol);
setSEQ(bb, seq);
setACK(bb, ack);
setACK_BITS(bb, ack_bits);
}
static String toString(ByteBuf bb) {
return String.format("PROTO:%d SEQ:%d ACK:%d ACK_BITS:%08x CSIZE:%d",
getProtocol(bb), getSEQ(bb), getACK(bb), getACK_BITS(bb), getContentSize(bb));
}
}

View File

@ -6,11 +6,13 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.StringUtils;
import com.riiablo.net.packet.netty.Connection;
import com.riiablo.net.packet.netty.Netty;
import com.riiablo.net.packet.netty.NettyData;
import com.riiablo.util.BufferUtils;
public class ReliableUtilTest {
public static void main(String[] args) {
@ -72,13 +74,7 @@ public class ReliableUtilTest {
final int ACK_BITS = 0xFF0000FF;
ByteBuf bbHeader = bb.alloc().buffer();
ReliableUtil.setProtocol(bbHeader, PROTOCOL);
ReliableUtil.setSEQ(bbHeader, SEQ);
ReliableUtil.setACK(bbHeader, ACK);
ReliableUtil.setACK_BITS(bbHeader, ACK_BITS);
bbHeader.writerIndex(ReliableUtil.CONTENT_OFFSET); // hack to force writer position passed header
ReliableUtil.createHeader(bbHeader, PROTOCOL, SEQ, ACK, ACK_BITS);
System.out.println("HEADER: " + ByteBufUtil.hexDump(bbHeader)); // note: hexDump requires writerIndex
FlatBufferBuilder builder = new FlatBufferBuilder();
@ -88,14 +84,16 @@ public class ReliableUtilTest {
Netty.finishNettyBuffer(builder, offset);
ByteBuf bbContent = bb.alloc().buffer();
ByteBuffer dataBuffer = builder.dataBuffer();
dataBuffer.mark();
bbContent.writeBytes(builder.dataBuffer());
dataBuffer.reset();
System.out.println("CONTENT: " + ByteBufUtil.hexDump(bbContent)); // note: hexDump requires writerIndex
bb.addComponents(bbHeader, bbContent);
bb.addComponents(true, bbHeader, bbContent);
ReliableUtil.setContentSize(bb, bbContent.readableBytes());
bb.writerIndex(ReliableUtil.CONTENT_OFFSET + ReliableUtil.getContentSize(bb)); // hack to force writer position passed content
System.out.println(ByteBufUtil.hexDump(bb)); // note: hexDump requires writerIndex
System.out.printf("%-8s %-5s %02x%n", "PROTOCOL", PROTOCOL == ReliableUtil.getProtocol(bb), ReliableUtil.getProtocol(bb));
@ -104,5 +102,9 @@ public class ReliableUtilTest {
System.out.printf("%-8s %-5s %08x%n", "ACK_BITS", ACK_BITS == ReliableUtil.getACK_BITS(bb), ReliableUtil.getACK_BITS(bb));
System.out.printf("%-8s %-5s %04x%n", "CSIZE", builder.dataBuffer().remaining() == ReliableUtil.getContentSize(bb), ReliableUtil.getContentSize(bb));
System.out.printf("%-8s %-5s %s%n", "CONTENT", StringUtils.equals(ByteBufUtil.hexDump(builder.sizedByteArray()), ByteBufUtil.hexDump(ReliableUtil.getContent(bb))), ByteBufUtil.hexDump(ReliableUtil.getContent(bb)));
ByteBuf content = ReliableUtil.getContent(bb);
ByteBuffer nioContent = content.nioBuffer();
System.out.println(ByteBufUtil.hexDump(BufferUtils.readRemaining(nioContent)));
}
}