Committing basic reliable UDP package

This commit is contained in:
Collin Smith 2020-06-23 18:40:59 -07:00
parent 3a79adcaae
commit cc1c43b7e6
18 changed files with 1034 additions and 0 deletions

View File

@ -0,0 +1,31 @@
package com.riiablo.net.reliable;
import com.badlogic.gdx.Gdx;
public class Log {
private Log() {}
public static void error(String tag, String message) {
Gdx.app.error(tag, message);
}
public static void error(String tag, String format, Object... args) {
error(tag, String.format(format, args));
}
public static void log(String tag, String message) {
Gdx.app.log(tag, message);
}
public static void log(String tag, String format, Object... args) {
log(tag, String.format(format, args));
}
public static void debug(String tag, String message) {
Gdx.app.debug(tag, message);
}
public static void debug(String tag, String format, Object... args) {
debug(tag, String.format(format, args));
}
}

View File

@ -0,0 +1,30 @@
package com.riiablo.net.reliable;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
public abstract class MessageChannel implements ReliablePacketController.PacketListener {
protected final PacketTransceiver packetTransceiver;
protected final ReliableConfiguration config;
protected final ReliablePacketController packetController;
protected int sequence;
public MessageChannel(ReliableConfiguration config, PacketTransceiver packetTransceiver) {
this.packetTransceiver = packetTransceiver;
this.config = config;
this.packetController = new ReliablePacketController(config, this);
}
public abstract void reset();
public abstract void update(long time, DatagramChannel ch);
public abstract void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb);
public abstract void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet);
public interface PacketTransceiver {
void sendPacket(ByteBuf bb);
void receivePacket(int sequence, ByteBuf bb);
}
}

View File

@ -0,0 +1,4 @@
package com.riiablo.net.reliable;
public interface PacketData {
}

View File

@ -0,0 +1,27 @@
package com.riiablo.net.reliable;
public enum QoS {
/**
* Message is guaranteed to arrive and in order.
*/
Reliable,
/**
* Message is not guaranteed delivery nor order.
*/
Unreliable,
/**
* Message is not guaranteed delivery, but will be in order
*/
UnreliableOrdered;
public static QoS valueOf(int i) {
switch (i) {
case 0: return Reliable;
case 1: return Unreliable;
case 2: return UnreliableOrdered;
default: return null;
}
}
}

View File

@ -0,0 +1,175 @@
package com.riiablo.net.reliable;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.TypeParameterMatcher;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import com.badlogic.gdx.Gdx;
public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHandler, ChannelOutboundHandler {
private static final String TAG = "ReliableChannelHandler";
private static final boolean DEBUG = true;
private static final boolean DEBUG_INBOUND = DEBUG && true;
private static final boolean DEBUG_OUTBOUND = DEBUG && true;
private final TypeParameterMatcher matcher;
private final ReliableEndpoint endpoint;
public ReliableChannelHandler(ReliableEndpoint endpoint) {
this.endpoint = endpoint;
matcher = TypeParameterMatcher.get(DatagramPacket.class);
}
protected boolean accept(Object msg) throws Exception {
return matcher.match(msg);
}
protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
InetSocketAddress sender = packet.sender();
Gdx.app.log(TAG, "channelRead0 received packet from " + sender.getHostName() + ":" + sender.getPort());
ByteBuf in = packet.content();
if (DEBUG_INBOUND) Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(in));
endpoint.messageReceived(ctx, packet);
}
protected Object writeMessage(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
InetSocketAddress receiver = (InetSocketAddress) ctx.channel().remoteAddress();
Gdx.app.log(TAG, "channelWrite0 sending packet to " + receiver.getHostName() + ":" + receiver.getPort());
ByteBuf out = msg;
if (DEBUG_OUTBOUND) Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(out));
return msg;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "handlerAdded");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "handlerRemoved");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Gdx.app.debug(TAG, "exceptionCaught");
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelRegistered");
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelUnregistered");
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelActive");
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelInactive");
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Gdx.app.debug(TAG, "channelRead");
boolean release = true;
try {
if (accept(msg)) {
messageReceived(ctx, (DatagramPacket) msg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (release) ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelReadComplete");
ctx.fireChannelReadComplete();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
Gdx.app.debug(TAG, "userEventTriggered");
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelWritabilityChanged");
ctx.fireChannelWritabilityChanged();
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "bind");
ctx.bind(localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "connect");
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "disconnect");
ctx.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "close");
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "deregister");
ctx.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "read");
ctx.read();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "write");
msg = writeMessage(ctx, (ByteBuf) msg);
ctx.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "flush");
ctx.flush();
}
}

