Moved netty netcode from :server:netty to :core module

This commit is contained in:
Collin Smith
2020-06-25 10:08:00 -07:00
parent fbdf50f646
commit 597ae213c5
22 changed files with 4 additions and 4 deletions

View File

@ -1,9 +0,0 @@
package com.riiablo.net;
import io.netty.channel.ChannelHandlerContext;
public interface Endpoint<T> extends PacketSender<Object> {
void reset();
void update(float delta);
void messageReceived(ChannelHandlerContext ctx, T msg);
}

View File

@ -1,182 +0,0 @@
package com.riiablo.net;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.TypeParameterMatcher;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import com.badlogic.gdx.Gdx;
public class EndpointedChannelHandler<T> implements ChannelHandler, ChannelInboundHandler, ChannelOutboundHandler {
private static final String TAG = "EndpointedChannelHandler";
private static final boolean DEBUG = true;
private static final boolean DEBUG_CALLS = DEBUG && true;
private static final boolean DEBUG_INBOUND = DEBUG && true;
private static final boolean DEBUG_OUTBOUND = DEBUG && true;
private final TypeParameterMatcher matcher;
private final Endpoint<T> endpoint;
public EndpointedChannelHandler(Class<T> packetType, Endpoint<T> endpoint) {
this.endpoint = endpoint;
matcher = TypeParameterMatcher.get(packetType);
}
protected boolean accept(Object msg) throws Exception {
return matcher.match(msg);
}
protected void messageReceived(ChannelHandlerContext ctx, T msg) throws Exception {
if (DEBUG_CALLS) {
InetSocketAddress sender = msg instanceof DatagramPacket
? ((DatagramPacket) msg).sender()
: (InetSocketAddress) ctx.channel().remoteAddress();
Gdx.app.log(TAG, "messageReceived received packet from " + sender.getHostName() + ":" + sender.getPort());
}
endpoint.messageReceived(ctx, msg);
}
protected Object writeMessage(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
if (DEBUG_CALLS) {
InetSocketAddress receiver = (InetSocketAddress) ctx.channel().remoteAddress();
Gdx.app.log(TAG, "writeMessage sending packet to " + receiver.getHostName() + ":" + receiver.getPort());
}
ByteBuf out = msg;
if (DEBUG_OUTBOUND) Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(out));
return msg;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "handlerAdded");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "handlerRemoved");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "exceptionCaught");
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelRegistered");
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelUnregistered");
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelActive");
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelInactive");
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelRead");
boolean release = true;
try {
if (accept(msg)) {
@SuppressWarnings("unchecked")
T castedMsg = (T) msg;
messageReceived(ctx, castedMsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (release) ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelReadComplete");
ctx.fireChannelReadComplete();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "userEventTriggered");
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelWritabilityChanged");
ctx.fireChannelWritabilityChanged();
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "bind");
ctx.bind(localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "connect");
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "disconnect");
ctx.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "close");
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "deregister");
ctx.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "read");
ctx.read();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "write");
msg = writeMessage(ctx, (ByteBuf) msg);
ctx.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "flush");
ctx.flush();
}
}

View File

@ -1,7 +0,0 @@
package com.riiablo.net;
import io.netty.buffer.ByteBuf;
public interface PacketProcessor {
void processPacket(ByteBuf bb);
}

View File

@ -1,10 +0,0 @@
package com.riiablo.net;
import io.netty.channel.Channel;
import java.nio.ByteBuffer;
public interface PacketSender<T> {
Channel channel();
void sendMessage(ByteBuffer bb);
void sendMessage(T qos, ByteBuffer bb);
}

View File

@ -1,31 +0,0 @@
package com.riiablo.net.reliable;
import com.badlogic.gdx.Gdx;
public class Log {
private Log() {}
public static void error(String tag, String message) {
Gdx.app.error(tag, message);
}
public static void error(String tag, String format, Object... args) {
error(tag, String.format(format, args));
}
public static void log(String tag, String message) {
Gdx.app.log(tag, message);
}
public static void log(String tag, String format, Object... args) {
log(tag, String.format(format, args));
}
public static void debug(String tag, String message) {
Gdx.app.debug(tag, message);
}
public static void debug(String tag, String format, Object... args) {
debug(tag, String.format(format, args));
}
}

View File

@ -1,42 +0,0 @@
package com.riiablo.net.reliable;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
public abstract class MessageChannel implements ReliablePacketController.PacketListener {
protected final PacketTransceiver packetTransceiver;
protected final ReliableConfiguration config;
protected final ReliablePacketController packetController;
protected int sequence;
public MessageChannel(ReliableConfiguration config, PacketTransceiver packetTransceiver) {
this.packetTransceiver = packetTransceiver;
this.config = config;
this.packetController = new ReliablePacketController(config, this);
}
public int nextSequence() {
return sequence;
}
protected int incSequence() {
return sequence = (sequence + 1) & Packet.USHORT_MAX_VALUE;
}
protected ReliablePacketController controller() {
return packetController;
}
public abstract void reset();
public abstract void update(float delta, int channelId, DatagramChannel ch);
public abstract void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb);
public abstract void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet);
public interface PacketTransceiver {
void sendPacket(ByteBuf bb);
void receivePacket(ByteBuf bb);
}
}

View File

