diff --git a/core/src/com/riiablo/net/Endpoint.java b/core/src/com/riiablo/net/Endpoint.java index e86a0dce..61504419 100644 --- a/core/src/com/riiablo/net/Endpoint.java +++ b/core/src/com/riiablo/net/Endpoint.java @@ -1,9 +1,11 @@ package com.riiablo.net; import io.netty.channel.ChannelHandlerContext; +import java.net.SocketAddress; public interface Endpoint extends PacketSender { void reset(); void update(float delta); - void messageReceived(ChannelHandlerContext ctx, T msg); + void messageReceived(ChannelHandlerContext ctx, SocketAddress from, T msg); + SocketAddress getRemoteAddress(ChannelHandlerContext ctx, T msg); } diff --git a/core/src/com/riiablo/net/EndpointedChannelHandler.java b/core/src/com/riiablo/net/EndpointedChannelHandler.java index 782b6ff3..3e122588 100644 --- a/core/src/com/riiablo/net/EndpointedChannelHandler.java +++ b/core/src/com/riiablo/net/EndpointedChannelHandler.java @@ -7,7 +7,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPromise; -import io.netty.channel.socket.DatagramPacket; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.TypeParameterMatcher; import java.net.InetSocketAddress; @@ -36,11 +35,11 @@ public class EndpointedChannelHandler implements ChannelHandler, ChannelInbou } protected void messageReceived(ChannelHandlerContext ctx, T msg) throws Exception { + SocketAddress sender = endpoint.getRemoteAddress(ctx, msg); if (DEBUG_CALLS) { - InetSocketAddress sender = msg instanceof DatagramPacket - ? ((DatagramPacket) msg).sender() - : (InetSocketAddress) ctx.channel().remoteAddress(); - Gdx.app.log(TAG, "messageReceived received packet from " + sender.getHostName() + ":" + sender.getPort()); + assert sender instanceof InetSocketAddress; + InetSocketAddress casted = (InetSocketAddress) sender; + Gdx.app.log(TAG, "messageReceived received packet from " + casted.getHostName() + ":" + casted.getPort()); } if (DEBUG_INBOUND) { if (msg instanceof ByteBuf) { @@ -49,7 +48,7 @@ public class EndpointedChannelHandler implements ChannelHandler, ChannelInbou Gdx.app.debug(TAG, " " + msg); } } - endpoint.messageReceived(ctx, msg); + endpoint.messageReceived(ctx, sender, msg); } protected Object writeMessage(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { diff --git a/core/src/com/riiablo/net/PacketProcessor.java b/core/src/com/riiablo/net/PacketProcessor.java index 732b8a01..25224b4b 100644 --- a/core/src/com/riiablo/net/PacketProcessor.java +++ b/core/src/com/riiablo/net/PacketProcessor.java @@ -2,7 +2,8 @@ package com.riiablo.net; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import java.net.SocketAddress; public interface PacketProcessor { - void processPacket(ChannelHandlerContext ctx, ByteBuf bb); + void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb); } diff --git a/core/src/com/riiablo/net/reliable/MessageChannel.java b/core/src/com/riiablo/net/reliable/MessageChannel.java index 559d269a..56cf13b2 100644 --- a/core/src/com/riiablo/net/reliable/MessageChannel.java +++ b/core/src/com/riiablo/net/reliable/MessageChannel.java @@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; +import java.net.SocketAddress; public abstract class MessageChannel implements ReliablePacketController.PacketListener { protected final PacketTransceiver packetTransceiver; @@ -37,6 +38,6 @@ public abstract class MessageChannel implements ReliablePacketController.PacketL public interface PacketTransceiver { void sendPacket(ByteBuf bb); - void receivePacket(ChannelHandlerContext ctx, ByteBuf bb); + void receivePacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb); } } diff --git a/core/src/com/riiablo/net/reliable/ReliableChannelHandler.java b/core/src/com/riiablo/net/reliable/ReliableChannelHandler.java index 17cb3c4b..d1dc94b2 100644 --- a/core/src/com/riiablo/net/reliable/ReliableChannelHandler.java +++ b/core/src/com/riiablo/net/reliable/ReliableChannelHandler.java @@ -46,7 +46,7 @@ public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHan Gdx.app.log(TAG, "messageReceived received packet from " + sender.getHostName() + ":" + sender.getPort()); ByteBuf in = packet.content(); if (DEBUG_INBOUND) Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(in)); - endpoint.messageReceived(ctx, packet); + endpoint.messageReceived(ctx, packet.sender(), packet); } protected Object writeMessage(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { diff --git a/core/src/com/riiablo/net/reliable/ReliableEndpoint.java b/core/src/com/riiablo/net/reliable/ReliableEndpoint.java index 50913ee4..4f3a4926 100644 --- a/core/src/com/riiablo/net/reliable/ReliableEndpoint.java +++ b/core/src/com/riiablo/net/reliable/ReliableEndpoint.java @@ -7,6 +7,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import org.apache.commons.lang3.Validate; @@ -107,7 +108,12 @@ public class ReliableEndpoint implements UnicastEndpoint, Messag } @Override - public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) { + public SocketAddress getRemoteAddress(ChannelHandlerContext ctx, DatagramPacket msg) { + return msg.sender(); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, SocketAddress from, DatagramPacket packet) { if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived"); int channelId = Packet.getChannelId(packet.content()); if (DEBUG_QOS) { @@ -131,8 +137,8 @@ public class ReliableEndpoint implements UnicastEndpoint, Messag } @Override - public void receivePacket(ChannelHandlerContext ctx, ByteBuf bb) { - packetProcessor.processPacket(ctx, bb); + public void receivePacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) { + packetProcessor.processPacket(ctx, from, bb); } public static final Stats stats = new Stats(); diff --git a/core/src/com/riiablo/net/reliable/ReliablePacketController.java b/core/src/com/riiablo/net/reliable/ReliablePacketController.java index df4e00ca..ccbbb34b 100644 --- a/core/src/com/riiablo/net/reliable/ReliablePacketController.java +++ b/core/src/com/riiablo/net/reliable/ReliablePacketController.java @@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; +import java.net.SocketAddress; import com.badlogic.gdx.math.MathUtils; @@ -270,7 +271,7 @@ public class ReliablePacketController { if (!isStale && !isAck) { if (DEBUG_RECEIVE) Log.debug(TAG, "processing packet %d", sequence); ByteBuf slice = bb.readSlice(bb.readableBytes()); - channel.onPacketProcessed(ctx, sequence, slice); + channel.onPacketProcessed(ctx, packet.sender(), sequence, slice); synchronized (receivedPackets) { ReceivedPacketData receivedPacketData = receivedPackets.insert(sequence); receivedPacketData.time = time; @@ -288,7 +289,7 @@ public class ReliablePacketController { if (DEBUG_RECEIVE) Log.debug(TAG, "acked packet %d", ackSequence); ReliableEndpoint.stats.NUM_PACKETS_ACKED++; sentPacketData.acked = true; - channel.onAckProcessed(ctx, ackSequence); + channel.onAckProcessed(ctx, packet.sender(), ackSequence); float rtt = (time - sentPacketData.time) * 1000f; if ((this.rtt == 0.0f && rtt > 0.0f) || MathUtils.isEqual(this.rtt, rtt, TOLERANCE)) { @@ -318,7 +319,7 @@ public class ReliablePacketController { public interface PacketListener { void onPacketTransmitted(ByteBuf bb); - void onAckProcessed(ChannelHandlerContext ctx, int sequence); - void onPacketProcessed(ChannelHandlerContext ctx, int sequence, ByteBuf bb); + void onAckProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence); + void onPacketProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence, ByteBuf bb); } } diff --git a/core/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java b/core/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java index f4fdf69c..18a09ef6 100644 --- a/core/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java +++ b/core/src/com/riiablo/net/reliable/channel/ReliableMessageChannel.java @@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; +import java.net.SocketAddress; import com.badlogic.gdx.math.MathUtils; import com.badlogic.gdx.utils.IntArray; @@ -255,7 +256,7 @@ public class ReliableMessageChannel extends MessageChannel { } @Override - public void onAckProcessed(ChannelHandlerContext ctx, int sequence) { + public void onAckProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence) { if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence); // first, map sequence to message IDs and ack them OutgoingPacketSet outgoingPacket = ackBuffer.find(sequence); @@ -289,9 +290,9 @@ public class ReliableMessageChannel extends MessageChannel { } @Override - public void onPacketProcessed(ChannelHandlerContext ctx, int sequence, ByteBuf bb) { + public void onPacketProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence, ByteBuf bb) { if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb); - packetTransceiver.receivePacket(ctx, bb); + packetTransceiver.receivePacket(ctx, from, bb); // TODO: this is different from original function, see above note within #sendMessage } diff --git a/core/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java b/core/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java index 632e3186..e3e9b776 100644 --- a/core/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java +++ b/core/src/com/riiablo/net/reliable/channel/UnreliableMessageChannel.java @@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; +import java.net.SocketAddress; import com.riiablo.net.reliable.Log; import com.riiablo.net.reliable.MessageChannel; @@ -25,14 +26,14 @@ public class UnreliableMessageChannel extends MessageChannel { } @Override - public void onAckProcessed(ChannelHandlerContext ctx, int sequence) { + public void onAckProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence) { if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence); } @Override - public void onPacketProcessed(ChannelHandlerContext ctx, int sequence, ByteBuf bb) { + public void onPacketProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence, ByteBuf bb) { if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb); - packetTransceiver.receivePacket(ctx, bb); + packetTransceiver.receivePacket(ctx, from, bb); } @Override diff --git a/core/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java b/core/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java index 59aec462..fbe95209 100644 --- a/core/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java +++ b/core/src/com/riiablo/net/reliable/channel/UnreliableOrderedMessageChannel.java @@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; +import java.net.SocketAddress; import com.riiablo.net.reliable.Log; import com.riiablo.net.reliable.MessageChannel; @@ -53,16 +54,16 @@ public class UnreliableOrderedMessageChannel extends MessageChannel { } @Override - public void onAckProcessed(ChannelHandlerContext ctx, int sequence) { + public void onAckProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence) { if (DEBUG_RECEIVE) Log.debug(TAG, "onAckProcessed " + sequence); } @Override - public void onPacketProcessed(ChannelHandlerContext ctx, int sequence, ByteBuf bb) { + public void onPacketProcessed(ChannelHandlerContext ctx, SocketAddress from, int sequence, ByteBuf bb) { if (DEBUG_RECEIVE) Log.debug(TAG, "onPacketProcessed " + sequence + " " + bb); if (sequence == nextSequence || ReliableUtils.sequenceGreaterThan(sequence, nextSequence)) { nextSequence = (sequence + 1) & Packet.USHORT_MAX_VALUE; - packetTransceiver.receivePacket(ctx, bb); + packetTransceiver.receivePacket(ctx, from, bb); } } } diff --git a/core/src/com/riiablo/net/tcp/TcpEndpoint.java b/core/src/com/riiablo/net/tcp/TcpEndpoint.java index 6b9edcf1..8a73fb63 100644 --- a/core/src/com/riiablo/net/tcp/TcpEndpoint.java +++ b/core/src/com/riiablo/net/tcp/TcpEndpoint.java @@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import com.badlogic.gdx.Gdx; @@ -33,9 +34,14 @@ public class TcpEndpoint implements UnicastEndpoint { } @Override - public void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) { + public SocketAddress getRemoteAddress(ChannelHandlerContext ctx, ByteBuf msg) { + return ctx.channel().remoteAddress(); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, SocketAddress from, ByteBuf msg) { if (DEBUG_RECEIVE) Gdx.app.debug(TAG, "onMessageReceived"); - packetProcessor.processPacket(ctx, msg); + packetProcessor.processPacket(ctx, from, msg); } @Override diff --git a/server/netty/src/com/riiablo/net/reliable/TestClient.java b/server/netty/src/com/riiablo/net/reliable/TestClient.java index 4664c58f..6eb2439c 100644 --- a/server/netty/src/com/riiablo/net/reliable/TestClient.java +++ b/server/netty/src/com/riiablo/net/reliable/TestClient.java @@ -13,6 +13,7 @@ import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.nio.NioDatagramChannel; import java.net.InetSocketAddress; +import java.net.SocketAddress; import com.badlogic.gdx.Application; import com.badlogic.gdx.ApplicationAdapter; @@ -93,7 +94,7 @@ public class TestClient extends ApplicationAdapter implements PacketProcessor { } @Override - public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) { + public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) { Gdx.app.debug(TAG, "Processing packet..."); Gdx.app.log(TAG, ByteBufUtil.hexDump(bb)); } diff --git a/server/netty/src/com/riiablo/net/reliable/TestServer.java b/server/netty/src/com/riiablo/net/reliable/TestServer.java index 8395c222..1a94fed5 100644 --- a/server/netty/src/com/riiablo/net/reliable/TestServer.java +++ b/server/netty/src/com/riiablo/net/reliable/TestServer.java @@ -12,6 +12,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.nio.NioDatagramChannel; +import java.net.SocketAddress; import java.nio.ByteBuffer; import com.badlogic.gdx.Application; @@ -81,7 +82,7 @@ public class TestServer extends ApplicationAdapter implements PacketProcessor { } @Override - public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) { + public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) { Gdx.app.debug(TAG, "Processing packet..."); Gdx.app.log(TAG, ByteBufUtil.hexDump(bb)); diff --git a/server/netty/src/com/riiablo/net/tcp/TestClient.java b/server/netty/src/com/riiablo/net/tcp/TestClient.java index 8890c00b..4194f138 100644 --- a/server/netty/src/com/riiablo/net/tcp/TestClient.java +++ b/server/netty/src/com/riiablo/net/tcp/TestClient.java @@ -13,6 +13,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; +import java.net.SocketAddress; import com.badlogic.gdx.Application; import com.badlogic.gdx.ApplicationAdapter; @@ -95,7 +96,7 @@ public class TestClient extends ApplicationAdapter implements PacketProcessor { } @Override - public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) { + public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) { Gdx.app.debug(TAG, "Processing packet..."); Gdx.app.log(TAG, ByteBufUtil.hexDump(bb)); } diff --git a/server/netty/src/com/riiablo/net/tcp/TestServer.java b/server/netty/src/com/riiablo/net/tcp/TestServer.java index a62b6ca2..20c0cabc 100644 --- a/server/netty/src/com/riiablo/net/tcp/TestServer.java +++ b/server/netty/src/com/riiablo/net/tcp/TestServer.java @@ -11,6 +11,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import java.net.SocketAddress; import com.badlogic.gdx.Application; import com.badlogic.gdx.ApplicationAdapter; @@ -81,7 +82,7 @@ public class TestServer extends ApplicationAdapter implements PacketProcessor { } @Override - public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) { + public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) { Gdx.app.debug(TAG, "Processing packet..."); Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(bb)); } diff --git a/server/netty/src/com/riiablo/server/d2gs/Main.java b/server/netty/src/com/riiablo/server/d2gs/Main.java index 3c9fb523..9cb11479 100644 --- a/server/netty/src/com/riiablo/server/d2gs/Main.java +++ b/server/netty/src/com/riiablo/server/d2gs/Main.java @@ -119,7 +119,7 @@ public class Main extends ApplicationAdapter implements PacketProcessor { } @Override - public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) { + public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) { Gdx.app.debug(TAG, "Processing packet..."); Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(bb)); processPacket(ctx, Netty.getRootAsNetty(bb.nioBuffer())); @@ -136,7 +136,7 @@ public class Main extends ApplicationAdapter implements PacketProcessor { InetSocketAddress from = (InetSocketAddress) ctx.channel().remoteAddress(); switch (netty.dataType()) { case NettyData.Connection: { - Connection(from, netty); + Connection(ctx, from, netty); break; } default: @@ -145,7 +145,7 @@ public class Main extends ApplicationAdapter implements PacketProcessor { } } - private void Connection(InetSocketAddress from, Netty netty) { + private void Connection(ChannelHandlerContext ctx, InetSocketAddress from, Netty netty) { Gdx.app.debug(TAG, "Connection from " + from); Connection connection = (Connection) netty.data(new Connection()); diff --git a/server/netty/src/com/riiablo/server/d2gs/TestClient.java b/server/netty/src/com/riiablo/server/d2gs/TestClient.java index b1dfb363..30269ccf 100644 --- a/server/netty/src/com/riiablo/server/d2gs/TestClient.java +++ b/server/netty/src/com/riiablo/server/d2gs/TestClient.java @@ -13,6 +13,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; +import java.net.SocketAddress; import com.badlogic.gdx.Application; import com.badlogic.gdx.ApplicationAdapter; @@ -103,7 +104,7 @@ public class TestClient extends ApplicationAdapter implements PacketProcessor { } @Override - public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) { + public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) { Gdx.app.debug(TAG, "Processing packet..."); Gdx.app.log(TAG, ByteBufUtil.hexDump(bb)); }