View File

@ -0,0 +1,15 @@
package com.riiablo.net.reliable;
public class ReliableConfiguration {
public int maxPacketSize = 16384;
public int fragmentThreshold = 1024;
public int maxFragments = 16;
public int fragmentSize = 1024;
public int sentPacketBufferSize = 256;
public int receivedPacketBufferSize = 256;
public int fragmentReassemblyBufferSize = 64;
public float rttSmoothingFactor = 0.25f;
public float packetLossSmoothingFactor = 0.1f;
public float bandwidthSmoothingFactor = 0.1f;
public int packetHeaderSize = 28;
}

View File

@ -0,0 +1,121 @@
package com.riiablo.net.reliable;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.Validate;
import com.riiablo.net.reliable.channel.ReliableMessageChannel;
import com.riiablo.net.reliable.channel.UnreliableMessageChannel;
import com.riiablo.net.reliable.channel.UnreliableOrderedMessageChannel;
import com.riiablo.util.EnumIntMap;
public class ReliableEndpoint implements MessageChannel.PacketTransceiver {
private static final String TAG = "ReliableEndpoint";
private static final boolean DEBUG = true;
private static final boolean DEBUG_QOS = DEBUG && true;
private static final boolean DEBUG_CHANNEL = DEBUG && true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
private final DatagramChannel channel;
private final PacketProcessor packetProcessor;
private final EnumIntMap<QoS> defaultChannels;
private final MessageChannel[] channels;
public ReliableEndpoint(DatagramChannel channel, PacketProcessor packetProcessor) {
this.channel = channel;
this.packetProcessor = packetProcessor;
// for my purposes 3 works, channelId can be up to 255 though
channels = new MessageChannel[3];
channels[QoS.Reliable.ordinal()] = new ReliableMessageChannel(this);
channels[QoS.Unreliable.ordinal()] = new UnreliableMessageChannel(this);
channels[QoS.UnreliableOrdered.ordinal()] = new UnreliableOrderedMessageChannel(this);
defaultChannels = new EnumIntMap<>(QoS.class, -1);
for (QoS qos : QoS.values()) {
defaultChannels.put(qos, qos.ordinal());
}
}
public InetSocketAddress remoteAddress() {
return channel.remoteAddress();
}
public void reset() {
for (MessageChannel mc : channels) if (mc != null) mc.reset();
}
public void update(long time) {
for (MessageChannel mc : channels) if (mc != null) mc.update(time, channel);
}
public void sendMessage(QoS qos, ByteBuffer bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendMessage");
if (DEBUG_QOS) Log.debug(TAG, "sending message with %s QoS (0x%02x)", qos, qos.ordinal());
int channelId = defaultChannels.get(qos);
sendMessage(channelId, bb);
}
public void sendMessage(int channelId, ByteBuffer bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendMessage");
Validate.inclusiveBetween(0x00, 0xFF, channelId, "channelId must fit within a ubyte");
if (DEBUG_CHANNEL) Log.debug(TAG, "sending message with on channel %d", channelId);
MessageChannel mc = channels[channelId];
mc.sendMessage(channelId, channel, Unpooled.wrappedBuffer(bb)); // automatically released
}
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived");
int channelId = Packet.getChannelId(packet.content());
if (DEBUG_QOS) {
QoS qos = QoS.valueOf(channelId);
if (qos != null) {
Log.debug(TAG, "received message with %s QoS (0x%02x)", qos, channelId);
} else {
Log.debug(TAG, "received message with channel %d", channelId);
}
} else if (DEBUG_CHANNEL) {
Log.debug(TAG, "received message with channel %d", channelId);
}
MessageChannel mc = channels[channelId];
mc.onMessageReceived(ctx, packet);
}
@Override
public void sendPacket(ByteBuf bb) {
}
@Override
public void receivePacket(int sequence, ByteBuf bb) {
packetProcessor.processPacket(sequence, bb);
}
public interface PacketProcessor {
void processPacket(int sequence, ByteBuf bb);
}
public static final Stats stats = new Stats();
public static class Stats {
public int NUM_PACKETS_SENT;
public int NUM_PACKETS_RECEIVED;
public int NUM_PACKETS_ACKED;
public int NUM_PACKETS_STALE;
public int NUM_PACKETS_INVALID;
public int NUM_PACKETS_TOO_LARGE_TO_SEND;
public int NUM_PACKETS_TOO_LARGE_TO_RECEIVE;
public int NUM_FRAGMENTS_SENT;
public int NUM_FRAGMENTS_RECEIVED;
public int NUM_FRAGMENTS_INVALID;
}
}