@ -1,309 +0,0 @@
package com.riiablo.net.reliable;
import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.builder.ToStringBuilder;
import com.badlogic.gdx.utils.Pool;
import com.badlogic.gdx.utils.Pools;
public class Packet {
private static final String TAG = "Packet";
public static final int USHORT_MAX_VALUE = 0xFFFF;
public static final int MAX_PACKET_HEADER_SIZE = 10;
public static final int FRAGMENT_HEADER_SIZE = 6;
static final int SINGLE = 0;
static final int FRAGMENTED = 1 << 0;
static final int ACK_BYTE0 = 1 << 1;
static final int ACK_BYTE1 = 1 << 2;
static final int ACK_BYTE2 = 1 << 3;
static final int ACK_BYTE3 = 1 << 4;
static final int SEQ_DIFF = 1 << 5;
static final int ACK = 1 << 7;
static final int TYPE_MASK = SINGLE | FRAGMENTED;
static final int ACK_BYTE0_MASK = 0x000000FF;
static final int ACK_BYTE1_MASK = 0x0000FF00;
static final int ACK_BYTE2_MASK = 0x00FF0000;
static final int ACK_BYTE3_MASK = 0xFF000000;
private static final int FLAGS_OFFSET = 0;
private static final int CHANNEL_OFFSET = 1;
public static byte getFlags(ByteBuf bb) {
return bb.getByte(FLAGS_OFFSET);
}
public static int getChannelId(ByteBuf bb) {
return bb.getUnsignedByte(CHANNEL_OFFSET);
}
private Packet() {}
public static boolean isFragmented(byte flags) {
return (flags & TYPE_MASK) == FRAGMENTED;
}
public static boolean isAck(byte flags) {
return (flags & ACK) == ACK;
}
private static int getAckBitsFlags(int ackBits, int prefixByte) {
if ((ackBits & ACK_BYTE0_MASK) != ACK_BYTE0_MASK) prefixByte |= ACK_BYTE0;
if ((ackBits & ACK_BYTE1_MASK) != ACK_BYTE1_MASK) prefixByte |= ACK_BYTE1;
if ((ackBits & ACK_BYTE2_MASK) != ACK_BYTE2_MASK) prefixByte |= ACK_BYTE2;
if ((ackBits & ACK_BYTE3_MASK) != ACK_BYTE3_MASK) prefixByte |= ACK_BYTE3;
return prefixByte;
}
private static void writeAckBitsFlags(ByteBuf bb, int ackBits) {
if ((ackBits & ACK_BYTE0_MASK) != ACK_BYTE0_MASK) bb.writeByte(ackBits);
if ((ackBits & ACK_BYTE1_MASK) != ACK_BYTE1_MASK) bb.writeByte(ackBits >> 8);
if ((ackBits & ACK_BYTE2_MASK) != ACK_BYTE2_MASK) bb.writeByte(ackBits >> 16);
if ((ackBits & ACK_BYTE3_MASK) != ACK_BYTE3_MASK) bb.writeByte(ackBits >> 24);
}
public static int writeAck(ByteBuf bb, int channelId, int ack, int ackBits) {
int startIndex = bb.writerIndex();
final int flags = getAckBitsFlags(ackBits, ACK);
bb.writeByte(flags);
bb.writeByte(channelId);
bb.writeShortLE(ack);
writeAckBitsFlags(bb, ackBits);
return bb.writerIndex() - startIndex;
}
public static int writePacketHeader(ByteBuf bb, int channelId, int sequence, int ack, int ackBits) {
int startIndex = bb.writerIndex();
int flags = getAckBitsFlags(ackBits, SINGLE);
int sequenceDiff = sequence - ack;
if (sequenceDiff < 0) sequenceDiff += USHORT_MAX_VALUE;
if (sequenceDiff <= 0xFF) flags |= SEQ_DIFF;
bb.writeByte(flags);
bb.writeByte(channelId);
bb.writeShortLE(sequence);
if (sequenceDiff <= 0xFF) {
bb.writeByte(sequenceDiff);
} else {
bb.writeShortLE(ack);
}
writeAckBitsFlags(bb, ackBits);
return bb.writerIndex() - startIndex;
}
public static int readPacketHeader(ReliableConfiguration config, ByteBuf bb, HeaderData out) {
assert out != null;
int startIndex = bb.readerIndex();
if (bb.readableBytes() < 4) {
Log.error(TAG, "buffer too small for packet header (1)");
return out.headerSize = -1;
}
final byte flags = out.flags = bb.readByte();
if ((flags & TYPE_MASK) != SINGLE) {
Log.error(TAG, "packet header not flagged as single packet");
return out.headerSize = -1;
}
out.channelId = bb.readUnsignedByte();
out.sequence = (flags & ACK) == ACK ? 0 : bb.readUnsignedShortLE(); // ACK doesn't have sequence
if ((flags & SEQ_DIFF) == SEQ_DIFF) {
if (bb.readableBytes() < 1) {
Log.error(TAG, "buffer too small for packet header (2)");
return out.headerSize = -1;
}
int sequenceDiff = bb.readUnsignedByte();
out.ack = (out.sequence - sequenceDiff) & USHORT_MAX_VALUE;
} else {
if (bb.readableBytes() < 2) {
Log.error(TAG, "buffer too small for packet header (3)");
return out.headerSize = -1;
}
out.ack = bb.readUnsignedShortLE();
}
int expectedBytes = 0;
for (int i = ACK_BYTE0; i <= ACK_BYTE3; i <<= 1) {
if ((flags & i) == i) expectedBytes++;
}
if (bb.readableBytes() < expectedBytes) {
Log.error(TAG, "buffer too small for packet header (4)");
return out.headerSize = -1;
}
int ackBits = 0xFFFFFFFF;
if ((flags & ACK_BYTE0) == ACK_BYTE0) {
ackBits &= ~ACK_BYTE0_MASK;
ackBits |= bb.readByte();
}
if ((flags & ACK_BYTE1) == ACK_BYTE1) {
ackBits &= ~ACK_BYTE1_MASK;
ackBits |= (bb.readByte() << 8);
}
if ((flags & ACK_BYTE2) == ACK_BYTE2) {
ackBits &= ~ACK_BYTE2_MASK;
ackBits |= (bb.readByte() << 16);
}
if ((flags & ACK_BYTE3) == ACK_BYTE3) {
ackBits &= ~ACK_BYTE3_MASK;
ackBits |= (bb.readByte() << 24);
}
out.ackBits = ackBits;
return out.headerSize = bb.readerIndex() - startIndex;
}
public static int readFragmentHeader(ReliableConfiguration config, ByteBuf bb, FragmentedHeaderData out) {
assert out != null;
int startIndex = bb.readerIndex();
if (bb.readableBytes() < FRAGMENT_HEADER_SIZE) {
Log.error(TAG, "buffer too small for fragment header");
return out.headerSize = -1;
}
final byte flags = out.flags = bb.readByte();
if ((flags & TYPE_MASK) != FRAGMENTED) {
Log.error(TAG, "packet header not flagged as fragmented packet");
return out.headerSize = -1;
}
final int channelId = out.channelId = bb.readUnsignedByte();
final int sequence = out.sequence = bb.readUnsignedShortLE();
final int fragmentId = out.fragmentId = bb.readUnsignedByte();
final int numFragments = out.numFragments = bb.readUnsignedByte();
if (numFragments > config.maxFragments) {
Log.error(TAG, "num fragments %d outside of range of max fragments %d", numFragments, config.maxFragments);
return out.headerSize = -1;
}
if (fragmentId >= numFragments) {
Log.error(TAG, "fragment id %d outside of range of num fragments %d", fragmentId, numFragments);
return out.headerSize = -1;
}
if (fragmentId == 0) {
if (bb.readableBytes() < 1) {
Log.error(TAG, "buffer too small for packet header");
return out.headerSize = -1;
}
final HeaderData packetHeader = out.header;
int headerSize = readPacketHeader(config, bb, packetHeader);
if (headerSize < 0) {
Log.error(TAG, "bad packet header in fragment");
return out.headerSize = -1;
}
if (packetHeader.sequence != sequence) {
Log.error(TAG, "bad packet sequence in fragment. expected %d, got %d", sequence, packetHeader.sequence);
return out.headerSize = -1;
}
out.ack = packetHeader.ack;
out.ackBits = packetHeader.ackBits;
} else {
out.ack = 0;
out.ackBits = 0;
}
final int fragmentSize = out.fragmentSize = bb.readableBytes();
if (fragmentSize > config.fragmentSize) {
Log.error(TAG, "fragment bytes %d > fragment size %d", fragmentSize, config.fragmentSize);
return out.headerSize = -1;
}
if (fragmentId != numFragments - 1 && fragmentSize != config.fragmentSize) {
Log.error(TAG, "fragment %d is %d bytes, which is not the expected fragment size %d bytes", fragmentId, fragmentSize, config.fragmentSize);
return out.headerSize = -1;
}
return out.headerSize = bb.readerIndex() - startIndex;
}
private static final Pool<HeaderData> HEADER_DATA_POOL = Pools.get(HeaderData.class);
public static HeaderData obtainData() {
return HEADER_DATA_POOL.obtain();
}
public static class HeaderData {
public byte flags;
public int channelId;
public int sequence;
public int ack;
public int ackBits;
public int headerSize;
public void free() {
HEADER_DATA_POOL.free(this);
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("flags", String.format("%02x", flags))
.append("channelId", channelId)
.append("sequence", sequence)
.append("ack", ack)
.append("ackBits", String.format("%08x", ackBits))
.append("headerSize", headerSize)
.build();
}
}
private static final Pool<FragmentedHeaderData> FRAGMENTED_HEADER_DATA_POOL = Pools.get(FragmentedHeaderData.class);
public static FragmentedHeaderData obtainFragmentedData() {
return FRAGMENTED_HEADER_DATA_POOL.obtain();
}
public static class FragmentedHeaderData {
public byte flags;
public int channelId;
public int sequence;
public int ack;
public int ackBits;
public int headerSize;
public int fragmentId;
public int numFragments;
public int fragmentSize;
public final HeaderData header = new HeaderData();
public void free() {
FRAGMENTED_HEADER_DATA_POOL.free(this);
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("flags", String.format("%02x", flags))
.append("channelId", channelId)
.append("sequence", sequence)
.append("ack", ack)
.append("ackBits", String.format("%08x", ackBits))
.append("headerSize", headerSize)
.append("fragmentId", fragmentId)
.append("numFragments", numFragments)
.append("fragmentSize", fragmentSize)
.append("header", header)
.build();
}
}
}

