Created setContentSize method to support composite buffers

Created setContentSize method to support composite buffers
Removed UDP header fields from Netty fbs table
This commit is contained in:
Collin Smith 2020-06-20 12:47:31 -07:00
parent 26aebd5183
commit 5c4c06d47e
6 changed files with 85 additions and 44 deletions

View File

@ -14,37 +14,21 @@ public final class Netty extends Table {
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; vtable_start = bb_pos - bb.getInt(bb_pos); vtable_size = bb.getShort(vtable_start); }
public Netty __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public int protocol() { int o = __offset(4); return o != 0 ? bb.get(o + bb_pos) & 0xFF : 0; }
public int sequence() { int o = __offset(6); return o != 0 ? bb.getShort(o + bb_pos) & 0xFFFF : 0; }
public int ack() { int o = __offset(8); return o != 0 ? bb.getShort(o + bb_pos) & 0xFFFF : 0; }
public int ackBits() { int o = __offset(10); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
public byte dataType() { int o = __offset(12); return o != 0 ? bb.get(o + bb_pos) : 0; }
public Table data(Table obj) { int o = __offset(14); return o != 0 ? __union(obj, o) : null; }
public byte dataType() { int o = __offset(4); return o != 0 ? bb.get(o + bb_pos) : 0; }
public Table data(Table obj) { int o = __offset(6); return o != 0 ? __union(obj, o) : null; }
public static int createNetty(FlatBufferBuilder builder,
int protocol,
int sequence,
int ack,
int ack_bits,
byte data_type,
int dataOffset) {
builder.startObject(6);
builder.startObject(2);
Netty.addData(builder, dataOffset);
Netty.addAckBits(builder, ack_bits);
Netty.addAck(builder, ack);
Netty.addSequence(builder, sequence);
Netty.addDataType(builder, data_type);
Netty.addProtocol(builder, protocol);
return Netty.endNetty(builder);
}
public static void startNetty(FlatBufferBuilder builder) { builder.startObject(6); }
public static void addProtocol(FlatBufferBuilder builder, int protocol) { builder.addByte(0, (byte)protocol, (byte)0); }
public static void addSequence(FlatBufferBuilder builder, int sequence) { builder.addShort(1, (short)sequence, (short)0); }
public static void addAck(FlatBufferBuilder builder, int ack) { builder.addShort(2, (short)ack, (short)0); }
public static void addAckBits(FlatBufferBuilder builder, int ackBits) { builder.addInt(3, ackBits, 0); }
public static void addDataType(FlatBufferBuilder builder, byte dataType) { builder.addByte(4, dataType, 0); }
public static void addData(FlatBufferBuilder builder, int dataOffset) { builder.addOffset(5, dataOffset, 0); }
public static void startNetty(FlatBufferBuilder builder) { builder.startObject(2); }
public static void addDataType(FlatBufferBuilder builder, byte dataType) { builder.addByte(0, dataType, 0); }
public static void addData(FlatBufferBuilder builder, int dataOffset) { builder.addOffset(1, dataOffset, 0); }
public static int endNetty(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;

View File

@ -9,13 +9,6 @@ union NettyData {
}
table Netty {
// Header
protocol:uint8;
sequence:uint16;
ack:uint16;
ack_bits:int32;
// Content
data:NettyData;
}

View File

@ -23,6 +23,7 @@ import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration;
import com.riiablo.codec.Animation;
import com.riiablo.net.packet.netty.Connection;
import com.riiablo.net.packet.netty.Netty;
import com.riiablo.net.packet.netty.NettyData;
public class Client extends ApplicationAdapter {
@ -76,12 +77,12 @@ public class Client extends ApplicationAdapter {
void init(ChannelHandlerContext ctx) {
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
Gdx.app.log(TAG, "Connecting to " + remoteAddress.getHostString() + ":" + remoteAddress.getPort());
Gdx.app.log(TAG, "Sending Connection packet to " + remoteAddress.getHostString() + ":" + remoteAddress.getPort());
FlatBufferBuilder builder = new FlatBufferBuilder();
Connection.startConnection(builder);
int dataOffset = Connection.endConnection(builder);
createNetty(builder, NettyData.Connection, dataOffset);
int offset = Netty.createNetty(builder, NettyData.Connection, dataOffset);
ByteBuf byteBuf = Unpooled.wrappedBuffer(builder.dataBuffer());
ctx.writeAndFlush(byteBuf);

View File

@ -1,7 +1,6 @@
package com.riiablo.server.netty;
import com.google.flatbuffers.ByteBufferUtil;
import com.google.flatbuffers.FlatBufferBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandler;
@ -48,18 +47,27 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
Gdx.app.log(TAG, "channelRead0 Packet from " + sender.getHostName() + ":" + sender.getPort());
ByteBuf in = msg.content();
try {
boolean valid = processHeader(ctx, in);
ByteBuffer buffer = in.nioBuffer();
Packet packet = Packet.obtain(0, buffer);
processHeader(ctx, packet.data);
processPacket(ctx, packet.data);
} finally {
// in.release(); // Automatically released by channelRead() right now
}
}
protected void processHeader(ChannelHandlerContext ctx, Netty netty) throws Exception {
Gdx.app.log(TAG, " incoming " + String.format("PROTO:%d SEQ:%d ACK:%d ACK_BITS:%08x", netty.protocol(), netty.sequence(), netty.ack(), netty.ackBits()));
int remoteSeq = netty.sequence();
protected boolean processHeader(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
int remoteProtocol = ReliableUtil.getProtocol(in);
if (remoteProtocol != PROTOCOL) {
Gdx.app.log(TAG, String.format(" rejected incoming PROTO:%d", remoteProtocol));
return false;
}
int remoteSeq = ReliableUtil.getSEQ(in);
int remoteAck = ReliableUtil.getACK(in);
int remoteAckBits = ReliableUtil.getACK_BITS(in);
Gdx.app.log(TAG, " accepted incoming " + String.format("PROTO:%d SEQ:%d ACK:%d ACK_BITS:%08x", remoteProtocol, remoteSeq, remoteAck, remoteAckBits));
if (ack < 0) {
ack = remoteSeq;
Gdx.app.log(TAG, " init ack=" + ack);
@ -78,6 +86,7 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
}
Gdx.app.log(TAG, " " + String.format("ACK:%d ACK_BITS:%08x", ack, ack_bits));
return true;
}
protected static boolean sequenceGreater(int a, int b) {
@ -98,9 +107,8 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
}
protected void createNetty(FlatBufferBuilder builder, byte data_type, int dataOffset) {
int offset = Netty.createNetty(builder, PROTOCOL, seq = (seq + 1) & 0xFFFF, ack, ack_bits, data_type, dataOffset);
Netty.finishSizePrefixedNettyBuffer(builder, offset);
protected int nextSequence() {
return seq = (seq + 1) & 0xFFFF;
}
protected void channelWrite0(ChannelHandlerContext ctx, Object msg) throws Exception {
@ -114,8 +122,7 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
: ByteBuffer.wrap(ByteBufUtil.getBytes(out));
Gdx.app.debug(TAG, "nioBuffer=" + nioBuffer);
nioBuffer = ByteBufferUtil.removeSizePrefix(nioBuffer);
Netty netty = Netty.getRootAsNetty(nioBuffer);
Gdx.app.log(TAG, " " + String.format("PROTO:%d SEQ:%d ACK:%d ACK_BITS:%08x", netty.protocol(), netty.sequence(), netty.ack(), netty.ackBits()));
Gdx.app.log(TAG, " " + String.format("PROTO:%d SEQ:%d ACK:%d ACK_BITS:%08x", 0, 0, 0, 0));
} finally {
}
}

View File

@ -64,10 +64,14 @@ public class ReliableUtil {
bb.setInt(ACK_BITS_OFFSET, value);
}
static void setContentSize(ByteBuf bb, int value) {
Validate.isTrue(value <= 0xFFFF, "cannot encode content size as ushort, src.remaining()=" + value);
bb.setShort(CONTENT_SIZE_OFFSET, value);
}
static void setContent(ByteBuf bb, ByteBuffer src) {
Validate.isTrue(src.remaining() <= 0xFFFF, "cannot encode content size as ushort, src.remaining()=" + src.remaining());
setContentSize(bb, src.remaining());
src.mark();
bb.setShort(CONTENT_SIZE_OFFSET, src.remaining());
bb.setBytes(CONTENT_OFFSET, src);
src.reset();
}

View File

@ -3,6 +3,7 @@ package com.riiablo.server.netty;
import com.google.flatbuffers.FlatBufferBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import org.apache.commons.lang3.StringUtils;
@ -20,6 +21,16 @@ public class ReliableUtilTest {
} finally {
ReferenceCountUtil.release(bb);
}
System.out.println("----");
CompositeByteBuf composite = null;
try {
composite = Unpooled.compositeBuffer(2);
testComposite(composite);
} finally {
ReferenceCountUtil.release(composite);
}
}
static void test(ByteBuf bb) {
@ -39,7 +50,7 @@ public class ReliableUtilTest {
FlatBufferBuilder builder = new FlatBufferBuilder();
Connection.startConnection(builder);
int dataOffset = Connection.endConnection(builder);
int offset = Netty.createNetty(builder, PROTOCOL, SEQ, ACK, ACK_BITS, NettyData.Connection, dataOffset);
int offset = Netty.createNetty(builder, NettyData.Connection, dataOffset);
Netty.finishNettyBuffer(builder, offset);
ReliableUtil.setContent(bb, builder.dataBuffer());
@ -53,4 +64,45 @@ public class ReliableUtilTest {
System.out.printf("%-8s %-5s %04x%n", "CSIZE", builder.dataBuffer().remaining() == ReliableUtil.getContentSize(bb), ReliableUtil.getContentSize(bb));
System.out.printf("%-8s %-5s %s%n", "CONTENT", StringUtils.equals(ByteBufUtil.hexDump(builder.sizedByteArray()), ByteBufUtil.hexDump(ReliableUtil.getContent(bb))), ByteBufUtil.hexDump(ReliableUtil.getContent(bb)));
}
static void testComposite(CompositeByteBuf bb) {
final int PROTOCOL = 0b10010001;
final int SEQ = 0xF0F0;
final int ACK = 0x0F0F;
final int ACK_BITS = 0xFF0000FF;
ByteBuf bbHeader = bb.alloc().buffer();
ReliableUtil.setProtocol(bbHeader, PROTOCOL);
ReliableUtil.setSEQ(bbHeader, SEQ);
ReliableUtil.setACK(bbHeader, ACK);
ReliableUtil.setACK_BITS(bbHeader, ACK_BITS);
bbHeader.writerIndex(ReliableUtil.CONTENT_OFFSET); // hack to force writer position passed header
System.out.println("HEADER: " + ByteBufUtil.hexDump(bbHeader)); // note: hexDump requires writerIndex
FlatBufferBuilder builder = new FlatBufferBuilder();
Connection.startConnection(builder);
int dataOffset = Connection.endConnection(builder);
int offset = Netty.createNetty(builder, NettyData.Connection, dataOffset);
Netty.finishNettyBuffer(builder, offset);
ByteBuf bbContent = bb.alloc().buffer();
bbContent.writeBytes(builder.dataBuffer());
System.out.println("CONTENT: " + ByteBufUtil.hexDump(bbContent)); // note: hexDump requires writerIndex
bb.addComponents(bbHeader, bbContent);
ReliableUtil.setContentSize(bb, bbContent.readableBytes());
bb.writerIndex(ReliableUtil.CONTENT_OFFSET + ReliableUtil.getContentSize(bb)); // hack to force writer position passed content
System.out.println(ByteBufUtil.hexDump(bb)); // note: hexDump requires writerIndex
System.out.printf("%-8s %-5s %02x%n", "PROTOCOL", PROTOCOL == ReliableUtil.getProtocol(bb), ReliableUtil.getProtocol(bb));
System.out.printf("%-8s %-5s %04x%n", "SEQ", SEQ == ReliableUtil.getSEQ(bb), ReliableUtil.getSEQ(bb));
System.out.printf("%-8s %-5s %04x%n", "ACK", ACK == ReliableUtil.getACK(bb), ReliableUtil.getACK(bb));
System.out.printf("%-8s %-5s %08x%n", "ACK_BITS", ACK_BITS == ReliableUtil.getACK_BITS(bb), ReliableUtil.getACK_BITS(bb));
System.out.printf("%-8s %-5s %04x%n", "CSIZE", builder.dataBuffer().remaining() == ReliableUtil.getContentSize(bb), ReliableUtil.getContentSize(bb));
System.out.printf("%-8s %-5s %s%n", "CONTENT", StringUtils.equals(ByteBufUtil.hexDump(builder.sizedByteArray()), ByteBufUtil.hexDump(ReliableUtil.getContent(bb))), ByteBufUtil.hexDump(ReliableUtil.getContent(bb)));
}
}