View File

@ -0,0 +1,149 @@
package com.riiablo.net.reliable;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import com.riiablo.net.reliable.data.FragmentReassemblyData;
import com.riiablo.net.reliable.data.ReceivedPacketData;
import com.riiablo.net.reliable.data.SentPacketData;
public class ReliablePacketController {
private static final String TAG = "ReliablePacketController";
private static final boolean DEBUG = true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
private final ReliableConfiguration config;
private final MessageChannel channel;
private final SequenceBuffer<SentPacketData> sentPackets;
private final SequenceBuffer<ReceivedPacketData> receivedPackets;
private final SequenceBuffer<FragmentReassemblyData> fragmentReassembly;
public ReliablePacketController(ReliableConfiguration config, MessageChannel channel) {
this.config = config;
this.channel = channel;
this.sentPackets = new SequenceBuffer<>(SentPacketData.class, config.sentPacketBufferSize);
this.receivedPackets = new SequenceBuffer<>(ReceivedPacketData.class, config.receivedPacketBufferSize);
this.fragmentReassembly = new SequenceBuffer<>(FragmentReassemblyData.class, config.fragmentReassemblyBufferSize);
}
public int nextSequence() {
return channel.sequence;
}
private int incSequence() {
return channel.sequence = (channel.sequence + 1) & Packet.USHORT_MAX_VALUE;
}
public void reset() {
}
public void update(long time) {
}
public void sendAck(int channelId, DatagramChannel ch) {
}
public int sendPacket(int channelId, DatagramChannel ch, ByteBuf bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendPacket " + bb);
final int packetSize = bb.readableBytes();
if (packetSize > config.maxPacketSize) {
Log.error(TAG, "packet is too large to send (%d bytes), max packet size is %d bytes", packetSize, config.maxPacketSize);
ReliableEndpoint.stats.NUM_PACKETS_TOO_LARGE_TO_SEND++;
return -1;
}
final int sequence = incSequence();
int ack, ackBits;
synchronized (receivedPackets) {
ack = receivedPackets.generateAck();
ackBits = receivedPackets.generateAckBits(ack);
}
SentPacketData sentPacketData = sentPackets.insert(sequence);
// sentPacketData.time = this.time;
// sentPacketData.packetSize =
sentPacketData.acked = false;
if (packetSize <= config.fragmentThreshold) {
// regular packet
ByteBuf header = ch.alloc().buffer(config.packetHeaderSize);
int headerSize = Packet.writePacketHeader(header, channelId, sequence, ack, ackBits);
ByteBuf composite = ch.alloc().compositeBuffer(2)
.addComponent(true, header)
.addComponent(true, bb);
ch.writeAndFlush(composite);
return headerSize;
} else {
// fragmented packet
throw new UnsupportedOperationException();
// return -1;
}
}
public void onPacketReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketReceived " + packet);
final ByteBuf bb = packet.content();
final int packetSize = bb.readableBytes();
if (packetSize > config.maxPacketSize) {
Log.error(TAG, "packet is too large to receive (%d bytes), max packet size is %d bytes", packetSize, config.maxPacketSize);
ReliableEndpoint.stats.NUM_PACKETS_TOO_LARGE_TO_RECEIVE++;
return;
}
final byte flags = Packet.getFlags(bb);
if (!Packet.isFragmented(flags)) {
// regular packet
ReliableEndpoint.stats.NUM_PACKETS_RECEIVED++;
Packet.HeaderData headerData = null;
try {
headerData = Packet.obtainData();
int headerSize = Packet.readPacketHeader(config, bb, headerData);
if (headerSize == -1) {
Log.error(TAG, "ignoring invalid packet. could not read packet header");
ReliableEndpoint.stats.NUM_PACKETS_INVALID++;
return;
}
final int sequence = headerData.sequence;
if (!receivedPackets.testInsert(sequence)) {
Log.error(TAG, "ignoring stale packet %d", sequence);
ReliableEndpoint.stats.NUM_PACKETS_STALE++;
return;
}
if (DEBUG_RECEIVE) Log.debug(TAG, "processing packet %d", sequence);
ByteBuf slice = bb.readSlice(bb.readableBytes());
channel.onPacketProcessed(sequence, slice);
// TODO...
} finally {
if (headerData != null) headerData.free();
}
} else {
// fragmented packet
throw new UnsupportedOperationException();
}
}
public interface PacketListener {
void onPacketTransmitted(ByteBuf bb);
void onPacketProcessed(int sequence, ByteBuf bb);
}
}