View File

@ -1,27 +0,0 @@
package com.riiablo.net.reliable;
public enum QoS {
/**
* Message is guaranteed to arrive and in order.
*/
Reliable,
/**
* Message is not guaranteed delivery nor order.
*/
Unreliable,
/**
* Message is not guaranteed delivery, but will be in order
*/
UnreliableOrdered;
public static QoS valueOf(int i) {
switch (i) {
case 0: return Reliable;
case 1: return Unreliable;
case 2: return UnreliableOrdered;
default: return null;
}
}
}

View File

@ -1,182 +0,0 @@
package com.riiablo.net.reliable;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.TypeParameterMatcher;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import com.badlogic.gdx.Gdx;
import com.riiablo.net.EndpointedChannelHandler;
/**
* Replaced by {@link EndpointedChannelHandler}
* @see EndpointedChannelHandler
*/
@Deprecated
public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHandler, ChannelOutboundHandler {
private static final String TAG = "ReliableChannelHandler";
private static final boolean DEBUG = true;
private static final boolean DEBUG_INBOUND = DEBUG && true;
private static final boolean DEBUG_OUTBOUND = DEBUG && true;
private final TypeParameterMatcher matcher;
private final ReliableEndpoint endpoint;
public ReliableChannelHandler(ReliableEndpoint endpoint) {
this.endpoint = endpoint;
matcher = TypeParameterMatcher.get(DatagramPacket.class);
}
protected boolean accept(Object msg) throws Exception {
return matcher.match(msg);
}
protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
InetSocketAddress sender = packet.sender();
Gdx.app.log(TAG, "messageReceived received packet from " + sender.getHostName() + ":" + sender.getPort());
ByteBuf in = packet.content();
if (DEBUG_INBOUND) Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(in));
endpoint.messageReceived(ctx, packet);
}
protected Object writeMessage(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
InetSocketAddress receiver = (InetSocketAddress) ctx.channel().remoteAddress();
Gdx.app.log(TAG, "writeMessage sending packet to " + receiver.getHostName() + ":" + receiver.getPort());
ByteBuf out = msg;
if (DEBUG_OUTBOUND) Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(out));
return msg;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "handlerAdded");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "handlerRemoved");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Gdx.app.debug(TAG, "exceptionCaught");
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelRegistered");
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelUnregistered");
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelActive");
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelInactive");
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Gdx.app.debug(TAG, "channelRead");
boolean release = true;
try {
if (accept(msg)) {
messageReceived(ctx, (DatagramPacket) msg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (release) ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelReadComplete");
ctx.fireChannelReadComplete();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
Gdx.app.debug(TAG, "userEventTriggered");
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "channelWritabilityChanged");
ctx.fireChannelWritabilityChanged();
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "bind");
ctx.bind(localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "connect");
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "disconnect");
ctx.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "close");
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "deregister");
ctx.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "read");
ctx.read();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Gdx.app.debug(TAG, "write");
msg = writeMessage(ctx, (ByteBuf) msg);
ctx.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
Gdx.app.debug(TAG, "flush");
ctx.flush();
}
}

View File

@ -1,15 +0,0 @@
package com.riiablo.net.reliable;
public class ReliableConfiguration {
public int maxPacketSize = 16384;
public int fragmentThreshold = 1024;
public int maxFragments = 16;
public int fragmentSize = 1024;
public int sentPacketBufferSize = 256;
public int receivedPacketBufferSize = 256;
public int fragmentReassemblyBufferSize = 64;
public float rttSmoothingFactor = 0.25f;
public float packetLossSmoothingFactor = 0.1f;
public float bandwidthSmoothingFactor = 0.1f;
public int packetHeaderSize = 28;
}

View File

