Added SocketAddress from parameter to packet callbacks

from paramter will be either TCP channel remote address or UDP packet sender
This commit is contained in:
Collin Smith
2020-06-27 15:58:25 -07:00
parent 5f81d4d53a
commit 03ae8f103c
17 changed files with 60 additions and 36 deletions

View File

@ -1,9 +1,11 @@
package com.riiablo.net;
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
public interface Endpoint<T> extends PacketSender<Object> {
void reset();
void update(float delta);
void messageReceived(ChannelHandlerContext ctx, T msg);
void messageReceived(ChannelHandlerContext ctx, SocketAddress from, T msg);
SocketAddress getRemoteAddress(ChannelHandlerContext ctx, T msg);
}

View File

@ -7,7 +7,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.TypeParameterMatcher;
import java.net.InetSocketAddress;
@ -36,11 +35,11 @@ public class EndpointedChannelHandler<T> implements ChannelHandler, ChannelInbou
}
protected void messageReceived(ChannelHandlerContext ctx, T msg) throws Exception {
SocketAddress sender = endpoint.getRemoteAddress(ctx, msg);
if (DEBUG_CALLS) {
InetSocketAddress sender = msg instanceof DatagramPacket
? ((DatagramPacket) msg).sender()
: (InetSocketAddress) ctx.channel().remoteAddress();
Gdx.app.log(TAG, "messageReceived received packet from " + sender.getHostName() + ":" + sender.getPort());
assert sender instanceof InetSocketAddress;
InetSocketAddress casted = (InetSocketAddress) sender;
Gdx.app.log(TAG, "messageReceived received packet from " + casted.getHostName() + ":" + casted.getPort());
}
if (DEBUG_INBOUND) {
if (msg instanceof ByteBuf) {
@ -49,7 +48,7 @@ public class EndpointedChannelHandler<T> implements ChannelHandler, ChannelInbou
Gdx.app.debug(TAG, " " + msg);
}
}
endpoint.messageReceived(ctx, msg);
endpoint.messageReceived(ctx, sender, msg);
}
protected Object writeMessage(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

View File

@ -2,7 +2,8 @@ package com.riiablo.net;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
public interface PacketProcessor {
void processPacket(ChannelHandlerContext ctx, ByteBuf bb);
void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb);
}

View File

@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import java.net.SocketAddress;
public abstract class MessageChannel implements ReliablePacketController.PacketListener {
protected final PacketTransceiver packetTransceiver;
@ -37,6 +38,6 @@ public abstract class MessageChannel implements ReliablePacketController.PacketL
public interface PacketTransceiver {
void sendPacket(ByteBuf bb);
void receivePacket(ChannelHandlerContext ctx, ByteBuf bb);
void receivePacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb);
}
}

View File

@ -46,7 +46,7 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan
Gdx.app.log(TAG, "messageReceived received packet from " + sender.getHostName() + ":" + sender.getPort());
ByteBuf in = packet.content();
if (DEBUG_INBOUND) Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(in));
endpoint.messageReceived(ctx, packet);
endpoint.messageReceived(ctx, packet.sender(), packet);
}
protected Object writeMessage(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

View File

@ -7,6 +7,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.Validate;
@ -107,7 +108,12 @@ public class ReliableEndpoint implements UnicastEndpoint<DatagramPacket>, Messag
}
@Override
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
public SocketAddress getRemoteAddress(ChannelHandlerContext ctx, DatagramPacket msg) {
return msg.sender();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, SocketAddress from, DatagramPacket packet) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived");
int channelId = Packet.getChannelId(packet.content());
if (DEBUG_QOS) {
@ -131,8 +137,8 @@ public class ReliableEndpoint implements UnicastEndpoint<DatagramPacket>, Messag
}
@Override
public void receivePacket(ChannelHandlerContext ctx, ByteBuf bb) {
packetProcessor.processPacket(ctx, bb);
public void receivePacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) {
packetProcessor.processPacket(ctx, from, bb);
}
public static final Stats stats = new Stats();

View File