View File

@ -0,0 +1,14 @@
package com.riiablo.net.reliable;
public class ReliableUtils {
private ReliableUtils() {}
public static boolean sequenceGreaterThan(int s1, int s2) {
return ((s1 > s2) && (s1 - s2 <= Short.MAX_VALUE))
|| ((s1 < s2) && (s2 - s1 > Short.MAX_VALUE));
}
public static boolean sequenceLessThan(int s1, int s2) {
return sequenceGreaterThan(s2, s1);
}
}

View File

@ -0,0 +1,107 @@
package com.riiablo.net.reliable;
import java.util.Arrays;
import org.apache.commons.lang3.exception.ExceptionUtils;
public class SequenceBuffer<T> {
public static final int INVALID_SEQUENCE = -1; // 0xFFFFFFFF
private int sequence;
private final int numEntries;
private final int entrySequence[];
private final Object entryData[];
public SequenceBuffer(Class<T> dataContainer, int bufferSize) {
numEntries = bufferSize;
entrySequence = new int[numEntries];
Arrays.fill(entrySequence, INVALID_SEQUENCE);
entryData = new Object[numEntries];
try {
for (int i = 0; i < numEntries; i++) entryData[i] = dataContainer.newInstance();
} catch (Throwable t) {
ExceptionUtils.wrapAndThrow(t);
}
sequence = 0;
}
public void reset() {
sequence = 0;
Arrays.fill(entrySequence, INVALID_SEQUENCE);
}
public void removeEntries(int startSequence, int endSequence) {
startSequence &= Packet.USHORT_MAX_VALUE;
endSequence &= Packet.USHORT_MAX_VALUE;
if (endSequence < startSequence) {
Arrays.fill(entrySequence, startSequence, numEntries, INVALID_SEQUENCE);
Arrays.fill(entrySequence, 0, endSequence, INVALID_SEQUENCE);
} else {
Arrays.fill(entrySequence, startSequence, endSequence, INVALID_SEQUENCE);
}
}
public boolean testInsert(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
return !ReliableUtils.sequenceLessThan(sequence, (this.sequence - numEntries) & Packet.USHORT_MAX_VALUE);
}
@SuppressWarnings("unchecked")
public T insert(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
if (ReliableUtils.sequenceLessThan(sequence, (this.sequence - numEntries) & Packet.USHORT_MAX_VALUE)) {
return null;
}
final int nextSequence = (sequence + 1) & Packet.USHORT_MAX_VALUE;
if (ReliableUtils.sequenceGreaterThan(nextSequence, this.sequence)) {
removeEntries(this.sequence, sequence);
this.sequence = nextSequence;
}
int index = sequence % numEntries;
entrySequence[index] = sequence;
return (T) entryData[index];
}
public void remove(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
entrySequence[sequence % numEntries] = INVALID_SEQUENCE;
}
public boolean available(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
return entrySequence[sequence % numEntries] == INVALID_SEQUENCE;
}
public boolean exists(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
return entrySequence[sequence % numEntries] == sequence;
}
@SuppressWarnings("unchecked")
public T find(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
int index = sequence % numEntries;
return entrySequence[index] == sequence ? (T) entryData[index] : null;
}
@SuppressWarnings("unchecked")
public T atIndex(int index) {
return entrySequence[index] != INVALID_SEQUENCE ? (T) entryData[index] : null;
}
public int generateAck() {
return (sequence - 1) & Packet.USHORT_MAX_VALUE;
}
public int generateAckBits(int ack) {
ack &= Packet.USHORT_MAX_VALUE;
int ackBits = 0;
for (int i = 0, mask = 1; i < Integer.SIZE; i++, mask <<= 1) {
if (exists(ack - i)) ackBits |= mask;
}
return ackBits;
}
}