@ -1,144 +0,0 @@
package com.riiablo.net.reliable;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.Validate;
import com.riiablo.net.Endpoint;
import com.riiablo.net.PacketProcessor;
import com.riiablo.net.reliable.channel.ReliableMessageChannel;
import com.riiablo.net.reliable.channel.UnreliableMessageChannel;
import com.riiablo.net.reliable.channel.UnreliableOrderedMessageChannel;
import com.riiablo.util.EnumIntMap;
public class ReliableEndpoint implements Endpoint<DatagramPacket>, MessageChannel.PacketTransceiver {
private static final String TAG = "ReliableEndpoint";
private static final boolean DEBUG = true;
private static final boolean DEBUG_QOS = DEBUG && true;
private static final boolean DEBUG_CHANNEL = DEBUG && true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
private final DatagramChannel channel;
private final PacketProcessor packetProcessor;
private final EnumIntMap<QoS> defaultChannels;
private final MessageChannel[] channels;
public ReliableEndpoint(DatagramChannel channel, PacketProcessor packetProcessor) {
this.channel = channel;
this.packetProcessor = packetProcessor;
// for my purposes 3 works, channelId can be up to 255 though
channels = new MessageChannel[3];
channels[QoS.Reliable.ordinal()] = new ReliableMessageChannel(this);
channels[QoS.Unreliable.ordinal()] = new UnreliableMessageChannel(this);
channels[QoS.UnreliableOrdered.ordinal()] = new UnreliableOrderedMessageChannel(this);
defaultChannels = new EnumIntMap<>(QoS.class, -1);
for (QoS qos : QoS.values()) {
defaultChannels.put(qos, qos.ordinal());
}
}
@Override
public Channel channel() {
return channel;
}
public MessageChannel channel(int channelId) {
return channels[channelId];
}
@Override
public void reset() {
final MessageChannel[] channels = this.channels;
for (int i = 0, s = channels.length; i < s; i++) {
channels[i].reset();
}
}
@Override
public void update(float delta) {
final MessageChannel[] channels = this.channels;
for (int i = 0, s = channels.length; i < s; i++) {
channels[i].update(delta, i, channel);
}
}
@Override
public void sendMessage(ByteBuffer bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendMessage (auto)");
sendMessage(QoS.Reliable, bb);
}
@Override
public void sendMessage(Object qos, ByteBuffer bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendMessage");
assert qos instanceof QoS;
if (DEBUG_QOS) Log.debug(TAG, "sending message with %s QoS (0x%02x)", qos, ((QoS) qos).ordinal());
int channelId = defaultChannels.get((QoS) qos);
sendMessage(channelId, bb);
}
public void sendMessage(int channelId, ByteBuffer bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendMessage");
Validate.inclusiveBetween(0x00, 0xFF, channelId, "channelId must fit within a ubyte");
if (DEBUG_CHANNEL) Log.debug(TAG, "sending message with on channel %d", channelId);
MessageChannel mc = channels[channelId];
mc.sendMessage(channelId, channel, Unpooled.wrappedBuffer(bb)); // automatically released
}
@Override
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived");
int channelId = Packet.getChannelId(packet.content());
if (DEBUG_QOS) {
QoS qos = QoS.valueOf(channelId);
if (qos != null) {
Log.debug(TAG, "received message with %s QoS (0x%02x)", qos, channelId);
} else {
Log.debug(TAG, "received message with channel %d", channelId);
}
} else if (DEBUG_CHANNEL) {
Log.debug(TAG, "received message with channel %d", channelId);
}
MessageChannel mc = channels[channelId];
mc.onMessageReceived(ctx, packet);
}
@Override
public void sendPacket(ByteBuf bb) {
}
@Override
public void receivePacket(ByteBuf bb) {
packetProcessor.processPacket(bb);
}
public static final Stats stats = new Stats();
public static class Stats {
public int NUM_PACKETS_SENT;
public int NUM_PACKETS_RECEIVED;
public int NUM_PACKETS_ACKED;
public int NUM_PACKETS_STALE;
public int NUM_PACKETS_INVALID;
public int NUM_PACKETS_TOO_LARGE_TO_SEND;
public int NUM_PACKETS_TOO_LARGE_TO_RECEIVE;
public int NUM_FRAGMENTS_SENT;
public int NUM_FRAGMENTS_RECEIVED;
public int NUM_FRAGMENTS_INVALID;
public int NUM_ACKS_SENT;
public int NUM_ACKS_RECEIVED;
public int NUM_ACKS_INVALID;
}
}

View File

