diff --git a/server/netty/src/com/riiablo/net/reliable/MessageChannel.java b/server/netty/src/com/riiablo/net/reliable/MessageChannel.java index 55c999a3..f9b56cb2 100644 --- a/server/netty/src/com/riiablo/net/reliable/MessageChannel.java +++ b/server/netty/src/com/riiablo/net/reliable/MessageChannel.java @@ -19,7 +19,7 @@ public abstract class MessageChannel implements ReliablePacketController.PacketL } public abstract void reset(); - public abstract void update(float delta, DatagramChannel ch); + public abstract void update(float delta, int channelId, DatagramChannel ch); public abstract void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb); public abstract void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet); diff --git a/server/netty/src/com/riiablo/net/reliable/Packet.java b/server/netty/src/com/riiablo/net/reliable/Packet.java index c6513e77..3382df4c 100644 --- a/server/netty/src/com/riiablo/net/reliable/Packet.java +++ b/server/netty/src/com/riiablo/net/reliable/Packet.java @@ -11,8 +11,8 @@ public class Packet { public static final int USHORT_MAX_VALUE = 0xFFFF; - static final int MAX_PACKET_HEADER_SIZE = 10; - static final int FRAGMENT_HEADER_SIZE = 6; + public static final int MAX_PACKET_HEADER_SIZE = 10; + public static final int FRAGMENT_HEADER_SIZE = 6; static final int SINGLE = 0; static final int FRAGMENTED = 1 << 0; diff --git a/server/netty/src/com/riiablo/net/reliable/ReliableEndpoint.java b/server/netty/src/com/riiablo/net/reliable/ReliableEndpoint.java index ceafbb5d..e232fdf2 100644 --- a/server/netty/src/com/riiablo/net/reliable/ReliableEndpoint.java +++ b/server/netty/src/com/riiablo/net/reliable/ReliableEndpoint.java @@ -54,12 +54,18 @@ public class ReliableEndpoint implements Endpoint, MessageChanne @Override public void reset() { - for (MessageChannel mc : channels) if (mc != null) mc.reset(); + final MessageChannel[] channels = this.channels; + for (int i = 0, s = channels.length; i < s; i++) { + channels[i].reset(); + } } @Override public void update(float delta) { - for (MessageChannel mc : channels) if (mc != null) mc.update(delta, channel); + final MessageChannel[] channels = this.channels; + for (int i = 0, s = channels.length; i < s; i++) { + channels[i].update(delta, i, channel); + } } @Override diff --git a/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java b/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java index 117f72ac..9a41e5d9 100644 --- a/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java +++ b/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java @@ -5,6 +5,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; +import com.badlogic.gdx.math.MathUtils; + import com.riiablo.net.reliable.data.FragmentReassemblyData; import com.riiablo.net.reliable.data.ReceivedPacketData; import com.riiablo.net.reliable.data.SentPacketData; @@ -16,6 +18,8 @@ public class ReliablePacketController { private static final boolean DEBUG_SEND = DEBUG && true; private static final boolean DEBUG_RECEIVE = DEBUG && true; + private static final float TOLERANCE = 0.00001f; + private final ReliableConfiguration config; private final MessageChannel channel; @@ -23,7 +27,12 @@ public class ReliablePacketController { private final SequenceBuffer receivedPackets; private final SequenceBuffer fragmentReassembly; - private long time; + private float time; + private float rtt; + private float packetLoss; + private float sentBandwidth; + private float receivedBandwidth; + private float ackedBandwidth; public ReliablePacketController(ReliableConfiguration config, MessageChannel channel) { this.config = config; @@ -42,15 +51,132 @@ public class ReliablePacketController { return channel.sequence = (channel.sequence + 1) & Packet.USHORT_MAX_VALUE; } - public void reset() { + public float rtt() { + return rtt; + } + public void reset() { + channel.sequence = 0; + for (int i = 0, s = config.fragmentReassemblyBufferSize; i < s; i++) { + FragmentReassemblyData reassemblyData = fragmentReassembly.atIndex(i); + if (reassemblyData != null) reassemblyData.dataBuffer.clear(); + } + + sentPackets.reset(); + receivedPackets.reset(); + fragmentReassembly.reset(); } public void update(float delta) { - this.time = time; + time += delta; + updatePacketLoss(); + updateSentBandwidth(); + updateReceivedBandwidth(); + updateAckedBandwidth(); + } + + private void updatePacketLoss() { + int baseSequence = (sentPackets.getSequence() - config.sentPacketBufferSize + 1 + Packet.USHORT_MAX_VALUE) & Packet.USHORT_MAX_VALUE; + + int numDropped = 0; + int numSamples = config.sentPacketBufferSize / 2; + for (int i = 0; i < numSamples; i++) { + int sequence = (baseSequence + i) & Packet.USHORT_MAX_VALUE; + SentPacketData sentPacketData = sentPackets.find(sequence); + if (sentPacketData != null && !sentPacketData.acked) numDropped++; + } + + float packetLoss = numDropped / (float) numSamples; + if (MathUtils.isEqual(this.packetLoss, packetLoss, TOLERANCE)) { + this.packetLoss += (packetLoss - this.packetLoss) * config.packetLossSmoothingFactor; + } else { + this.packetLoss = packetLoss; + } + } + + private void updateSentBandwidth() { + int baseSequence = (sentPackets.getSequence() - config.sentPacketBufferSize + 1 + Packet.USHORT_MAX_VALUE) & Packet.USHORT_MAX_VALUE; + + int bytesSent = 0; + float startTime = Float.MAX_VALUE; + float finishTime = 0f; + int numSamples = config.sentPacketBufferSize / 2; + for (int i = 0; i < numSamples; i++) { + int sequence = (baseSequence + i) & Packet.USHORT_MAX_VALUE; + SentPacketData sentPacketData = sentPackets.find(sequence); + if (sentPacketData == null) continue; + bytesSent += sentPacketData.packetSize; + startTime = Math.min(startTime, sentPacketData.time); + finishTime = Math.max(finishTime, sentPacketData.time); + } + + if (startTime != Float.MAX_VALUE && finishTime != 0f) { + float sentBandwidth = bytesSent / (finishTime - startTime) * 8f / 1000f; + if (MathUtils.isEqual(this.sentBandwidth, sentBandwidth, TOLERANCE)) { + this.sentBandwidth += (sentBandwidth - this.sentBandwidth) * config.bandwidthSmoothingFactor; + } else { + this.sentBandwidth = sentBandwidth; + } + } + } + + private void updateReceivedBandwidth() { + synchronized (receivedPackets) { + int baseSequence = (receivedPackets.getSequence() - config.receivedPacketBufferSize + 1 + Packet.USHORT_MAX_VALUE) & Packet.USHORT_MAX_VALUE; + + int bytesReceived = 0; + float startTime = Float.MAX_VALUE; + float finishTime = 0f; + int numSamples = config.receivedPacketBufferSize / 2; + for (int i = 0; i < numSamples; i++) { + int sequence = (baseSequence + i) & Packet.USHORT_MAX_VALUE; + ReceivedPacketData receivedPacketData = receivedPackets.find(sequence); + if (receivedPacketData == null) continue; + bytesReceived += receivedPacketData.packetSize; + startTime = Math.min(startTime, receivedPacketData.time); + finishTime = Math.max(finishTime, receivedPacketData.time); + } + + if (startTime != Float.MAX_VALUE && finishTime != 0f) { + float receivedBandwidth = bytesReceived / (finishTime - startTime) * 8f / 1000f; + if (MathUtils.isEqual(this.receivedBandwidth, receivedBandwidth, TOLERANCE)) { + this.receivedBandwidth += (receivedBandwidth - this.receivedBandwidth) * config.bandwidthSmoothingFactor; + } else { + this.receivedBandwidth = receivedBandwidth; + } + } + } + } + + private void updateAckedBandwidth() { + int baseSequence = (sentPackets.getSequence() - config.sentPacketBufferSize + 1 + Packet.USHORT_MAX_VALUE) & Packet.USHORT_MAX_VALUE; + + int bytesSent = 0; + float startTime = Float.MAX_VALUE; + float finishTime = 0f; + int numSamples = config.sentPacketBufferSize / 2; + for (int i = 0; i < numSamples; i++) { + int sequence = (baseSequence + i) & Packet.USHORT_MAX_VALUE; + SentPacketData sentPacketData = sentPackets.find(sequence); + if (sentPacketData == null || !sentPacketData.acked) continue; + bytesSent += sentPacketData.packetSize; + startTime = Math.min(startTime, sentPacketData.time); + finishTime = Math.max(finishTime, sentPacketData.time); + } + + if (startTime != Float.MAX_VALUE && finishTime != 0f) { + float ackedBandwidth = bytesSent / (finishTime - startTime) * 8f / 1000f; + if (MathUtils.isEqual(this.ackedBandwidth, ackedBandwidth, TOLERANCE)) { + this.ackedBandwidth += (ackedBandwidth - this.ackedBandwidth) * config.bandwidthSmoothingFactor; + } else { + this.ackedBandwidth = ackedBandwidth; + } + } } public void sendAck(int channelId, DatagramChannel ch) { + if (DEBUG_SEND) Log.debug(TAG, "sendAck"); + int ack, ackBits; synchronized (receivedPackets) { ack = receivedPackets.generateAck(); @@ -87,7 +213,7 @@ public class ReliablePacketController { SentPacketData sentPacketData = sentPackets.insert(sequence); sentPacketData.time = this.time; -// sentPacketData.packetSize = + sentPacketData.packetSize = packetSize; sentPacketData.acked = false; if (packetSize <= config.fragmentThreshold) { @@ -165,8 +291,15 @@ public class ReliablePacketController { if (DEBUG_RECEIVE) Log.debug(TAG, "acked packet %d", ackSequence); ReliableEndpoint.stats.NUM_PACKETS_ACKED++; sentPacketData.acked = true; + // ack packet callback - // TODO: rtt + + float rtt = (time - sentPacketData.time) * 1000f; + if ((this.rtt == 0.0f && rtt > 0.0f) || MathUtils.isEqual(this.rtt, rtt, TOLERANCE)) { + this.rtt = rtt; + } else { + this.rtt += (rtt - this.rtt) * config.rttSmoothingFactor; + } } } } diff --git a/server/netty/src/com/riiablo/net/reliable/SequenceBuffer.java b/server/netty/src/com/riiablo/net/reliable/SequenceBuffer.java index 6827b2ae..63557d78 100644 --- a/server/netty/src/com/riiablo/net/reliable/SequenceBuffer.java +++ b/server/netty/src/com/riiablo/net/reliable/SequenceBuffer.java @@ -8,7 +8,7 @@ public class SequenceBuffer { private int sequence; - private final int numEntries; + public final int numEntries; private final int entrySequence[]; private final Object entryData[]; @@ -26,6 +26,10 @@ public class SequenceBuffer { sequence = 0; } + public int getSequence() { + return sequence; + } + public void reset() { sequence = 0; Arrays.fill(entrySequence, INVALID_SEQUENCE); diff --git a/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java b/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java index 22ec2c71..80416159 100644 --- a/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java +++ b/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java @@ -1,39 +1,228 @@ package com.riiablo.net.reliable.channel; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; +import com.badlogic.gdx.math.MathUtils; +import com.badlogic.gdx.utils.IntArray; +import com.badlogic.gdx.utils.Queue; + +import com.riiablo.net.reliable.Log; import com.riiablo.net.reliable.MessageChannel; +import com.riiablo.net.reliable.Packet; import com.riiablo.net.reliable.ReliableConfiguration; +import com.riiablo.net.reliable.ReliableUtils; +import com.riiablo.net.reliable.SequenceBuffer; public class ReliableMessageChannel extends MessageChannel { private static final String TAG = "ReliableMessageChannel"; + private static final boolean DEBUG = true; + private static final boolean DEBUG_SEND = DEBUG && true; + private static final boolean DEBUG_RECEIVE = DEBUG && true; + + private final ByteBuf packetBuffer = Unpooled.buffer(); + private final SequenceBuffer sendBuffer; + private final SequenceBuffer receiveBuffer; + private final SequenceBuffer ackBuffer; + + private final Queue messageQueue = new Queue<>(64, ByteBuf.class); + private final IntArray outgoingMessageIds = new IntArray(256); + + private float time; + private float lastBufferFlush; + private float lastMessageSend; + + private int oldestUnacked; +// private int sequence; // hides MessageChannel#sequence + private int nextReceive; + + private boolean congestionControl = false; + private float congestionDisableTimer; + private float congestionDisableInterval; + private float lastCongestionSwitchTime; + public ReliableMessageChannel(PacketTransceiver packetTransceiver) { super(new ReliableConfiguration(), packetTransceiver); + + this.sendBuffer = new SequenceBuffer<>(BufferedPacket.class, 256); + this.receiveBuffer = new SequenceBuffer<>(BufferedPacket.class, 256); + this.ackBuffer = new SequenceBuffer<>(OutgoingPacketSet.class, 256); + + time = 0.0f; + lastBufferFlush = -1.0f; + lastMessageSend = 0.0f; + + congestionDisableInterval = 5.0f; + + sequence = 0; + nextReceive = 0; + oldestUnacked = 0; } @Override public void reset() { + packetController.reset(); + sendBuffer.reset(); +// receiveBuffer.reset(); // this isn't in the original code? why? + ackBuffer.reset(); + + lastBufferFlush = -1.0f; + lastMessageSend = 0.0f; + + congestionControl = false; + lastCongestionSwitchTime = 0.0f; + congestionDisableTimer = 0.0f; + congestionDisableInterval = 5.0f; + + sequence = 0; + nextReceive = 0; + oldestUnacked = 0; } @Override - public void update(float delta, DatagramChannel ch) { + public void update(float delta, int channelId, DatagramChannel ch) { + packetController.update(delta); + time += delta; + + // see if we can pop messages off of the message queue and put them into the send queue + updateQueue(channelId, ch); + updateCongestion(delta, channelId, ch); + } + + private void updateQueue(int channelId, DatagramChannel ch) { + if (messageQueue.size > 0) { + int sendBufferSize = 0; + for (int seq = oldestUnacked; ReliableUtils.sequenceLessThan(seq, sequence); seq = (seq + 1) & Packet.USHORT_MAX_VALUE) { + if (sendBuffer.exists(seq)) sendBufferSize++; + } + + if (sendBufferSize < sendBuffer.numEntries) { + ByteBuf packetData = messageQueue.removeFirst(); + sendMessage(channelId, ch, packetData); + } + } + } + + private void updateCongestion(float delta, int channelId, DatagramChannel ch) { + boolean conditionsBad = packetController.rtt() >= 250.0f; // 250ms + + // if conditions are bad, immediately enable congestion control and reset the congestion timer + if (conditionsBad) { + if (!congestionControl) { + // if we're within 10 seconds of the last time we switched, double the threshold interval + if (time - lastCongestionSwitchTime < 10.0) { + congestionDisableInterval = Math.min(congestionDisableInterval * 2, 60.0f); + } + + lastCongestionSwitchTime = time; + } + + congestionControl = true; + congestionDisableTimer = 0.0f; + } + + // if we're in bad mode, and conditions are good, update the timer and see if we can disable + // congestion control + if (congestionControl && !conditionsBad) { + congestionDisableTimer += delta; + if (congestionDisableTimer >= congestionDisableInterval) { + congestionControl = false; + lastCongestionSwitchTime = time; + congestionDisableTimer = 0.0f; + } + } + + // as long as conditions are good, halve the threshold interval every 10 seconds + if (!congestionControl) { + congestionDisableTimer += delta; + if (congestionDisableTimer > 10.0f) { + congestionDisableInterval = Math.max(congestionDisableInterval * 0.5f, 5.0f); + } + } + + // if we're in congestion control mode, only send packets 10 times per second. otherwise, send + // 30 times per second + float flushInterval = congestionControl ? (1.0f / 10) : (1.0f / 30); + if (time - lastBufferFlush >= flushInterval) { + lastBufferFlush = time; + processSendBuffer(channelId, ch); + } + } + + private void processSendBuffer(int channelId, DatagramChannel ch) { +// int numUnacked = 0; +// for (int seq = oldestUnacked; ReliableUtils.sequenceLessThan(seq, sequence); seq = (seq + 1) & Packet.USHORT_MAX_VALUE) { +// numUnacked++; +// } + + for (int seq = oldestUnacked; ReliableUtils.sequenceLessThan(seq, sequence); seq = (seq + 1) & Packet.USHORT_MAX_VALUE) { + // never send message ID >= (oldestUnacked + bufferSize) + if (seq >= (oldestUnacked + 256)) break; + + // for any message that hasn't been sent in the last 0.1 seconds and fits in the available + // space of our message packer, add it + BufferedPacket packet = sendBuffer.find(seq); + if (packet != null && !packet.writeLock) { + if (MathUtils.isEqual(time, packet.time, 0.1f)) continue; + boolean packetFits = false; + int packetSize = packetBuffer.readableBytes() + packet.bb.readableBytes(); + if (packet.bb.readableBytes() < config.fragmentThreshold) { + packetFits = packetSize <= (config.fragmentThreshold - Packet.MAX_PACKET_HEADER_SIZE); + } else { + packetFits = packetSize <= (config.maxPacketSize - Packet.FRAGMENT_HEADER_SIZE - Packet.MAX_PACKET_HEADER_SIZE); + } + + // if the packet won't fit, flush the message packet + if (!packetFits) { + flushPacketBuffer(channelId, ch); + } + + packet.time = time; + packetBuffer.writeBytes(packet.bb); + outgoingMessageIds.add(seq); + lastMessageSend = time; + } + } + + // if it has been 0.1 seconds since the last time we sent a message, send an empty message + if (time - lastMessageSend >= 0.1f) { + packetController.sendAck(channelId, ch); + lastMessageSend = time; + } + + // flush and remaining messages in the packet buffer + flushPacketBuffer(channelId, ch); + } + + private void flushPacketBuffer(int channelId, DatagramChannel ch) { + if (packetBuffer.readableBytes() > 0) { + int outgoingSeq = packetController.sendPacket(channelId, ch, packetBuffer); + OutgoingPacketSet outgoingPacket = ackBuffer.insert(outgoingSeq); + + // store message IDs so we can map packet-level acks to message ID acks + outgoingPacket.messageIds.clear(); + outgoingPacket.messageIds.addAll(outgoingMessageIds); + + packetBuffer.clear(); + outgoingMessageIds.clear(); + } } @Override public void sendMessage(int channelId, DatagramChannel ch, ByteBuf bb) { - + if (DEBUG_SEND) Log.debug(TAG, "sendMessage " + bb); } @Override public void onMessageReceived(ChannelHandlerContext ctx, DatagramPacket packet) { - + if (DEBUG_SEND) Log.debug(TAG, "onMessageReceived " + packet); } @Override @@ -45,4 +234,14 @@ public class ReliableMessageChannel extends MessageChannel { public void onPacketProcessed(int sequence, ByteBuf bb) { } + + public static class BufferedPacket { + boolean writeLock = true; + float time; + ByteBuf bb; + } + + public static class OutgoingPacketSet { + final IntArray messageIds = new IntArray(); + } } diff --git a/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java index 04159fc0..6171310e 100644 --- a/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java +++ b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java @@ -36,7 +36,7 @@ public class UnreliableMessageChannel extends MessageChannel { } @Override - public void update(float delta, DatagramChannel ch) { + public void update(float delta, int channelId, DatagramChannel ch) { packetController.update(delta); } diff --git a/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java index e4372acc..4a519b47 100644 --- a/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java +++ b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java @@ -31,7 +31,7 @@ public class UnreliableOrderedMessageChannel extends MessageChannel { } @Override - public void update(float delta, DatagramChannel ch) { + public void update(float delta, int channelId, DatagramChannel ch) { packetController.update(delta); } diff --git a/server/netty/src/com/riiablo/net/reliable/data/ReceivedPacketData.java b/server/netty/src/com/riiablo/net/reliable/data/ReceivedPacketData.java index 3d8f3306..ee8870c3 100644 --- a/server/netty/src/com/riiablo/net/reliable/data/ReceivedPacketData.java +++ b/server/netty/src/com/riiablo/net/reliable/data/ReceivedPacketData.java @@ -1,6 +1,6 @@ package com.riiablo.net.reliable.data; public class ReceivedPacketData { - public long time; + public float time; public int packetSize; } diff --git a/server/netty/src/com/riiablo/net/reliable/data/SentPacketData.java b/server/netty/src/com/riiablo/net/reliable/data/SentPacketData.java index 1348ac87..1ef77760 100644 --- a/server/netty/src/com/riiablo/net/reliable/data/SentPacketData.java +++ b/server/netty/src/com/riiablo/net/reliable/data/SentPacketData.java @@ -1,7 +1,7 @@ package com.riiablo.net.reliable.data; public class SentPacketData { - public long time; + public float time; public boolean acked; public int packetSize; }