diff --git a/server/netty/src/com/riiablo/net/reliable/Log.java b/server/netty/src/com/riiablo/net/reliable/Log.java new file mode 100644 index 00000000..ba333fa0 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/Log.java @@ -0,0 +1,31 @@ +package com.riiablo.net.reliable; + +import com.badlogic.gdx.Gdx; + +public class Log { + private Log() {} + + public static void error(String tag, String message) { + Gdx.app.error(tag, message); + } + + public static void error(String tag, String format, Object... args) { + error(tag, String.format(format, args)); + } + + public static void log(String tag, String message) { + Gdx.app.log(tag, message); + } + + public static void log(String tag, String format, Object... args) { + log(tag, String.format(format, args)); + } + + public static void debug(String tag, String message) { + Gdx.app.debug(tag, message); + } + + public static void debug(String tag, String format, Object... args) { + debug(tag, String.format(format, args)); + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/MessageChannel.java b/server/netty/src/com/riiablo/net/reliable/MessageChannel.java new file mode 100644 index 00000000..93dfb233 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/MessageChannel.java @@ -0,0 +1,30 @@ +package com.riiablo.net.reliable; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; + +public abstract class MessageChannel implements ReliablePacketController.PacketListener { + protected final PacketTransceiver packetTransceiver; + protected final ReliableConfiguration config; + protected final ReliablePacketController packetController; + + protected int sequence; + + public MessageChannel(ReliableConfiguration config, PacketTransceiver packetTransceiver) { + this.packetTransceiver = packetTransceiver; + this.config = config; + this.packetController = new ReliablePacketController(config, this); + } + + public abstract void reset(); + public abstract void update(long time, DatagramChannel ch); + public abstract void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb); + public abstract void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet); + + public interface PacketTransceiver { + void sendPacket(ByteBuf bb); + void receivePacket(int sequence, ByteBuf bb); + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/PacketData.java b/server/netty/src/com/riiablo/net/reliable/PacketData.java new file mode 100644 index 00000000..69dbb9c9 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/PacketData.java @@ -0,0 +1,4 @@ +package com.riiablo.net.reliable; + +public interface PacketData { +} diff --git a/server/netty/src/com/riiablo/net/reliable/QoS.java b/server/netty/src/com/riiablo/net/reliable/QoS.java new file mode 100644 index 00000000..4a83b0f8 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/QoS.java @@ -0,0 +1,27 @@ +package com.riiablo.net.reliable; + +public enum QoS { + /** + * Message is guaranteed to arrive and in order. + */ + Reliable, + + /** + * Message is not guaranteed delivery nor order. + */ + Unreliable, + + /** + * Message is not guaranteed delivery, but will be in order + */ + UnreliableOrdered; + + public static QoS valueOf(int i) { + switch (i) { + case 0: return Reliable; + case 1: return Unreliable; + case 2: return UnreliableOrdered; + default: return null; + } + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/ReliableChannelHandler.java b/server/netty/src/com/riiablo/net/reliable/ReliableChannelHandler.java new file mode 100644 index 00000000..fd8a8583 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/ReliableChannelHandler.java @@ -0,0 +1,175 @@ +package com.riiablo.net.reliable; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelOutboundHandler; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.TypeParameterMatcher; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import com.badlogic.gdx.Gdx; + +public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHandler, ChannelOutboundHandler { + private static final String TAG = "ReliableChannelHandler"; + + private static final boolean DEBUG = true; + private static final boolean DEBUG_INBOUND = DEBUG && true; + private static final boolean DEBUG_OUTBOUND = DEBUG && true; + + private final TypeParameterMatcher matcher; + private final ReliableEndpoint endpoint; + + public ReliableChannelHandler(ReliableEndpoint endpoint) { + this.endpoint = endpoint; + matcher = TypeParameterMatcher.get(DatagramPacket.class); + } + + protected boolean accept(Object msg) throws Exception { + return matcher.match(msg); + } + + protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { + InetSocketAddress sender = packet.sender(); + Gdx.app.log(TAG, "channelRead0 received packet from " + sender.getHostName() + ":" + sender.getPort()); + ByteBuf in = packet.content(); + if (DEBUG_INBOUND) Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(in)); + endpoint.messageReceived(ctx, packet); + } + + protected Object writeMessage(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + InetSocketAddress receiver = (InetSocketAddress) ctx.channel().remoteAddress(); + Gdx.app.log(TAG, "channelWrite0 sending packet to " + receiver.getHostName() + ":" + receiver.getPort()); + ByteBuf out = msg; + if (DEBUG_OUTBOUND) Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(out)); + return msg; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Gdx.app.debug(TAG, "handlerAdded"); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Gdx.app.debug(TAG, "handlerRemoved"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Gdx.app.debug(TAG, "exceptionCaught"); + ctx.fireExceptionCaught(cause); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + Gdx.app.debug(TAG, "channelRegistered"); + ctx.fireChannelRegistered(); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + Gdx.app.debug(TAG, "channelUnregistered"); + ctx.fireChannelUnregistered(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Gdx.app.debug(TAG, "channelActive"); + ctx.fireChannelActive(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Gdx.app.debug(TAG, "channelInactive"); + ctx.fireChannelInactive(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + Gdx.app.debug(TAG, "channelRead"); + boolean release = true; + try { + if (accept(msg)) { + messageReceived(ctx, (DatagramPacket) msg); + } else { + release = false; + ctx.fireChannelRead(msg); + } + } finally { + if (release) ReferenceCountUtil.release(msg); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + Gdx.app.debug(TAG, "channelReadComplete"); + ctx.fireChannelReadComplete(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + Gdx.app.debug(TAG, "userEventTriggered"); + ctx.fireUserEventTriggered(evt); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + Gdx.app.debug(TAG, "channelWritabilityChanged"); + ctx.fireChannelWritabilityChanged(); + } + + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { + Gdx.app.debug(TAG, "bind"); + ctx.bind(localAddress, promise); + } + + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { + Gdx.app.debug(TAG, "connect"); + ctx.connect(remoteAddress, localAddress, promise); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + Gdx.app.debug(TAG, "disconnect"); + ctx.disconnect(promise); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + Gdx.app.debug(TAG, "close"); + ctx.close(promise); + } + + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + Gdx.app.debug(TAG, "deregister"); + ctx.deregister(promise); + } + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + Gdx.app.debug(TAG, "read"); + ctx.read(); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + Gdx.app.debug(TAG, "write"); + msg = writeMessage(ctx, (ByteBuf) msg); + ctx.write(msg, promise); + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + Gdx.app.debug(TAG, "flush"); + ctx.flush(); + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/ReliableConfiguration.java b/server/netty/src/com/riiablo/net/reliable/ReliableConfiguration.java new file mode 100644 index 00000000..03a097e6 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/ReliableConfiguration.java @@ -0,0 +1,15 @@ +package com.riiablo.net.reliable; + +public class ReliableConfiguration { + public int maxPacketSize = 16384; + public int fragmentThreshold = 1024; + public int maxFragments = 16; + public int fragmentSize = 1024; + public int sentPacketBufferSize = 256; + public int receivedPacketBufferSize = 256; + public int fragmentReassemblyBufferSize = 64; + public float rttSmoothingFactor = 0.25f; + public float packetLossSmoothingFactor = 0.1f; + public float bandwidthSmoothingFactor = 0.1f; + public int packetHeaderSize = 28; +} diff --git a/server/netty/src/com/riiablo/net/reliable/ReliableEndpoint.java b/server/netty/src/com/riiablo/net/reliable/ReliableEndpoint.java new file mode 100644 index 00000000..ec632875 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/ReliableEndpoint.java @@ -0,0 +1,121 @@ +package com.riiablo.net.reliable; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import org.apache.commons.lang3.Validate; + +import com.riiablo.net.reliable.channel.ReliableMessageChannel; +import com.riiablo.net.reliable.channel.UnreliableMessageChannel; +import com.riiablo.net.reliable.channel.UnreliableOrderedMessageChannel; +import com.riiablo.util.EnumIntMap; + +public class ReliableEndpoint implements MessageChannel.PacketTransceiver { + private static final String TAG = "ReliableEndpoint"; + + private static final boolean DEBUG = true; + private static final boolean DEBUG_QOS = DEBUG && true; + private static final boolean DEBUG_CHANNEL = DEBUG && true; + private static final boolean DEBUG_SEND = DEBUG && true; + private static final boolean DEBUG_RECEIVE = DEBUG && true; + + private final DatagramChannel channel; + private final PacketProcessor packetProcessor; + + private final EnumIntMap defaultChannels; + private final MessageChannel[] channels; + + public ReliableEndpoint(DatagramChannel channel, PacketProcessor packetProcessor) { + this.channel = channel; + this.packetProcessor = packetProcessor; + + // for my purposes 3 works, channelId can be up to 255 though + channels = new MessageChannel[3]; + channels[QoS.Reliable.ordinal()] = new ReliableMessageChannel(this); + channels[QoS.Unreliable.ordinal()] = new UnreliableMessageChannel(this); + channels[QoS.UnreliableOrdered.ordinal()] = new UnreliableOrderedMessageChannel(this); + + defaultChannels = new EnumIntMap<>(QoS.class, -1); + for (QoS qos : QoS.values()) { + defaultChannels.put(qos, qos.ordinal()); + } + } + + public InetSocketAddress remoteAddress() { + return channel.remoteAddress(); + } + + public void reset() { + for (MessageChannel mc : channels) if (mc != null) mc.reset(); + } + + public void update(long time) { + for (MessageChannel mc : channels) if (mc != null) mc.update(time, channel); + } + + public void sendMessage(QoS qos, ByteBuffer bb) { + if (DEBUG_SEND) Log.debug(TAG, "sendMessage"); + if (DEBUG_QOS) Log.debug(TAG, "sending message with %s QoS (0x%02x)", qos, qos.ordinal()); + int channelId = defaultChannels.get(qos); + sendMessage(channelId, bb); + } + + public void sendMessage(int channelId, ByteBuffer bb) { + if (DEBUG_SEND) Log.debug(TAG, "sendMessage"); + Validate.inclusiveBetween(0x00, 0xFF, channelId, "channelId must fit within a ubyte"); + if (DEBUG_CHANNEL) Log.debug(TAG, "sending message with on channel %d", channelId); + MessageChannel mc = channels[channelId]; + mc.sendMessage(channelId, channel, Unpooled.wrappedBuffer(bb)); // automatically released + } + + public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) { + if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived"); + int channelId = Packet.getChannelId(packet.content()); + if (DEBUG_QOS) { + QoS qos = QoS.valueOf(channelId); + if (qos != null) { + Log.debug(TAG, "received message with %s QoS (0x%02x)", qos, channelId); + } else { + Log.debug(TAG, "received message with channel %d", channelId); + } + } else if (DEBUG_CHANNEL) { + Log.debug(TAG, "received message with channel %d", channelId); + } + + MessageChannel mc = channels[channelId]; + mc.onMessageReceived(ctx, packet); + } + + @Override + public void sendPacket(ByteBuf bb) { + + } + + @Override + public void receivePacket(int sequence, ByteBuf bb) { + packetProcessor.processPacket(sequence, bb); + } + + public interface PacketProcessor { + void processPacket(int sequence, ByteBuf bb); + } + + public static final Stats stats = new Stats(); + + public static class Stats { + public int NUM_PACKETS_SENT; + public int NUM_PACKETS_RECEIVED; + public int NUM_PACKETS_ACKED; + public int NUM_PACKETS_STALE; + public int NUM_PACKETS_INVALID; + public int NUM_PACKETS_TOO_LARGE_TO_SEND; + public int NUM_PACKETS_TOO_LARGE_TO_RECEIVE; + public int NUM_FRAGMENTS_SENT; + public int NUM_FRAGMENTS_RECEIVED; + public int NUM_FRAGMENTS_INVALID; + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java b/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java new file mode 100644 index 00000000..f4f0bacb --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java @@ -0,0 +1,149 @@ +package com.riiablo.net.reliable; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; + +import com.riiablo.net.reliable.data.FragmentReassemblyData; +import com.riiablo.net.reliable.data.ReceivedPacketData; +import com.riiablo.net.reliable.data.SentPacketData; + +public class ReliablePacketController { + private static final String TAG = "ReliablePacketController"; + + private static final boolean DEBUG = true; + private static final boolean DEBUG_SEND = DEBUG && true; + private static final boolean DEBUG_RECEIVE = DEBUG && true; + + private final ReliableConfiguration config; + private final MessageChannel channel; + + private final SequenceBuffer sentPackets; + private final SequenceBuffer receivedPackets; + private final SequenceBuffer fragmentReassembly; + + public ReliablePacketController(ReliableConfiguration config, MessageChannel channel) { + this.config = config; + this.channel = channel; + + this.sentPackets = new SequenceBuffer<>(SentPacketData.class, config.sentPacketBufferSize); + this.receivedPackets = new SequenceBuffer<>(ReceivedPacketData.class, config.receivedPacketBufferSize); + this.fragmentReassembly = new SequenceBuffer<>(FragmentReassemblyData.class, config.fragmentReassemblyBufferSize); + } + + public int nextSequence() { + return channel.sequence; + } + + private int incSequence() { + return channel.sequence = (channel.sequence + 1) & Packet.USHORT_MAX_VALUE; + } + + public void reset() { + + } + + public void update(long time) { + + } + + public void sendAck(int channelId, DatagramChannel ch) { + + } + + public int sendPacket(int channelId, DatagramChannel ch, ByteBuf bb) { + if (DEBUG_SEND) Log.debug(TAG, "sendPacket " + bb); + + final int packetSize = bb.readableBytes(); + if (packetSize > config.maxPacketSize) { + Log.error(TAG, "packet is too large to send (%d bytes), max packet size is %d bytes", packetSize, config.maxPacketSize); + ReliableEndpoint.stats.NUM_PACKETS_TOO_LARGE_TO_SEND++; + return -1; + } + + final int sequence = incSequence(); + int ack, ackBits; + synchronized (receivedPackets) { + ack = receivedPackets.generateAck(); + ackBits = receivedPackets.generateAckBits(ack); + } + + SentPacketData sentPacketData = sentPackets.insert(sequence); +// sentPacketData.time = this.time; +// sentPacketData.packetSize = + sentPacketData.acked = false; + + if (packetSize <= config.fragmentThreshold) { + // regular packet + + ByteBuf header = ch.alloc().buffer(config.packetHeaderSize); + int headerSize = Packet.writePacketHeader(header, channelId, sequence, ack, ackBits); + + ByteBuf composite = ch.alloc().compositeBuffer(2) + .addComponent(true, header) + .addComponent(true, bb); + + ch.writeAndFlush(composite); + return headerSize; + } else { + // fragmented packet + + throw new UnsupportedOperationException(); +// return -1; + } + } + + public void onPacketReceived(ChannelHandlerContext ctx, DatagramPacket packet) { + if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketReceived " + packet); + + final ByteBuf bb = packet.content(); + final int packetSize = bb.readableBytes(); + if (packetSize > config.maxPacketSize) { + Log.error(TAG, "packet is too large to receive (%d bytes), max packet size is %d bytes", packetSize, config.maxPacketSize); + ReliableEndpoint.stats.NUM_PACKETS_TOO_LARGE_TO_RECEIVE++; + return; + } + + final byte flags = Packet.getFlags(bb); + if (!Packet.isFragmented(flags)) { + // regular packet + + ReliableEndpoint.stats.NUM_PACKETS_RECEIVED++; + + Packet.HeaderData headerData = null; + try { + headerData = Packet.obtainData(); + int headerSize = Packet.readPacketHeader(config, bb, headerData); + if (headerSize == -1) { + Log.error(TAG, "ignoring invalid packet. could not read packet header"); + ReliableEndpoint.stats.NUM_PACKETS_INVALID++; + return; + } + + final int sequence = headerData.sequence; + if (!receivedPackets.testInsert(sequence)) { + Log.error(TAG, "ignoring stale packet %d", sequence); + ReliableEndpoint.stats.NUM_PACKETS_STALE++; + return; + } + + if (DEBUG_RECEIVE) Log.debug(TAG, "processing packet %d", sequence); + ByteBuf slice = bb.readSlice(bb.readableBytes()); + channel.onPacketProcessed(sequence, slice); + // TODO... + } finally { + if (headerData != null) headerData.free(); + } + } else { + // fragmented packet + + throw new UnsupportedOperationException(); + } + } + + public interface PacketListener { + void onPacketTransmitted(ByteBuf bb); + void onPacketProcessed(int sequence, ByteBuf bb); + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/ReliableUtils.java b/server/netty/src/com/riiablo/net/reliable/ReliableUtils.java new file mode 100644 index 00000000..bfb5178b --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/ReliableUtils.java @@ -0,0 +1,14 @@ +package com.riiablo.net.reliable; + +public class ReliableUtils { + private ReliableUtils() {} + + public static boolean sequenceGreaterThan(int s1, int s2) { + return ((s1 > s2) && (s1 - s2 <= Short.MAX_VALUE)) + || ((s1 < s2) && (s2 - s1 > Short.MAX_VALUE)); + } + + public static boolean sequenceLessThan(int s1, int s2) { + return sequenceGreaterThan(s2, s1); + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/SequenceBuffer.java b/server/netty/src/com/riiablo/net/reliable/SequenceBuffer.java new file mode 100644 index 00000000..6827b2ae --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/SequenceBuffer.java @@ -0,0 +1,107 @@ +package com.riiablo.net.reliable; + +import java.util.Arrays; +import org.apache.commons.lang3.exception.ExceptionUtils; + +public class SequenceBuffer { + public static final int INVALID_SEQUENCE = -1; // 0xFFFFFFFF + + private int sequence; + + private final int numEntries; + private final int entrySequence[]; + private final Object entryData[]; + + public SequenceBuffer(Class dataContainer, int bufferSize) { + numEntries = bufferSize; + entrySequence = new int[numEntries]; + Arrays.fill(entrySequence, INVALID_SEQUENCE); + entryData = new Object[numEntries]; + try { + for (int i = 0; i < numEntries; i++) entryData[i] = dataContainer.newInstance(); + } catch (Throwable t) { + ExceptionUtils.wrapAndThrow(t); + } + + sequence = 0; + } + + public void reset() { + sequence = 0; + Arrays.fill(entrySequence, INVALID_SEQUENCE); + } + + public void removeEntries(int startSequence, int endSequence) { + startSequence &= Packet.USHORT_MAX_VALUE; + endSequence &= Packet.USHORT_MAX_VALUE; + if (endSequence < startSequence) { + Arrays.fill(entrySequence, startSequence, numEntries, INVALID_SEQUENCE); + Arrays.fill(entrySequence, 0, endSequence, INVALID_SEQUENCE); + } else { + Arrays.fill(entrySequence, startSequence, endSequence, INVALID_SEQUENCE); + } + } + + public boolean testInsert(int sequence) { + sequence &= Packet.USHORT_MAX_VALUE; + return !ReliableUtils.sequenceLessThan(sequence, (this.sequence - numEntries) & Packet.USHORT_MAX_VALUE); + } + + @SuppressWarnings("unchecked") + public T insert(int sequence) { + sequence &= Packet.USHORT_MAX_VALUE; + if (ReliableUtils.sequenceLessThan(sequence, (this.sequence - numEntries) & Packet.USHORT_MAX_VALUE)) { + return null; + } + + final int nextSequence = (sequence + 1) & Packet.USHORT_MAX_VALUE; + if (ReliableUtils.sequenceGreaterThan(nextSequence, this.sequence)) { + removeEntries(this.sequence, sequence); + this.sequence = nextSequence; + } + + int index = sequence % numEntries; + entrySequence[index] = sequence; + return (T) entryData[index]; + } + + public void remove(int sequence) { + sequence &= Packet.USHORT_MAX_VALUE; + entrySequence[sequence % numEntries] = INVALID_SEQUENCE; + } + + public boolean available(int sequence) { + sequence &= Packet.USHORT_MAX_VALUE; + return entrySequence[sequence % numEntries] == INVALID_SEQUENCE; + } + + public boolean exists(int sequence) { + sequence &= Packet.USHORT_MAX_VALUE; + return entrySequence[sequence % numEntries] == sequence; + } + + @SuppressWarnings("unchecked") + public T find(int sequence) { + sequence &= Packet.USHORT_MAX_VALUE; + int index = sequence % numEntries; + return entrySequence[index] == sequence ? (T) entryData[index] : null; + } + + @SuppressWarnings("unchecked") + public T atIndex(int index) { + return entrySequence[index] != INVALID_SEQUENCE ? (T) entryData[index] : null; + } + + public int generateAck() { + return (sequence - 1) & Packet.USHORT_MAX_VALUE; + } + + public int generateAckBits(int ack) { + ack &= Packet.USHORT_MAX_VALUE; + int ackBits = 0; + for (int i = 0, mask = 1; i < Integer.SIZE; i++, mask <<= 1) { + if (exists(ack - i)) ackBits |= mask; + } + return ackBits; + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/TestClient.java b/server/netty/src/com/riiablo/net/reliable/TestClient.java new file mode 100644 index 00000000..3534e775 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/TestClient.java @@ -0,0 +1,93 @@ +package com.riiablo.net.reliable; + +import com.google.flatbuffers.FlatBufferBuilder; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import java.net.InetSocketAddress; + +import com.badlogic.gdx.Application; +import com.badlogic.gdx.ApplicationAdapter; +import com.badlogic.gdx.Gdx; +import com.badlogic.gdx.backends.headless.HeadlessApplication; +import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration; + +import com.riiablo.codec.Animation; +import com.riiablo.net.packet.netty.Connection; +import com.riiablo.net.packet.netty.Netty; +import com.riiablo.net.packet.netty.NettyData; + +public class TestClient extends ApplicationAdapter implements ReliableEndpoint.PacketProcessor { + private static final String TAG = "Client"; + + public static void main(String[] args) throws Exception { + Thread.sleep(1000); + HeadlessApplicationConfiguration config = new HeadlessApplicationConfiguration(); + config.renderInterval = Animation.FRAME_DURATION; + new HeadlessApplication(new TestClient(), config); + } + + private ReliableEndpoint endpoint; + + @Override + public void create() { + Gdx.app.setLogLevel(Application.LOG_DEBUG); + + EventLoopGroup group = new NioEventLoopGroup(); + try { + Bootstrap b = new Bootstrap() + .group(group) + .channel(NioDatagramChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(DatagramChannel ch) { + endpoint = new ReliableEndpoint(ch, TestClient.this); + ch.pipeline() + .addLast(new ReliableChannelHandler(endpoint)) + .addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + init(); + ctx.pipeline().remove(this); + } + + void init() { + InetSocketAddress remoteAddress = endpoint.remoteAddress(); + Gdx.app.log(TAG, "Sending Connection packet to " + remoteAddress.getHostString() + ":" + remoteAddress.getPort()); + + FlatBufferBuilder builder = new FlatBufferBuilder(); + Connection.startConnection(builder); + int dataOffset = Connection.endConnection(builder); + int offset = Netty.createNetty(builder, NettyData.Connection, dataOffset); + Netty.finishNettyBuffer(builder, offset); + + endpoint.sendMessage(QoS.Unreliable, builder.dataBuffer()); + } + }) + ; + } + }); + + ChannelFuture f = b.connect("localhost", TestServer.PORT); + f.channel().closeFuture().sync(); + } catch (Throwable t) { + Gdx.app.error(TAG, t.getMessage(), t); + } finally { + group.shutdownGracefully(); + } + } + + @Override + public void processPacket(int sequence, ByteBuf bb) { + Gdx.app.debug(TAG, "Processing packet..."); + Gdx.app.log(TAG, ByteBufUtil.hexDump(bb)); + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/TestServer.java b/server/netty/src/com/riiablo/net/reliable/TestServer.java new file mode 100644 index 00000000..2a25ef75 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/TestServer.java @@ -0,0 +1,81 @@ +package com.riiablo.net.reliable; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import java.nio.ByteBuffer; + +import com.badlogic.gdx.Application; +import com.badlogic.gdx.ApplicationAdapter; +import com.badlogic.gdx.Gdx; +import com.badlogic.gdx.backends.headless.HeadlessApplication; +import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration; + +import com.riiablo.codec.Animation; +import com.riiablo.net.packet.netty.Netty; +import com.riiablo.net.packet.netty.NettyData; + +public class TestServer extends ApplicationAdapter implements ReliableEndpoint.PacketProcessor { + private static final String TAG = "Server"; + + static final int PORT = 6114; + + public static void main(String[] args) { + HeadlessApplicationConfiguration config = new HeadlessApplicationConfiguration(); + config.renderInterval = Animation.FRAME_DURATION; + new HeadlessApplication(new TestServer(), config); + } + + private ReliableEndpoint endpoint; + + @Override + public void create() { + Gdx.app.setLogLevel(Application.LOG_DEBUG); + + EventLoopGroup group = new NioEventLoopGroup(); + try { + Bootstrap b = new Bootstrap() + .group(group) + .channel(NioDatagramChannel.class) + .option(ChannelOption.SO_BROADCAST, true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(DatagramChannel ch) { + endpoint = new ReliableEndpoint(ch, TestServer.this); + ch.pipeline() + .addLast(new ReliableChannelHandler(endpoint)) + ; + } + }) + ; + + ChannelFuture f = b.bind(PORT).sync(); + f.channel().closeFuture().sync(); + } catch (Throwable t) { + Gdx.app.error(TAG, t.getMessage(), t); + } finally { + group.shutdownGracefully(); + } + } + + @Override + public void processPacket(int sequence, ByteBuf bb) { + Gdx.app.debug(TAG, "Processing packet..."); + Gdx.app.log(TAG, ByteBufUtil.hexDump(bb)); + + ByteBuffer nioBuffer = bb.nioBuffer(); + Netty netty = Netty.getRootAsNetty(nioBuffer); + + byte dataType = netty.dataType(); + if (0 <= dataType && dataType < NettyData.names.length) { + Gdx.app.debug(TAG, "dataType=" + NettyData.name(dataType)); + } + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java b/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java new file mode 100644 index 00000000..31a89b29 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java @@ -0,0 +1,48 @@ +package com.riiablo.net.reliable.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; + +import com.riiablo.net.reliable.MessageChannel; +import com.riiablo.net.reliable.ReliableConfiguration; + +public class ReliableMessageChannel extends MessageChannel { + private static final String TAG = "ReliableMessageChannel"; + + public ReliableMessageChannel(PacketTransceiver packetTransceiver) { + super(new ReliableConfiguration(), packetTransceiver); + } + + + @Override + public void reset() { + + } + + @Override + public void update(long time, DatagramChannel ch) { + + } + + @Override + public void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb) { + + } + + @Override + public void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet) { + + } + + @Override + public void onPacketTransmitted(ByteBuf bb) { + + } + + @Override + public void onPacketProcessed(int sequence, ByteBuf bb) { + + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java new file mode 100644 index 00000000..fc8cc685 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java @@ -0,0 +1,54 @@ +package com.riiablo.net.reliable.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; + +import com.riiablo.net.reliable.Log; +import com.riiablo.net.reliable.MessageChannel; +import com.riiablo.net.reliable.ReliableConfiguration; + +public class UnreliableMessageChannel extends MessageChannel { + private static final String TAG = "UnreliableMessageChannel"; + + private static final boolean DEBUG = true; + private static final boolean DEBUG_SEND = DEBUG && true; + private static final boolean DEBUG_RECEIVE = DEBUG && true; + + public UnreliableMessageChannel(PacketTransceiver packetTransceiver) { + super(new ReliableConfiguration(), packetTransceiver); + } + + @Override + public void onPacketTransmitted(ByteBuf bb) { + } + + @Override + public void onPacketProcessed(int sequence, ByteBuf bb) { + if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + bb); + packetTransceiver.receivePacket(sequence, bb); + } + + @Override + public void reset() { + packetController.reset(); + } + + @Override + public void update(long time, DatagramChannel ch) { + packetController.update(time); + } + + @Override + public void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb) { + if (DEBUG_SEND) Log.debug(TAG, "sendMessage " + bb); + packetController.sendPacket(channelId, ch, bb); + } + + @Override + public void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet) { + if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived " + packet); + packetController.onPacketReceived(ctx, packet); + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java new file mode 100644 index 00000000..0ff1786e --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java @@ -0,0 +1,54 @@ +package com.riiablo.net.reliable.channel; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; + +import com.riiablo.net.reliable.Log; +import com.riiablo.net.reliable.MessageChannel; +import com.riiablo.net.reliable.ReliableConfiguration; + +public class UnreliableOrderedMessageChannel extends MessageChannel { + private static final String TAG = "UnreliableOrderedMessageChannel"; + + private static final boolean DEBUG = true; + private static final boolean DEBUG_SEND = DEBUG && true; + private static final boolean DEBUG_RECEIVE = DEBUG && true; + + public UnreliableOrderedMessageChannel(PacketTransceiver packetTransceiver) { + super(new ReliableConfiguration(), packetTransceiver); + } + + @Override + public void reset() { + + } + + @Override + public void update(long time, DatagramChannel ch) { + + } + + @Override + public void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb) { + if (DEBUG_SEND) Log.debug(TAG, "sendMessage " + bb); + packetController.sendPacket(channelId, ch, bb); + } + + @Override + public void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet) { + if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived " + packet); + packetController.onPacketReceived(ctx, packet); + } + + @Override + public void onPacketTransmitted(ByteBuf bb) { + + } + + @Override + public void onPacketProcessed(int sequence, ByteBuf bb) { + + } +} diff --git a/server/netty/src/com/riiablo/net/reliable/data/FragmentReassemblyData.java b/server/netty/src/com/riiablo/net/reliable/data/FragmentReassemblyData.java new file mode 100644 index 00000000..ba56bbf0 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/data/FragmentReassemblyData.java @@ -0,0 +1,18 @@ +package com.riiablo.net.reliable.data; + +import io.netty.buffer.ByteBuf; + +import com.artemis.utils.BitVector; + +public class FragmentReassemblyData { + public int sequence; + public int ack; + public int ackBits; + public int numFragmentsReceived; + public int numFragmentsTotal; + public ByteBuf dataBuffer; + public int packetBytes; + public int headerOffset; + + public final BitVector fragmentReceived = new BitVector(256); +} diff --git a/server/netty/src/com/riiablo/net/reliable/data/ReceivedPacketData.java b/server/netty/src/com/riiablo/net/reliable/data/ReceivedPacketData.java new file mode 100644 index 00000000..3d8f3306 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/data/ReceivedPacketData.java @@ -0,0 +1,6 @@ +package com.riiablo.net.reliable.data; + +public class ReceivedPacketData { + public long time; + public int packetSize; +} diff --git a/server/netty/src/com/riiablo/net/reliable/data/SentPacketData.java b/server/netty/src/com/riiablo/net/reliable/data/SentPacketData.java new file mode 100644 index 00000000..1348ac87 --- /dev/null +++ b/server/netty/src/com/riiablo/net/reliable/data/SentPacketData.java @@ -0,0 +1,7 @@ +package com.riiablo.net.reliable.data; + +public class SentPacketData { + public long time; + public boolean acked; + public int packetSize; +}