@ -1,321 +0,0 @@
package com.riiablo.net.reliable;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import com.badlogic.gdx.math.MathUtils;
import com.riiablo.net.reliable.data.FragmentReassemblyData;
import com.riiablo.net.reliable.data.ReceivedPacketData;
import com.riiablo.net.reliable.data.SentPacketData;
public class ReliablePacketController {
private static final String TAG = "ReliablePacketController";
private static final boolean DEBUG = true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
private static final float TOLERANCE = 0.00001f;
private final ReliableConfiguration config;
private final MessageChannel channel;
private final SequenceBuffer<SentPacketData> sentPackets;
private final SequenceBuffer<ReceivedPacketData> receivedPackets;
private final SequenceBuffer<FragmentReassemblyData> fragmentReassembly;
private float time;
private float rtt;
private float packetLoss;
private float sentBandwidth;
private float receivedBandwidth;
private float ackedBandwidth;
public ReliablePacketController(ReliableConfiguration config, MessageChannel channel) {
this.config = config;
this.channel = channel;
this.sentPackets = new SequenceBuffer<>(SentPacketData.class, config.sentPacketBufferSize);
this.receivedPackets = new SequenceBuffer<>(ReceivedPacketData.class, config.receivedPacketBufferSize);
this.fragmentReassembly = new SequenceBuffer<>(FragmentReassemblyData.class, config.fragmentReassemblyBufferSize);
}
public float rtt() {
return rtt;
}
public void reset() {
channel.sequence = 0;
for (int i = 0, s = config.fragmentReassemblyBufferSize; i < s; i++) {
FragmentReassemblyData reassemblyData = fragmentReassembly.atIndex(i);
if (reassemblyData != null) reassemblyData.dataBuffer.clear();
}
sentPackets.reset();
receivedPackets.reset();
fragmentReassembly.reset();
}
public void update(float delta) {
time += delta;
updatePacketLoss();
updateSentBandwidth();
updateReceivedBandwidth();
updateAckedBandwidth();
}
private void updatePacketLoss() {
int baseSequence = (sentPackets.getSequence() - config.sentPacketBufferSize + 1 + Packet.USHORT_MAX_VALUE) & Packet.USHORT_MAX_VALUE;
int numDropped = 0;
int numSamples = config.sentPacketBufferSize / 2;
for (int i = 0; i < numSamples; i++) {
int sequence = (baseSequence + i) & Packet.USHORT_MAX_VALUE;
SentPacketData sentPacketData = sentPackets.find(sequence);
if (sentPacketData != null && !sentPacketData.acked) numDropped++;
}
float packetLoss = numDropped / (float) numSamples;
if (MathUtils.isEqual(this.packetLoss, packetLoss, TOLERANCE)) {
this.packetLoss += (packetLoss - this.packetLoss) * config.packetLossSmoothingFactor;
} else {
this.packetLoss = packetLoss;
}
}
private void updateSentBandwidth() {
int baseSequence = (sentPackets.getSequence() - config.sentPacketBufferSize + 1 + Packet.USHORT_MAX_VALUE) & Packet.USHORT_MAX_VALUE;
int bytesSent = 0;
float startTime = Float.MAX_VALUE;
float finishTime = 0f;
int numSamples = config.sentPacketBufferSize / 2;
for (int i = 0; i < numSamples; i++) {
int sequence = (baseSequence + i) & Packet.USHORT_MAX_VALUE;
SentPacketData sentPacketData = sentPackets.find(sequence);
if (sentPacketData == null) continue;
bytesSent += sentPacketData.packetSize;
startTime = Math.min(startTime, sentPacketData.time);
finishTime = Math.max(finishTime, sentPacketData.time);
}
if (startTime != Float.MAX_VALUE && finishTime != 0f) {
float sentBandwidth = bytesSent / (finishTime - startTime) * 8f / 1000f;
if (MathUtils.isEqual(this.sentBandwidth, sentBandwidth, TOLERANCE)) {
this.sentBandwidth += (sentBandwidth - this.sentBandwidth) * config.bandwidthSmoothingFactor;
} else {
this.sentBandwidth = sentBandwidth;
}
}
}
private void updateReceivedBandwidth() {
synchronized (receivedPackets) {
int baseSequence = (receivedPackets.getSequence() - config.receivedPacketBufferSize + 1 + Packet.USHORT_MAX_VALUE) & Packet.USHORT_MAX_VALUE;
int bytesReceived = 0;
float startTime = Float.MAX_VALUE;
float finishTime = 0f;
int numSamples = config.receivedPacketBufferSize / 2;
for (int i = 0; i < numSamples; i++) {
int sequence = (baseSequence + i) & Packet.USHORT_MAX_VALUE;
ReceivedPacketData receivedPacketData = receivedPackets.find(sequence);
if (receivedPacketData == null) continue;
bytesReceived += receivedPacketData.packetSize;
startTime = Math.min(startTime, receivedPacketData.time);
finishTime = Math.max(finishTime, receivedPacketData.time);
}
if (startTime != Float.MAX_VALUE && finishTime != 0f) {
float receivedBandwidth = bytesReceived / (finishTime - startTime) * 8f / 1000f;
if (MathUtils.isEqual(this.receivedBandwidth, receivedBandwidth, TOLERANCE)) {
this.receivedBandwidth += (receivedBandwidth - this.receivedBandwidth) * config.bandwidthSmoothingFactor;
} else {
this.receivedBandwidth = receivedBandwidth;
}
}
}
}
private void updateAckedBandwidth() {
int baseSequence = (sentPackets.getSequence() - config.sentPacketBufferSize + 1 + Packet.USHORT_MAX_VALUE) & Packet.USHORT_MAX_VALUE;
int bytesSent = 0;
float startTime = Float.MAX_VALUE;
float finishTime = 0f;
int numSamples = config.sentPacketBufferSize / 2;
for (int i = 0; i < numSamples; i++) {
int sequence = (baseSequence + i) & Packet.USHORT_MAX_VALUE;
SentPacketData sentPacketData = sentPackets.find(sequence);
if (sentPacketData == null || !sentPacketData.acked) continue;
bytesSent += sentPacketData.packetSize;
startTime = Math.min(startTime, sentPacketData.time);
finishTime = Math.max(finishTime, sentPacketData.time);
}
if (startTime != Float.MAX_VALUE && finishTime != 0f) {
float ackedBandwidth = bytesSent / (finishTime - startTime) * 8f / 1000f;
if (MathUtils.isEqual(this.ackedBandwidth, ackedBandwidth, TOLERANCE)) {
this.ackedBandwidth += (ackedBandwidth - this.ackedBandwidth) * config.bandwidthSmoothingFactor;
} else {
this.ackedBandwidth = ackedBandwidth;
}
}
}
public void sendAck(int channelId, DatagramChannel ch) {
if (DEBUG_SEND) Log.debug(TAG, "sendAck");
int ack, ackBits;
synchronized (receivedPackets) {
ack = receivedPackets.generateAck();
ackBits = receivedPackets.generateAckBits(ack);
}
ByteBuf packet = ch.alloc().directBuffer(config.packetHeaderSize);
int headerSize = Packet.writeAck(packet, channelId, ack, ackBits);
if (headerSize < 0) {
Log.error(TAG, "failed to write ack");
ReliableEndpoint.stats.NUM_ACKS_INVALID++;
return;
}
channel.onPacketTransmitted(packet);
ch.writeAndFlush(packet);
}
public int sendPacket(int channelId, DatagramChannel ch, ByteBuf bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendPacket " + bb);
final int packetSize = bb.readableBytes();
if (packetSize > config.maxPacketSize) {
Log.error(TAG, "packet is too large to send (%d bytes), max packet size is %d bytes", packetSize, config.maxPacketSize);
ReliableEndpoint.stats.NUM_PACKETS_TOO_LARGE_TO_SEND++;
return -1;
}
final int sequence = channel.incSequence();
int ack, ackBits;
synchronized (receivedPackets) {
ack = receivedPackets.generateAck();
ackBits = receivedPackets.generateAckBits(ack);
}
SentPacketData sentPacketData = sentPackets.insert(sequence);
sentPacketData.time = this.time;
sentPacketData.packetSize = packetSize;
sentPacketData.acked = false;
if (packetSize <= config.fragmentThreshold) {
// regular packet
ByteBuf header = ch.alloc().buffer(config.packetHeaderSize);
int headerSize = Packet.writePacketHeader(header, channelId, sequence, ack, ackBits);
ByteBuf composite = ch.alloc().compositeBuffer(2)
.addComponent(true, header)
.addComponent(true, bb);
channel.onPacketTransmitted(composite);
ch.writeAndFlush(composite);
return headerSize;
} else {
// fragmented packet
throw new UnsupportedOperationException();
// return -1;
}
}
public void onPacketReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketReceived " + packet);
final ByteBuf bb = packet.content();
final int packetSize = bb.readableBytes();
if (packetSize > config.maxPacketSize) {
Log.error(TAG, "packet is too large to receive (%d bytes), max packet size is %d bytes", packetSize, config.maxPacketSize);
ReliableEndpoint.stats.NUM_PACKETS_TOO_LARGE_TO_RECEIVE++;
return;
}
final byte flags = Packet.getFlags(bb);
if (!Packet.isFragmented(flags)) {
// regular packet
ReliableEndpoint.stats.NUM_PACKETS_RECEIVED++;
Packet.HeaderData headerData = null;
try {
headerData = Packet.obtainData();
int headerSize = Packet.readPacketHeader(config, bb, headerData);
if (headerSize == -1) {
Log.error(TAG, "ignoring invalid packet. could not read packet header");
ReliableEndpoint.stats.NUM_PACKETS_INVALID++;
return;
}
final boolean isStale;
final int sequence = headerData.sequence;
synchronized (receivedPackets) {
isStale = !receivedPackets.testInsert(sequence);
}
final boolean isAck = Packet.isAck(flags);
if (!isStale && !isAck) {
if (DEBUG_RECEIVE) Log.debug(TAG, "processing packet %d", sequence);
ByteBuf slice = bb.readSlice(bb.readableBytes());
channel.onPacketProcessed(sequence, slice);
synchronized (receivedPackets) {
ReceivedPacketData receivedPacketData = receivedPackets.insert(sequence);
receivedPacketData.time = time;
receivedPacketData.packetSize = packetSize;
}
}
if (!isStale || isAck) {
final int ack = headerData.ack;
for (int i = 0, ackBits = headerData.ackBits; i < Integer.SIZE && ackBits != 0; i++, ackBits >>>= 1) {
if ((ackBits & 1) != 0) {
int ackSequence = (ack - i) & Packet.USHORT_MAX_VALUE;
SentPacketData sentPacketData = sentPackets.find(ackSequence);
if (sentPacketData != null && !sentPacketData.acked) {
if (DEBUG_RECEIVE) Log.debug(TAG, "acked packet %d", ackSequence);
ReliableEndpoint.stats.NUM_PACKETS_ACKED++;
sentPacketData.acked = true;
channel.onAckProcessed(ackSequence);
float rtt = (time - sentPacketData.time) * 1000f;
if ((this.rtt == 0.0f && rtt > 0.0f) || MathUtils.isEqual(this.rtt, rtt, TOLERANCE)) {
this.rtt = rtt;
} else {
this.rtt += (rtt - this.rtt) * config.rttSmoothingFactor;
}
}
}
}
}
if (isStale) {
Log.error(TAG, "ignoring stale packet %d", sequence);
ReliableEndpoint.stats.NUM_PACKETS_STALE++;
return;
}
} finally {
if (headerData != null) headerData.free();
}
} else {
// fragmented packet
throw new UnsupportedOperationException();
}
}
public interface PacketListener {
void onPacketTransmitted(ByteBuf bb);
void onAckProcessed(int sequence);
void onPacketProcessed(int sequence, ByteBuf bb);
}
}