@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import java.net.SocketAddress;
import com.badlogic.gdx.math.MathUtils;
@ -270,7 +271,7 @@ public class ReliablePacketController {
if (!isStale && !isAck) {
if (DEBUG_RECEIVE) Log.debug(TAG, "processing packet %d", sequence);
ByteBuf slice = bb.readSlice(bb.readableBytes());
channel.onPacketProcessed(ctx, sequence, slice);
channel.onPacketProcessed(ctx, packet.sender(), sequence, slice);
synchronized (receivedPackets) {
ReceivedPacketData receivedPacketData = receivedPackets.insert(sequence);
receivedPacketData.time = time;
@ -288,7 +289,7 @@ public class ReliablePacketController {
if (DEBUG_RECEIVE) Log.debug(TAG, "acked packet %d", ackSequence);
ReliableEndpoint.stats.NUM_PACKETS_ACKED++;
sentPacketData.acked = true;
channel.onAckProcessed(ctx, ackSequence);
channel.onAckProcessed(ctx, packet.sender(), ackSequence);
float rtt = (time - sentPacketData.time) * 1000f;
if ((this.rtt == 0.0f && rtt > 0.0f) || MathUtils.isEqual(this.rtt, rtt, TOLERANCE)) {
@ -318,7 +319,7 @@ public class ReliablePacketController {
public interface PacketListener {
void onPacketTransmitted(ByteBuf bb);
void onAckProcessed(ChannelHandlerContext ctx, int sequence);
void onPacketProcessed(ChannelHandlerContext ctx, int sequence, ByteBuf bb);
void onAckProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence);
void onPacketProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence, ByteBuf bb);
}
}

View File

@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import java.net.SocketAddress;
import com.badlogic.gdx.math.MathUtils;
import com.badlogic.gdx.utils.IntArray;
@ -255,7 +256,7 @@ public class ReliableMessageChannel extends MessageChannel {
}
@Override
public void onAckProcessed(ChannelHandlerContext ctx, int sequence) {
public void onAckProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence);
// first, map sequence to message IDs and ack them
OutgoingPacketSet outgoingPacket = ackBuffer.find(sequence);
@ -289,9 +290,9 @@ public class ReliableMessageChannel extends MessageChannel {
}
@Override
public void onPacketProcessed(ChannelHandlerContext ctx, int sequence, ByteBuf bb) {
public void onPacketProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence, ByteBuf bb) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb);
packetTransceiver.receivePacket(ctx, bb);
packetTransceiver.receivePacket(ctx, from, bb);
// TODO: this is different from original function, see above note within #sendMessage
}

View File

@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import java.net.SocketAddress;
import com.riiablo.net.reliable.Log;
import com.riiablo.net.reliable.MessageChannel;
@ -25,14 +26,14 @@ public class UnreliableMessageChannel extends MessageChannel {
}
@Override
public void onAckProcessed(ChannelHandlerContext ctx, int sequence) {
public void onAckProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence);
}
@Override
public void onPacketProcessed(ChannelHandlerContext ctx, int sequence, ByteBuf bb) {
public void onPacketProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence, ByteBuf bb) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb);
packetTransceiver.receivePacket(ctx, bb);
packetTransceiver.receivePacket(ctx, from, bb);
}
@Override

View File

@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import java.net.SocketAddress;
import com.riiablo.net.reliable.Log;
import com.riiablo.net.reliable.MessageChannel;
@ -53,16 +54,16 @@ public class UnreliableOrderedMessageChannel extends MessageChannel {
}
@Override
public void onAckProcessed(ChannelHandlerContext ctx, int sequence) {
public void onAckProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence);
}
@Override
public void onPacketProcessed(ChannelHandlerContext ctx, int sequence, ByteBuf bb) {
public void onPacketProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence, ByteBuf bb) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb);
if (sequence == nextSequence || ReliableUtils.sequenceGreaterThan(sequence, nextSequence)) {
nextSequence = (sequence + 1) & Packet.USHORT_MAX_VALUE;
packetTransceiver.receivePacket(ctx, bb);
packetTransceiver.receivePacket(ctx, from, bb);
}
}
}

View File

@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import com.badlogic.gdx.Gdx;
@ -33,9 +34,14 @@ public class TcpEndpoint implements UnicastEndpoint<ByteBuf> {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) {
public SocketAddress getRemoteAddress(ChannelHandlerContext ctx, ByteBuf msg) {
return ctx.channel().remoteAddress();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, SocketAddress from, ByteBuf msg) {
if (DEBUG_RECEIVE) Gdx.app.debug(TAG, "onMessageReceived");
packetProcessor.processPacket(ctx, msg);
packetProcessor.processPacket(ctx, from, msg);
}
@Override

View File

@ -13,6 +13,7 @@ import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationAdapter;
@ -93,7 +94,7 @@ public class TestClient extends ApplicationAdapter implements PacketProcessor {
}
@Override
public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) {
public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.log(TAG, ByteBufUtil.hexDump(bb));
}

View File

@ -12,6 +12,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import com.badlogic.gdx.Application;
@ -81,7 +82,7 @@ public class TestServer extends ApplicationAdapter implements PacketProcessor {
}
@Override
public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) {
public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.log(TAG, ByteBufUtil.hexDump(bb));

View File

@ -13,6 +13,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationAdapter;
@ -95,7 +96,7 @@ public class TestClient extends ApplicationAdapter implements PacketProcessor {
}
@Override
public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) {
public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.log(TAG, ByteBufUtil.hexDump(bb));
}

View File

@ -11,6 +11,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.SocketAddress;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationAdapter;
@ -81,7 +82,7 @@ public class TestServer extends ApplicationAdapter implements PacketProcessor {
}
@Override
public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) {
public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(bb));
}

View File

@ -119,7 +119,7 @@ public class Main extends ApplicationAdapter implements PacketProcessor {
}
@Override
public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) {
public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(bb));
processPacket(ctx, Netty.getRootAsNetty(bb.nioBuffer()));
@ -136,7 +136,7 @@ public class Main extends ApplicationAdapter implements PacketProcessor {
InetSocketAddress from = (InetSocketAddress) ctx.channel().remoteAddress();
switch (netty.dataType()) {
case NettyData.Connection: {
Connection(from, netty);
Connection(ctx, from, netty);
break;
}
default:
@ -145,7 +145,7 @@ public class Main extends ApplicationAdapter implements PacketProcessor {
}
}
private void Connection(InetSocketAddress from, Netty netty) {
private void Connection(ChannelHandlerContext ctx, InetSocketAddress from, Netty netty) {
Gdx.app.debug(TAG, "Connection from " + from);
Connection connection = (Connection) netty.data(new Connection());

View File

@ -13,6 +13,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationAdapter;
@ -103,7 +104,7 @@ public class TestClient extends ApplicationAdapter implements PacketProcessor {
}
@Override
public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) {
public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.log(TAG, ByteBufUtil.hexDump(bb));
}