View File

@ -0,0 +1,93 @@
package com.riiablo.net.reliable;
import com.google.flatbuffers.FlatBufferBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.net.InetSocketAddress;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationAdapter;
import com.badlogic.gdx.Gdx;
import com.badlogic.gdx.backends.headless.HeadlessApplication;
import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration;
import com.riiablo.codec.Animation;
import com.riiablo.net.packet.netty.Connection;
import com.riiablo.net.packet.netty.Netty;
import com.riiablo.net.packet.netty.NettyData;
public class TestClient extends ApplicationAdapter implements ReliableEndpoint.PacketProcessor {
private static final String TAG = "Client";
public static void main(String[] args) throws Exception {
Thread.sleep(1000);
HeadlessApplicationConfiguration config = new HeadlessApplicationConfiguration();
config.renderInterval = Animation.FRAME_DURATION;
new HeadlessApplication(new TestClient(), config);
}
private ReliableEndpoint endpoint;
@Override
public void create() {
Gdx.app.setLogLevel(Application.LOG_DEBUG);
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap()
.group(group)
.channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) {
endpoint = new ReliableEndpoint(ch, TestClient.this);
ch.pipeline()
.addLast(new ReliableChannelHandler(endpoint))
.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
init();
ctx.pipeline().remove(this);
}
void init() {
InetSocketAddress remoteAddress = endpoint.remoteAddress();
Gdx.app.log(TAG, "Sending Connection packet to " + remoteAddress.getHostString() + ":" + remoteAddress.getPort());
FlatBufferBuilder builder = new FlatBufferBuilder();
Connection.startConnection(builder);
int dataOffset = Connection.endConnection(builder);
int offset = Netty.createNetty(builder, NettyData.Connection, dataOffset);
Netty.finishNettyBuffer(builder, offset);
endpoint.sendMessage(QoS.Unreliable, builder.dataBuffer());
}
})
;
}
});
ChannelFuture f = b.connect("localhost", TestServer.PORT);
f.channel().closeFuture().sync();
} catch (Throwable t) {
Gdx.app.error(TAG, t.getMessage(), t);
} finally {
group.shutdownGracefully();
}
}
@Override
public void processPacket(int sequence, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.log(TAG, ByteBufUtil.hexDump(bb));
}
}

View File

@ -0,0 +1,81 @@
package com.riiablo.net.reliable;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.nio.ByteBuffer;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationAdapter;
import com.badlogic.gdx.Gdx;
import com.badlogic.gdx.backends.headless.HeadlessApplication;
import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration;
import com.riiablo.codec.Animation;
import com.riiablo.net.packet.netty.Netty;
import com.riiablo.net.packet.netty.NettyData;
public class TestServer extends ApplicationAdapter implements ReliableEndpoint.PacketProcessor {
private static final String TAG = "Server";
static final int PORT = 6114;
public static void main(String[] args) {
HeadlessApplicationConfiguration config = new HeadlessApplicationConfiguration();
config.renderInterval = Animation.FRAME_DURATION;
new HeadlessApplication(new TestServer(), config);
}
private ReliableEndpoint endpoint;
@Override
public void create() {
Gdx.app.setLogLevel(Application.LOG_DEBUG);
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap()
.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) {
endpoint = new ReliableEndpoint(ch, TestServer.this);
ch.pipeline()
.addLast(new ReliableChannelHandler(endpoint))
;
}
})
;
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} catch (Throwable t) {
Gdx.app.error(TAG, t.getMessage(), t);
} finally {
group.shutdownGracefully();
}
}
@Override
public void processPacket(int sequence, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.log(TAG, ByteBufUtil.hexDump(bb));
ByteBuffer nioBuffer = bb.nioBuffer();
Netty netty = Netty.getRootAsNetty(nioBuffer);
byte dataType = netty.dataType();
if (0 <= dataType && dataType < NettyData.names.length) {
Gdx.app.debug(TAG, "dataType=" + NettyData.name(dataType));
}
}
}