View File

@ -1,14 +0,0 @@
package com.riiablo.net.reliable;
public class ReliableUtils {
private ReliableUtils() {}
public static boolean sequenceGreaterThan(int s1, int s2) {
return ((s1 > s2) && (s1 - s2 <= Short.MAX_VALUE))
|| ((s1 < s2) && (s2 - s1 > Short.MAX_VALUE));
}
public static boolean sequenceLessThan(int s1, int s2) {
return sequenceGreaterThan(s2, s1);
}
}

View File

@ -1,111 +0,0 @@
package com.riiablo.net.reliable;
import java.util.Arrays;
import org.apache.commons.lang3.exception.ExceptionUtils;
public class SequenceBuffer<T> {
public static final int INVALID_SEQUENCE = -1; // 0xFFFFFFFF
private int sequence;
public final int numEntries;
private final int entrySequence[];
private final Object entryData[];
public SequenceBuffer(Class<T> dataContainer, int bufferSize) {
numEntries = bufferSize;
entrySequence = new int[numEntries];
Arrays.fill(entrySequence, INVALID_SEQUENCE);
entryData = new Object[numEntries];
try {
for (int i = 0; i < numEntries; i++) entryData[i] = dataContainer.newInstance();
} catch (Throwable t) {
ExceptionUtils.wrapAndThrow(t);
}
sequence = 0;
}
public int getSequence() {
return sequence;
}
public void reset() {
sequence = 0;
Arrays.fill(entrySequence, INVALID_SEQUENCE);
}
public void removeEntries(int startSequence, int endSequence) {
startSequence &= Packet.USHORT_MAX_VALUE;
endSequence &= Packet.USHORT_MAX_VALUE;
if (endSequence < startSequence) {
Arrays.fill(entrySequence, startSequence, numEntries, INVALID_SEQUENCE);
Arrays.fill(entrySequence, 0, endSequence, INVALID_SEQUENCE);
} else {
Arrays.fill(entrySequence, startSequence, endSequence, INVALID_SEQUENCE);
}
}
public boolean testInsert(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
return !ReliableUtils.sequenceLessThan(sequence, (this.sequence - numEntries) & Packet.USHORT_MAX_VALUE);
}
@SuppressWarnings("unchecked")
public T insert(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
if (ReliableUtils.sequenceLessThan(sequence, (this.sequence - numEntries) & Packet.USHORT_MAX_VALUE)) {
return null;
}
final int nextSequence = (sequence + 1) & Packet.USHORT_MAX_VALUE;
if (ReliableUtils.sequenceGreaterThan(nextSequence, this.sequence)) {
removeEntries(this.sequence, sequence);
this.sequence = nextSequence;
}
int index = sequence % numEntries;
entrySequence[index] = sequence;
return (T) entryData[index];
}
public void remove(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
entrySequence[sequence % numEntries] = INVALID_SEQUENCE;
}
public boolean available(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
return entrySequence[sequence % numEntries] == INVALID_SEQUENCE;
}
public boolean exists(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
return entrySequence[sequence % numEntries] == sequence;
}
@SuppressWarnings("unchecked")
public T find(int sequence) {
sequence &= Packet.USHORT_MAX_VALUE;
int index = sequence % numEntries;
return entrySequence[index] == sequence ? (T) entryData[index] : null;
}
@SuppressWarnings("unchecked")
public T atIndex(int index) {
return entrySequence[index] != INVALID_SEQUENCE ? (T) entryData[index] : null;
}
public int generateAck() {
return (sequence - 1) & Packet.USHORT_MAX_VALUE;
}
public int generateAckBits(int ack) {
ack &= Packet.USHORT_MAX_VALUE;
int ackBits = 0;
for (int i = 0, mask = 1; i < Integer.SIZE; i++, mask <<= 1) {
if (exists(ack - i)) ackBits |= mask;
}
return ackBits;
}
}

View File

