diff --git a/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java b/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java index 30fb57c5..099f80e5 100644 --- a/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java +++ b/server/netty/src/com/riiablo/net/reliable/ReliablePacketController.java @@ -183,6 +183,7 @@ public class ReliablePacketController { return; } + channel.onPacketTransmitted(packet); ch.writeAndFlush(packet); } @@ -218,6 +219,7 @@ public class ReliablePacketController { .addComponent(true, header) .addComponent(true, bb); + channel.onPacketTransmitted(composite); ch.writeAndFlush(composite); return headerSize; } else { @@ -283,8 +285,7 @@ public class ReliablePacketController { if (DEBUG_RECEIVE) Log.debug(TAG, "acked packet %d", ackSequence); ReliableEndpoint.stats.NUM_PACKETS_ACKED++; sentPacketData.acked = true; - - // ack packet callback + channel.onAckProcessed(ackSequence); float rtt = (time - sentPacketData.time) * 1000f; if ((this.rtt == 0.0f && rtt > 0.0f) || MathUtils.isEqual(this.rtt, rtt, TOLERANCE)) { @@ -314,6 +315,7 @@ public class ReliablePacketController { public interface PacketListener { void onPacketTransmitted(ByteBuf bb); + void onAckProcessed(int sequence); void onPacketProcessed(int sequence, ByteBuf bb); } } diff --git a/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java b/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java index 49512ab3..50c37ec2 100644 --- a/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java +++ b/server/netty/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java @@ -235,7 +235,9 @@ public class ReliableMessageChannel extends MessageChannel { packet.time = -1.0f; // ensure size for header - // Is this appending the header length? + // TODO: there appears to be extra code here outside of the spec that aggregates multiple + // messages together and prepends messageId and messageLength fields. Maybe this was done + // to group up smaller messages? Needs to be looked into more. // https://github.com/KillaMaaki/ReliableNetcode.NET/blob/c5a7339e2de70f52bfda2078f1bbdab2ec9a85c1/ReliableNetcode/MessageChannel.cs#L331-L393 packet.bb = bb; @@ -253,10 +255,45 @@ public class ReliableMessageChannel extends MessageChannel { } + @Override + public void onAckProcessed(int sequence) { + if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence); + // first, map sequence to message IDs and ack them + OutgoingPacketSet outgoingPacket = ackBuffer.find(sequence); + if (outgoingPacket == null) return; + + // process messages + final int[] messageIds = outgoingPacket.messageIds.items; + for (int i = 0, s = outgoingPacket.messageIds.size; i < s; i++) { + // remove acked message from send buffer + int messageId = messageIds[i]; + if (sendBuffer.exists(messageId)) { + sendBuffer.find(messageId).writeLock = true; + sendBuffer.remove(messageId); + } + } + + // update oldest unacked message + boolean allAcked = true; + for (int seq = oldestUnacked; + seq == this.sequence || ReliableUtils.sequenceLessThan(seq, this.sequence); + seq = (seq + 1) & Packet.USHORT_MAX_VALUE) { + // if it's still in the send buffer, it hasn't been acked + if (sendBuffer.exists(seq)) { + oldestUnacked = seq; + allAcked = false; + break; + } + } + + if (allAcked) oldestUnacked = this.sequence; + } + @Override public void onPacketProcessed(int sequence, ByteBuf bb) { - if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + bb); + if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb); packetTransceiver.receivePacket(bb); + // TODO: this is different from original function, see above note within #sendMessage } public static class BufferedPacket { diff --git a/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java index 6171310e..d8e34ac9 100644 --- a/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java +++ b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java @@ -24,9 +24,14 @@ public class UnreliableMessageChannel extends MessageChannel { public void onPacketTransmitted(ByteBuf bb) { } + @Override + public void onAckProcessed(int sequence) { + if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence); + } + @Override public void onPacketProcessed(int sequence, ByteBuf bb) { - if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + bb); + if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb); packetTransceiver.receivePacket(bb); } diff --git a/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java index 4a519b47..ee303296 100644 --- a/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java +++ b/server/netty/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java @@ -52,8 +52,14 @@ public class UnreliableOrderedMessageChannel extends MessageChannel { packetTransceiver.sendPacket(bb); } + @Override + public void onAckProcessed(int sequence) { + if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence); + } + @Override public void onPacketProcessed(int sequence, ByteBuf bb) { + if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb); if (sequence == nextSequence || ReliableUtils.sequenceGreaterThan(sequence, nextSequence)) { nextSequence = (sequence + 1) & Packet.USHORT_MAX_VALUE; packetTransceiver.receivePacket(bb);