Improved support for reliable UDP

Implemented PacketListener#onPacketTransmitted callback for sending messages and acks
Created PacketListener#onAckProcessed callback for received acks
Documented some unknown code related to copying outgoing bytebuf and prepending some data to it
Todo: add support for fragmented messages
This commit is contained in:
Collin Smith
2020-06-25 01:11:24 -07:00
parent 7b3827823d
commit cf1fea539f
4 changed files with 55 additions and 5 deletions

View File

@ -183,6 +183,7 @@ public class ReliablePacketController {
return;
}
channel.onPacketTransmitted(packet);
ch.writeAndFlush(packet);
}
@ -218,6 +219,7 @@ public class ReliablePacketController {
.addComponent(true, header)
.addComponent(true, bb);
channel.onPacketTransmitted(composite);
ch.writeAndFlush(composite);
return headerSize;
} else {
@ -283,8 +285,7 @@ public class ReliablePacketController {
if (DEBUG_RECEIVE) Log.debug(TAG, "acked packet %d", ackSequence);
ReliableEndpoint.stats.NUM_PACKETS_ACKED++;
sentPacketData.acked = true;
// ack packet callback
channel.onAckProcessed(ackSequence);
float rtt = (time - sentPacketData.time) * 1000f;
if ((this.rtt == 0.0f && rtt > 0.0f) || MathUtils.isEqual(this.rtt, rtt, TOLERANCE)) {
@ -314,6 +315,7 @@ public class ReliablePacketController {
public interface PacketListener {
void onPacketTransmitted(ByteBuf bb);
void onAckProcessed(int sequence);
void onPacketProcessed(int sequence, ByteBuf bb);
}
}

View File

@ -235,7 +235,9 @@ public class ReliableMessageChannel extends MessageChannel {
packet.time = -1.0f;
// ensure size for header
// Is this appending the header length?
// TODO: there appears to be extra code here outside of the spec that aggregates multiple
// messages together and prepends messageId and messageLength fields. Maybe this was done
// to group up smaller messages? Needs to be looked into more.
// https://github.com/KillaMaaki/ReliableNetcode.NET/blob/c5a7339e2de70f52bfda2078f1bbdab2ec9a85c1/ReliableNetcode/MessageChannel.cs#L331-L393
packet.bb = bb;
@ -253,10 +255,45 @@ public class ReliableMessageChannel extends MessageChannel {
}
@Override
public void onAckProcessed(int sequence) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence);
// first, map sequence to message IDs and ack them
OutgoingPacketSet outgoingPacket = ackBuffer.find(sequence);
if (outgoingPacket == null) return;
// process messages
final int[] messageIds = outgoingPacket.messageIds.items;
for (int i = 0, s = outgoingPacket.messageIds.size; i < s; i++) {
// remove acked message from send buffer
int messageId = messageIds[i];
if (sendBuffer.exists(messageId)) {
sendBuffer.find(messageId).writeLock = true;
sendBuffer.remove(messageId);
}
}
// update oldest unacked message
boolean allAcked = true;
for (int seq = oldestUnacked;
seq == this.sequence || ReliableUtils.sequenceLessThan(seq, this.sequence);
seq = (seq + 1) & Packet.USHORT_MAX_VALUE) {
// if it's still in the send buffer, it hasn't been acked
if (sendBuffer.exists(seq)) {
oldestUnacked = seq;
allAcked = false;
break;
}
}
if (allAcked) oldestUnacked = this.sequence;
}
@Override
public void onPacketProcessed(int sequence, ByteBuf bb) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + bb);
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb);
packetTransceiver.receivePacket(bb);
// TODO: this is different from original function, see above note within #sendMessage
}
public static class BufferedPacket {

View File

@ -24,9 +24,14 @@ public class UnreliableMessageChannel extends MessageChannel {
public void onPacketTransmitted(ByteBuf bb) {
}
@Override
public void onAckProcessed(int sequence) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence);
}
@Override
public void onPacketProcessed(int sequence, ByteBuf bb) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + bb);
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb);
packetTransceiver.receivePacket(bb);
}

View File

@ -52,8 +52,14 @@ public class UnreliableOrderedMessageChannel extends MessageChannel {
packetTransceiver.sendPacket(bb);
}
@Override
public void onAckProcessed(int sequence) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence);
}
@Override
public void onPacketProcessed(int sequence, ByteBuf bb) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb);
if (sequence == nextSequence || ReliableUtils.sequenceGreaterThan(sequence, nextSequence)) {
nextSequence = (sequence + 1) & Packet.USHORT_MAX_VALUE;
packetTransceiver.receivePacket(bb);