@ -1,308 +0,0 @@
package com.riiablo.net.reliable.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import com.badlogic.gdx.math.MathUtils;
import com.badlogic.gdx.utils.IntArray;
import com.badlogic.gdx.utils.Queue;
import com.riiablo.net.reliable.Log;
import com.riiablo.net.reliable.MessageChannel;
import com.riiablo.net.reliable.Packet;
import com.riiablo.net.reliable.ReliableConfiguration;
import com.riiablo.net.reliable.ReliableUtils;
import com.riiablo.net.reliable.SequenceBuffer;
public class ReliableMessageChannel extends MessageChannel {
private static final String TAG = "ReliableMessageChannel";
private static final boolean DEBUG = true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
private final ByteBuf packetBuffer = Unpooled.buffer();
private final SequenceBuffer<BufferedPacket> sendBuffer;
private final SequenceBuffer<BufferedPacket> receiveBuffer;
private final SequenceBuffer<OutgoingPacketSet> ackBuffer;
private final Queue<ByteBuf> messageQueue = new Queue<>(64, ByteBuf.class);
private final IntArray outgoingMessageIds = new IntArray(256);
private float time;
private float lastBufferFlush;
private float lastMessageSend;
private int oldestUnacked;
// private int sequence; // hides MessageChannel#sequence
private int nextReceive;
private boolean congestionControl = false;
private float congestionDisableTimer;
private float congestionDisableInterval;
private float lastCongestionSwitchTime;
public ReliableMessageChannel(PacketTransceiver packetTransceiver) {
super(new ReliableConfiguration(), packetTransceiver);
this.sendBuffer = new SequenceBuffer<>(BufferedPacket.class, 256);
this.receiveBuffer = new SequenceBuffer<>(BufferedPacket.class, 256);
this.ackBuffer = new SequenceBuffer<>(OutgoingPacketSet.class, 256);
time = 0.0f;
lastBufferFlush = -1.0f;
lastMessageSend = 0.0f;
congestionDisableInterval = 5.0f;
sequence = 0;
nextReceive = 0;
oldestUnacked = 0;
}
@Override
public void reset() {
packetController.reset();
sendBuffer.reset();
// receiveBuffer.reset(); // this isn't in the original code? why?
ackBuffer.reset();
lastBufferFlush = -1.0f;
lastMessageSend = 0.0f;
congestionControl = false;
lastCongestionSwitchTime = 0.0f;
congestionDisableTimer = 0.0f;
congestionDisableInterval = 5.0f;
sequence = 0;
nextReceive = 0;
oldestUnacked = 0;
}
@Override
public void update(float delta, int channelId, DatagramChannel ch) {
packetController.update(delta);
time += delta;
// see if we can pop messages off of the message queue and put them into the send queue
updateQueue(channelId, ch);
updateCongestion(delta, channelId, ch);
}
private void updateQueue(int channelId, DatagramChannel ch) {
if (messageQueue.size > 0) {
int sendBufferSize = 0;
for (int seq = oldestUnacked; ReliableUtils.sequenceLessThan(seq, sequence); seq = (seq + 1) & Packet.USHORT_MAX_VALUE) {
if (sendBuffer.exists(seq)) sendBufferSize++;
}
if (sendBufferSize < sendBuffer.numEntries) {
ByteBuf packetData = messageQueue.removeFirst();
sendMessage(channelId, ch, packetData);
}
}
}
private void updateCongestion(float delta, int channelId, DatagramChannel ch) {
boolean conditionsBad = packetController.rtt() >= 250.0f; // 250ms
// if conditions are bad, immediately enable congestion control and reset the congestion timer
if (conditionsBad) {
if (!congestionControl) {
// if we're within 10 seconds of the last time we switched, double the threshold interval
if (time - lastCongestionSwitchTime < 10.0) {
congestionDisableInterval = Math.min(congestionDisableInterval * 2, 60.0f);
}
lastCongestionSwitchTime = time;
}
congestionControl = true;
congestionDisableTimer = 0.0f;
}
// if we're in bad mode, and conditions are good, update the timer and see if we can disable
// congestion control
if (congestionControl && !conditionsBad) {
congestionDisableTimer += delta;
if (congestionDisableTimer >= congestionDisableInterval) {
congestionControl = false;
lastCongestionSwitchTime = time;
congestionDisableTimer = 0.0f;
}
}
// as long as conditions are good, halve the threshold interval every 10 seconds
if (!congestionControl) {
congestionDisableTimer += delta;
if (congestionDisableTimer > 10.0f) {
congestionDisableInterval = Math.max(congestionDisableInterval * 0.5f, 5.0f);
}
}
// if we're in congestion control mode, only send packets 10 times per second. otherwise, send
// 30 times per second
float flushInterval = congestionControl ? (1.0f / 10) : (1.0f / 30);
if (time - lastBufferFlush >= flushInterval) {
lastBufferFlush = time;
processSendBuffer(channelId, ch);
}
}
private void processSendBuffer(int channelId, DatagramChannel ch) {
// int numUnacked = 0;
// for (int seq = oldestUnacked; ReliableUtils.sequenceLessThan(seq, sequence); seq = (seq + 1) & Packet.USHORT_MAX_VALUE) {
// numUnacked++;
// }
for (int seq = oldestUnacked; ReliableUtils.sequenceLessThan(seq, sequence); seq = (seq + 1) & Packet.USHORT_MAX_VALUE) {
// never send message ID >= (oldestUnacked + bufferSize)
if (seq >= (oldestUnacked + 256)) break;
// for any message that hasn't been sent in the last 0.1 seconds and fits in the available
// space of our message packer, add it
BufferedPacket packet = sendBuffer.find(seq);
if (packet != null && !packet.writeLock) {
if (MathUtils.isEqual(time, packet.time, 0.1f)) continue;
boolean packetFits = false;
int packetSize = packetBuffer.readableBytes() + packet.bb.readableBytes();
if (packet.bb.readableBytes() < config.fragmentThreshold) {
packetFits = packetSize <= (config.fragmentThreshold - Packet.MAX_PACKET_HEADER_SIZE);
} else {
packetFits = packetSize <= (config.maxPacketSize - Packet.FRAGMENT_HEADER_SIZE - Packet.MAX_PACKET_HEADER_SIZE);
}
// if the packet won't fit, flush the message packet
if (!packetFits) {
flushPacketBuffer(channelId, ch);
}
packet.time = time;
packetBuffer.writeBytes(packet.bb);
outgoingMessageIds.add(seq);
lastMessageSend = time;
}
}
// if it has been 0.1 seconds since the last time we sent a message, send an empty message
if (time - lastMessageSend >= 0.1f) {
packetController.sendAck(channelId, ch);
lastMessageSend = time;
}
// flush and remaining messages in the packet buffer
flushPacketBuffer(channelId, ch);
}
private void flushPacketBuffer(int channelId, DatagramChannel ch) {
if (packetBuffer.readableBytes() > 0) {
int outgoingSeq = packetController.sendPacket(channelId, ch, packetBuffer);
OutgoingPacketSet outgoingPacket = ackBuffer.insert(outgoingSeq);
// store message IDs so we can map packet-level acks to message ID acks
outgoingPacket.messageIds.clear();
outgoingPacket.messageIds.addAll(outgoingMessageIds);
packetBuffer.clear();
outgoingMessageIds.clear();
}
}
@Override
public void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendMessage " + bb);
int sendBufferSize = 0;
for (int seq = oldestUnacked; ReliableUtils.sequenceLessThan(seq, sequence); seq = (seq + 1) & Packet.USHORT_MAX_VALUE) {
if (sendBuffer.exists(seq)) sendBufferSize++;
}
// TODO: make sure this doesn't leak
if (sendBufferSize == sendBuffer.numEntries) {
messageQueue.addLast(bb);
return;
}
final int sequence = incSequence();
BufferedPacket packet = sendBuffer.insert(sequence);
packet.time = -1.0f;
// ensure size for header
// TODO: there appears to be extra code here outside of the spec that aggregates multiple
// messages together and prepends messageId and messageLength fields. Maybe this was done
// to group up smaller messages? Needs to be looked into more.
// https://github.com/KillaMaaki/ReliableNetcode.NET/blob/c5a7339e2de70f52bfda2078f1bbdab2ec9a85c1/ReliableNetcode/MessageChannel.cs#L331-L393
packet.bb = bb;
packet.writeLock = false;
}
@Override
public void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
if (DEBUG_SEND) Log.debug(TAG, "onMessageReceived " + packet);
packetController.onPacketReceived(ctx, packet);
}
@Override
public void onPacketTransmitted(ByteBuf bb) {
}
@Override
public void onAckProcessed(int sequence) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence);
// first, map sequence to message IDs and ack them
OutgoingPacketSet outgoingPacket = ackBuffer.find(sequence);
if (outgoingPacket == null) return;
// process messages
final int[] messageIds = outgoingPacket.messageIds.items;
for (int i = 0, s = outgoingPacket.messageIds.size; i < s; i++) {
// remove acked message from send buffer
int messageId = messageIds[i];
if (sendBuffer.exists(messageId)) {
sendBuffer.find(messageId).writeLock = true;
sendBuffer.remove(messageId);
}
}
// update oldest unacked message
boolean allAcked = true;
for (int seq = oldestUnacked;
seq == this.sequence || ReliableUtils.sequenceLessThan(seq, this.sequence);
seq = (seq + 1) & Packet.USHORT_MAX_VALUE) {
// if it's still in the send buffer, it hasn't been acked
if (sendBuffer.exists(seq)) {
oldestUnacked = seq;
allAcked = false;
break;
}
}
if (allAcked) oldestUnacked = this.sequence;
}
@Override
public void onPacketProcessed(int sequence, ByteBuf bb) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb);
packetTransceiver.receivePacket(bb);
// TODO: this is different from original function, see above note within #sendMessage
}
public static class BufferedPacket {
boolean writeLock = true;
float time;
ByteBuf bb;
}
public static class OutgoingPacketSet {
final IntArray messageIds = new IntArray();
}
}

