Created com.riiablo.net.reliable and partial Java implementation of reliable.io UDP scheme #78

This commit is contained in:
Collin Smith 2020-06-21 19:41:08 -07:00
parent d13fb953d5
commit 092a6c23fe
2 changed files with 228 additions and 0 deletions

View File

@ -0,0 +1,211 @@
package com.riiablo.net.reliable;
import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
public abstract class Packet {
static final int MAX_PACKET_HEADER_SIZE = 10;
static final int FRAGMENT_HEADER_SIZE = 6;
static final int SINGLE = 0;
static final int FRAGMENTED = 1 << 0;
static final int ACK_BYTE0 = 1 << 1;
static final int ACK_BYTE1 = 1 << 2;
static final int ACK_BYTE2 = 1 << 3;
static final int ACK_BYTE3 = 1 << 4;
static final int SEQ_DIFF = 1 << 5;
static final int ACK = 1 << 7;
static final int TYPE_MASK = SINGLE | FRAGMENTED;
static final int ACK_BYTE0_MASK = 0x000000FF;
static final int ACK_BYTE1_MASK = 0x0000FF00;
static final int ACK_BYTE2_MASK = 0x00FF0000;
static final int ACK_BYTE3_MASK = 0xFF000000;
public static boolean isSinglePacket(int prefixByte) {
return (prefixByte & TYPE_MASK) == SINGLE;
}
static int getAckBitByteFlags(int ackBits, int prefixByte) {
if ((ackBits & ACK_BYTE0_MASK) != ACK_BYTE0_MASK) prefixByte |= ACK_BYTE0;
if ((ackBits & ACK_BYTE1_MASK) != ACK_BYTE1_MASK) prefixByte |= ACK_BYTE1;
if ((ackBits & ACK_BYTE2_MASK) != ACK_BYTE2_MASK) prefixByte |= ACK_BYTE2;
if ((ackBits & ACK_BYTE3_MASK) != ACK_BYTE3_MASK) prefixByte |= ACK_BYTE3;
return prefixByte;
}
static void writeAckBitByteFlags(ByteBuf bb, int ackBits) {
if ((ackBits & ACK_BYTE0_MASK) != ACK_BYTE0_MASK) bb.writeByte(ackBits);
if ((ackBits & ACK_BYTE1_MASK) != ACK_BYTE1_MASK) bb.writeByte(ackBits >> 8);
if ((ackBits & ACK_BYTE2_MASK) != ACK_BYTE2_MASK) bb.writeByte(ackBits >> 16);
if ((ackBits & ACK_BYTE3_MASK) != ACK_BYTE3_MASK) bb.writeByte(ackBits >> 24);
}
public static Packet readHeader(ReliableConfig config, ByteBuf bb) {
Validate.isTrue(bb.readableBytes() >= 1, "buffer too small for packet header");
byte prefixByte = bb.readByte();
if (isSinglePacket(prefixByte)) {
return new SinglePacket().readHeader(config, bb, prefixByte);
} else {
return new FragmentedPacket().readHeader(config, bb, prefixByte);
}
}
abstract Packet readHeader(ReliableConfig config, ByteBuf bb, final byte prefixByte);
static class SinglePacket extends Packet {
public int channelId;
public int sequence;
public int ack;
public int ackBits;
int headerSize;
@Override
public String toString() {
return new ToStringBuilder(this)
.append("channelId", channelId)
.append("sequence", sequence)
.append("ack", ack)
.append("ackBits", String.format("%08x", ackBits))
.append("headerSize", headerSize)
.build();
}
@Override
SinglePacket readHeader(ReliableConfig config, ByteBuf bb, byte prefixByte) {
Validate.isTrue((prefixByte & TYPE_MASK) == SINGLE, "packet header not flagged as single packet");
int startIndex = bb.readerIndex();
Validate.isTrue(bb.readableBytes() >= 3, "buffer too small for packet header (1)");
channelId = bb.readUnsignedByte();
// ack packets don't have sequence numbers
sequence = (prefixByte & ACK) == ACK ? 0 : bb.readUnsignedShortLE();
if ((prefixByte & SEQ_DIFF) == SEQ_DIFF) {
Validate.isTrue(bb.readableBytes() >= 1, "buffer too small for packet header (2)");
int sequenceDiff = bb.readUnsignedByte();
ack = (short)(sequence - sequenceDiff);
} else {
Validate.isTrue(bb.readableBytes() >= 2, "buffer too small for packet header (3)");
ack = bb.readUnsignedShortLE();
}
int expectedBytes = 0;
for (int i = ACK_BYTE0; i <= ACK_BYTE3; i <<= 1) {
if ((prefixByte & i) == i) expectedBytes++;
}
Validate.isTrue(bb.readableBytes() >= expectedBytes, "buffer too small for packet header (4)");
ackBits = 0xFFFFFFFF;
if ((prefixByte & ACK_BYTE0) == ACK_BYTE0) {
ackBits &= ~ACK_BYTE0_MASK;
ackBits |= bb.readByte();
}
if ((prefixByte & ACK_BYTE1) == ACK_BYTE1) {
ackBits &= ~ACK_BYTE1_MASK;
ackBits |= (bb.readByte() << 8);
}
if ((prefixByte & ACK_BYTE2) == ACK_BYTE2) {
ackBits &= ~ACK_BYTE2_MASK;
ackBits |= (bb.readByte() << 16);
}
if ((prefixByte & ACK_BYTE3) == ACK_BYTE3) {
ackBits &= ~ACK_BYTE3_MASK;
ackBits |= (bb.readByte() << 24);
}
headerSize = bb.readerIndex() - startIndex + 1; // include prefixByte
return this;
}
static void writeHeader(ByteBuf bb, int channelId, int sequence, int ack, int ackBits) {
int prefixByte = getAckBitByteFlags(ackBits, 0);
int sequenceDiff = sequence - ack;
if (sequenceDiff < 0) sequenceDiff += 0xFFFF;
if (sequenceDiff <= 0xFF) prefixByte |= SEQ_DIFF;
bb.writeByte(prefixByte);
bb.writeByte(channelId);
bb.writeShortLE(sequence);
if (sequenceDiff <= 0xFF) {
bb.writeByte(sequenceDiff);
} else {
bb.writeShortLE(ack);
}
writeAckBitByteFlags(bb, ackBits);
}
}
static class FragmentedPacket extends Packet {
public int channelId;
public int sequence;
public int ack;
public int ackBits;
public int fragmentId;
public int numFragments;
@Override
public String toString() {
return new ToStringBuilder(this)
.append("channelId", channelId)
.append("sequence", sequence)
.append("ack", ack)
.append("ackBits", String.format("%08x", ackBits))
.append("fragmentId", fragmentId)
.append("numFragments", numFragments)
.build();
}
@Override
FragmentedPacket readHeader(ReliableConfig config, ByteBuf bb, final byte prefixByte) {
Validate.isTrue((prefixByte & TYPE_MASK) == FRAGMENTED, "packet header not flagged as fragmented packet");
Validate.isTrue(bb.readableBytes() >= FRAGMENT_HEADER_SIZE, "buffer too small for fragment header");
channelId = bb.readUnsignedByte();
sequence = bb.readUnsignedShortLE();
fragmentId = bb.readUnsignedByte();
numFragments = bb.readUnsignedByte() + 1;
Validate.isTrue(numFragments <= config.maxFragments, "num fragments %d outside of range of max fragments %d", numFragments, config.maxFragments);
Validate.isTrue(fragmentId < numFragments, "fragment id %d outside of range of num fragments %d", fragmentId, numFragments);
if (fragmentId == 0) {
Validate.isTrue(bb.readableBytes() >= 1, "buffer too small for packet header");
SinglePacket packetHeader = new SinglePacket().readHeader(config, bb, bb.readByte());
Validate.isTrue(packetHeader.sequence == sequence, "bad packet sequence in fragment. expected %d, got %d", sequence, packetHeader.sequence);
ack = packetHeader.ack;
ackBits = packetHeader.ackBits;
} else {
ack = 0;
ackBits = 0;
}
// TODO: validate fragmentBytes <= fragmentSize
// TODO: validate size of fragmentId == fragmentSize
return this;
}
}
static class AckPacket {
static void writeHeader(ByteBuf bb, int channelId, int ack, int ackBits) {
int prefixByte = getAckBitByteFlags(ackBits, ACK);
bb.writeByte(prefixByte);
bb.writeByte(channelId);
bb.writeShortLE(ack);
writeAckBitByteFlags(bb, ackBits);
}
}
}

View File

@ -0,0 +1,17 @@
package com.riiablo.net.reliable;
public class ReliableConfig {
public static final ReliableConfig DEFAULT_CONFIG = new ReliableConfig();
public int maxPacketSize = 16384;
public int fragmentThreshold = 1024;
public int maxFragments = 16;
public int fragmentSize = 1024;
public int sentPacketBufferSize = 256;
public int receivedPacketBufferSize = 256;
public int fragmentReassemblyBufferSize = 64;
public float rttSmoothingFactor = 0.25f;
public float packetLossSmoothingFactor = 0.1f;
public float bandwidthSmoothingFactor = 0.1f;
public int packetHeaderSize = 28;
}