View File

@ -0,0 +1,48 @@
package com.riiablo.net.reliable.channel;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import com.riiablo.net.reliable.MessageChannel;
import com.riiablo.net.reliable.ReliableConfiguration;
public class ReliableMessageChannel extends MessageChannel {
private static final String TAG = "ReliableMessageChannel";
public ReliableMessageChannel(PacketTransceiver packetTransceiver) {
super(new ReliableConfiguration(), packetTransceiver);
}
@Override
public void reset() {
}
@Override
public void update(long time, DatagramChannel ch) {
}
@Override
public void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb) {
}
@Override
public void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
}
@Override
public void onPacketTransmitted(ByteBuf bb) {
}
@Override
public void onPacketProcessed(int sequence, ByteBuf bb) {
}
}

View File

@ -0,0 +1,54 @@
package com.riiablo.net.reliable.channel;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import com.riiablo.net.reliable.Log;
import com.riiablo.net.reliable.MessageChannel;
import com.riiablo.net.reliable.ReliableConfiguration;
public class UnreliableMessageChannel extends MessageChannel {
private static final String TAG = "UnreliableMessageChannel";
private static final boolean DEBUG = true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
public UnreliableMessageChannel(PacketTransceiver packetTransceiver) {
super(new ReliableConfiguration(), packetTransceiver);
}
@Override
public void onPacketTransmitted(ByteBuf bb) {
}
@Override
public void onPacketProcessed(int sequence, ByteBuf bb) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + bb);
packetTransceiver.receivePacket(sequence, bb);
}
@Override
public void reset() {
packetController.reset();
}
@Override
public void update(long time, DatagramChannel ch) {
packetController.update(time);
}
@Override
public void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendMessage " + bb);
packetController.sendPacket(channelId, ch, bb);
}
@Override
public void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived " + packet);
packetController.onPacketReceived(ctx, packet);
}
}

View File

@ -0,0 +1,54 @@
package com.riiablo.net.reliable.channel;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import com.riiablo.net.reliable.Log;
import com.riiablo.net.reliable.MessageChannel;
import com.riiablo.net.reliable.ReliableConfiguration;
public class UnreliableOrderedMessageChannel extends MessageChannel {
private static final String TAG = "UnreliableOrderedMessageChannel";
private static final boolean DEBUG = true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
public UnreliableOrderedMessageChannel(PacketTransceiver packetTransceiver) {
super(new ReliableConfiguration(), packetTransceiver);
}
@Override
public void reset() {
}
@Override
public void update(long time, DatagramChannel ch) {
}
@Override
public void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendMessage " + bb);
packetController.sendPacket(channelId, ch, bb);
}
@Override
public void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived " + packet);
packetController.onPacketReceived(ctx, packet);
}
@Override
public void onPacketTransmitted(ByteBuf bb) {
}
@Override
public void onPacketProcessed(int sequence, ByteBuf bb) {
}
}

View File

@ -0,0 +1,18 @@
package com.riiablo.net.reliable.data;
import io.netty.buffer.ByteBuf;
import com.artemis.utils.BitVector;
public class FragmentReassemblyData {
public int sequence;
public int ack;
public int ackBits;
public int numFragmentsReceived;
public int numFragmentsTotal;
public ByteBuf dataBuffer;
public int packetBytes;
public int headerOffset;
public final BitVector fragmentReceived = new BitVector(256);
}

View File

@ -0,0 +1,6 @@
package com.riiablo.net.reliable.data;
public class ReceivedPacketData {
public long time;
public int packetSize;
}

View File

@ -0,0 +1,7 @@
package com.riiablo.net.reliable.data;
public class SentPacketData {
public long time;
public boolean acked;
public int packetSize;
}