View File

@ -1,59 +0,0 @@
package com.riiablo.net.reliable.channel;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import com.riiablo.net.reliable.Log;
import com.riiablo.net.reliable.MessageChannel;
import com.riiablo.net.reliable.ReliableConfiguration;
public class UnreliableMessageChannel extends MessageChannel {
private static final String TAG = "UnreliableMessageChannel";
private static final boolean DEBUG = true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
public UnreliableMessageChannel(PacketTransceiver packetTransceiver) {
super(new ReliableConfiguration(), packetTransceiver);
}
@Override
public void onPacketTransmitted(ByteBuf bb) {
}
@Override
public void onAckProcessed(int sequence) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence);
}
@Override
public void onPacketProcessed(int sequence, ByteBuf bb) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb);
packetTransceiver.receivePacket(bb);
}
@Override
public void reset() {
packetController.reset();
}
@Override
public void update(float delta, int channelId, DatagramChannel ch) {
packetController.update(delta);
}
@Override
public void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendMessage " + bb);
packetController.sendPacket(channelId, ch, bb);
}
@Override
public void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived " + packet);
packetController.onPacketReceived(ctx, packet);
}
}

View File

@ -1,68 +0,0 @@
package com.riiablo.net.reliable.channel;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import com.riiablo.net.reliable.Log;
import com.riiablo.net.reliable.MessageChannel;
import com.riiablo.net.reliable.Packet;
import com.riiablo.net.reliable.ReliableConfiguration;
import com.riiablo.net.reliable.ReliableUtils;
public class UnreliableOrderedMessageChannel extends MessageChannel {
private static final String TAG = "UnreliableOrderedMessageChannel";
private static final boolean DEBUG = true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
private int nextSequence = 0;
public UnreliableOrderedMessageChannel(PacketTransceiver packetTransceiver) {
super(new ReliableConfiguration(), packetTransceiver);
}
@Override
public void reset() {
nextSequence = 0;
packetController.reset();
}
@Override
public void update(float delta, int channelId, DatagramChannel ch) {
packetController.update(delta);
}
@Override
public void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb) {
if (DEBUG_SEND) Log.debug(TAG, "sendMessage " + bb);
packetController.sendPacket(channelId, ch, bb);
}
@Override
public void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived " + packet);
packetController.onPacketReceived(ctx, packet);
}
@Override
public void onPacketTransmitted(ByteBuf bb) {
packetTransceiver.sendPacket(bb);
}
@Override
public void onAckProcessed(int sequence) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence);
}
@Override
public void onPacketProcessed(int sequence, ByteBuf bb) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb);
if (sequence == nextSequence || ReliableUtils.sequenceGreaterThan(sequence, nextSequence)) {
nextSequence = (sequence + 1) & Packet.USHORT_MAX_VALUE;
packetTransceiver.receivePacket(bb);
}
}
}

View File

@ -1,18 +0,0 @@
package com.riiablo.net.reliable.data;
import io.netty.buffer.ByteBuf;
import com.artemis.utils.BitVector;
public class FragmentReassemblyData {
public int sequence;
public int ack;
public int ackBits;
public int numFragmentsReceived;
public int numFragmentsTotal;
public ByteBuf dataBuffer;
public int packetBytes;
public int headerOffset;
public final BitVector fragmentReceived = new BitVector(256);
}

View File

@ -1,6 +0,0 @@
package com.riiablo.net.reliable.data;
public class ReceivedPacketData {
public float time;
public int packetSize;
}

View File

@ -1,7 +0,0 @@
package com.riiablo.net.reliable.data;
public class SentPacketData {
public float time;
public boolean acked;
public int packetSize;
}

View File

@ -1,56 +0,0 @@
package com.riiablo.net.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import com.badlogic.gdx.Gdx;
import com.riiablo.net.Endpoint;
import com.riiablo.net.PacketProcessor;
public class TcpEndpoint implements Endpoint<ByteBuf> {
private static final String TAG = "TcpEndpoint";
private static final boolean DEBUG = true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
private final Channel channel;
private final PacketProcessor packetProcessor;
public TcpEndpoint(Channel channel, PacketProcessor packetProcessor) {
this.channel = channel;
this.packetProcessor = packetProcessor;
}
@Override
public Channel channel() {
return channel;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) {
if (DEBUG_RECEIVE) Gdx.app.debug(TAG, "onMessageReceived");
packetProcessor.processPacket(msg);
}
@Override
public void sendMessage(ByteBuffer bb) {
if (DEBUG_SEND) Gdx.app.debug(TAG, "sendMessage");
channel.writeAndFlush(Unpooled.wrappedBuffer(bb)); // releases msg
}
@Override
public void sendMessage(Object qos, ByteBuffer bb) {
sendMessage(bb);
}
@Override
public void reset() {}
@Override
public void update(float delta) {}
}