Abstracted networking further and implemented TCP implementation #78

Deprecated ReliableChannelHandler (succeeded by EndpointedChannelHandler)
Added basic test netty TCP client and server
This commit is contained in:
Collin Smith 2020-06-24 15:46:41 -07:00
parent b832c89dcc
commit b74e870e9b
9 changed files with 455 additions and 6 deletions

View File

@ -0,0 +1,9 @@
package com.riiablo.net;
import io.netty.channel.ChannelHandlerContext;
public interface Endpoint<T, Q> extends PacketSender<Q> {
void reset();
void update(float delta);
void messageReceived(ChannelHandlerContext ctx, T msg);
}

View File

@ -0,0 +1,182 @@
package com.riiablo.net;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.TypeParameterMatcher;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import com.badlogic.gdx.Gdx;
public class EndpointedChannelHandler<T, Q> implements ChannelHandler, ChannelInboundHandler, ChannelOutboundHandler {
private static final String TAG = "EndpointedChannelHandler";
private static final boolean DEBUG = true;
private static final boolean DEBUG_CALLS = DEBUG && true;
private static final boolean DEBUG_INBOUND = DEBUG && true;
private static final boolean DEBUG_OUTBOUND = DEBUG && true;
private final TypeParameterMatcher matcher;
private final Endpoint<T, Q> endpoint;
public EndpointedChannelHandler(Class<T> packetType, Endpoint<T, Q> endpoint) {
this.endpoint = endpoint;
matcher = TypeParameterMatcher.get(packetType);
}
protected boolean accept(Object msg) throws Exception {
return matcher.match(msg);
}
protected void messageReceived(ChannelHandlerContext ctx, T msg) throws Exception {
if (DEBUG_CALLS) {
InetSocketAddress sender = msg instanceof DatagramPacket
? ((DatagramPacket) msg).sender()
: (InetSocketAddress) ctx.channel().remoteAddress();
Gdx.app.log(TAG, "messageReceived received packet from " + sender.getHostName() + ":" + sender.getPort());
}
endpoint.messageReceived(ctx, msg);
}
protected Object writeMessage(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
if (DEBUG_CALLS) {
InetSocketAddress receiver = (InetSocketAddress) ctx.channel().remoteAddress();
Gdx.app.log(TAG, "writeMessage sending packet to " + receiver.getHostName() + ":" + receiver.getPort());
}
ByteBuf out = msg;
if (DEBUG_OUTBOUND) Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(out));
return msg;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "handlerAdded");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "handlerRemoved");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "exceptionCaught");
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelRegistered");
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelUnregistered");
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelActive");
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelInactive");
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelRead");
boolean release = true;
try {
if (accept(msg)) {
@SuppressWarnings("unchecked")
T castedMsg = (T) msg;
messageReceived(ctx, castedMsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (release) ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelReadComplete");
ctx.fireChannelReadComplete();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "userEventTriggered");
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "channelWritabilityChanged");
ctx.fireChannelWritabilityChanged();
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "bind");
ctx.bind(localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "connect");
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "disconnect");
ctx.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "close");
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "deregister");
ctx.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "read");
ctx.read();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "write");
msg = writeMessage(ctx, (ByteBuf) msg);
ctx.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
if (DEBUG_CALLS) Gdx.app.debug(TAG, "flush");
ctx.flush();
}
}

View File

@ -15,6 +15,13 @@ import java.net.SocketAddress;
import com.badlogic.gdx.Gdx;
import com.riiablo.net.EndpointedChannelHandler;
/**
* Replaced by {@link EndpointedChannelHandler}
* @see EndpointedChannelHandler
*/
@Deprecated
public class ReliableChannelHandler implements ChannelHandler, ChannelInboundHandler, ChannelOutboundHandler {
private static final String TAG = "ReliableChannelHandler";

View File

@ -9,14 +9,14 @@ import io.netty.channel.socket.DatagramPacket;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.Validate;
import com.riiablo.net.Endpoint;
import com.riiablo.net.PacketProcessor;
import com.riiablo.net.PacketSender;
import com.riiablo.net.reliable.channel.ReliableMessageChannel;
import com.riiablo.net.reliable.channel.UnreliableMessageChannel;
import com.riiablo.net.reliable.channel.UnreliableOrderedMessageChannel;
import com.riiablo.util.EnumIntMap;
public class ReliableEndpoint implements PacketSender<QoS>, MessageChannel.PacketTransceiver {
public class ReliableEndpoint implements Endpoint<DatagramPacket, QoS>, MessageChannel.PacketTransceiver {
private static final String TAG = "ReliableEndpoint";
private static final boolean DEBUG = true;
@ -52,10 +52,12 @@ public class ReliableEndpoint implements PacketSender<QoS>, MessageChannel.Packe
return channel;
}
@Override
public void reset() {
for (MessageChannel mc : channels) if (mc != null) mc.reset();
}
@Override
public void update(float delta) {
for (MessageChannel mc : channels) if (mc != null) mc.update(delta, channel);
}
@ -82,6 +84,7 @@ public class ReliableEndpoint implements PacketSender<QoS>, MessageChannel.Packe
mc.sendMessage(channelId, channel, Unpooled.wrappedBuffer(bb)); // automatically released
}
@Override
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) {
if (DEBUG_RECEIVE) Log.debug(TAG, "onMessageReceived");
int channelId = Packet.getChannelId(packet.content());

View File

@ -11,6 +11,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.net.InetSocketAddress;
@ -21,6 +22,8 @@ import com.badlogic.gdx.backends.headless.HeadlessApplication;
import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration;
import com.riiablo.codec.Animation;
import com.riiablo.net.Endpoint;
import com.riiablo.net.EndpointedChannelHandler;
import com.riiablo.net.PacketProcessor;
import com.riiablo.net.packet.netty.Connection;
import com.riiablo.net.packet.netty.Netty;
@ -36,7 +39,7 @@ public class TestClient extends ApplicationAdapter implements PacketProcessor {
new HeadlessApplication(new TestClient(), config);
}
private ReliableEndpoint endpoint;
private Endpoint<DatagramPacket, QoS> endpoint;
@Override
public void create() {
@ -52,7 +55,7 @@ public class TestClient extends ApplicationAdapter implements PacketProcessor {
protected void initChannel(DatagramChannel ch) {
endpoint = new ReliableEndpoint(ch, TestClient.this);
ch.pipeline()
.addLast(new ReliableChannelHandler(endpoint))
.addLast(new EndpointedChannelHandler<>(DatagramPacket.class, endpoint))
.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

View File

@ -9,6 +9,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.nio.ByteBuffer;
@ -19,6 +20,8 @@ import com.badlogic.gdx.backends.headless.HeadlessApplication;
import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration;
import com.riiablo.codec.Animation;
import com.riiablo.net.Endpoint;
import com.riiablo.net.EndpointedChannelHandler;
import com.riiablo.net.PacketProcessor;
import com.riiablo.net.packet.netty.Netty;
import com.riiablo.net.packet.netty.NettyData;
@ -34,7 +37,7 @@ public class TestServer extends ApplicationAdapter implements PacketProcessor {
new HeadlessApplication(new TestServer(), config);
}
private ReliableEndpoint endpoint;
private Endpoint<DatagramPacket, QoS> endpoint;
@Override
public void create() {
@ -51,7 +54,7 @@ public class TestServer extends ApplicationAdapter implements PacketProcessor {
protected void initChannel(DatagramChannel ch) {
endpoint = new ReliableEndpoint(ch, TestServer.this);
ch.pipeline()
.addLast(new ReliableChannelHandler(endpoint))
.addLast(new EndpointedChannelHandler<>(DatagramPacket.class, endpoint))
;
}
})

View File

@ -0,0 +1,56 @@
package com.riiablo.net.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import com.badlogic.gdx.Gdx;
import com.riiablo.net.Endpoint;
import com.riiablo.net.PacketProcessor;
public class TcpEndpoint implements Endpoint<ByteBuf, Object> {
private static final String TAG = "TcpEndpoint";
private static final boolean DEBUG = true;
private static final boolean DEBUG_SEND = DEBUG && true;
private static final boolean DEBUG_RECEIVE = DEBUG && true;
private final Channel channel;
private final PacketProcessor packetProcessor;
public TcpEndpoint(Channel channel, PacketProcessor packetProcessor) {
this.channel = channel;
this.packetProcessor = packetProcessor;
}
@Override
public Channel channel() {
return channel;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) {
if (DEBUG_RECEIVE) Gdx.app.debug(TAG, "onMessageReceived");
packetProcessor.processPacket(msg);
}
@Override
public void sendMessage(ByteBuffer bb) {
if (DEBUG_SEND) Gdx.app.debug(TAG, "sendMessage");
channel.writeAndFlush(Unpooled.wrappedBuffer(bb)); // releases msg
}
@Override
public void sendMessage(Object qos, ByteBuffer bb) {
sendMessage(bb);
}
@Override
public void reset() {}
@Override
public void update(float delta) {}
}

View File

@ -0,0 +1,105 @@
package com.riiablo.net.tcp;
import com.google.flatbuffers.FlatBufferBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationAdapter;
import com.badlogic.gdx.Gdx;
import com.badlogic.gdx.backends.headless.HeadlessApplication;
import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration;
import com.riiablo.codec.Animation;
import com.riiablo.net.Endpoint;
import com.riiablo.net.EndpointedChannelHandler;
import com.riiablo.net.PacketProcessor;
import com.riiablo.net.packet.netty.Connection;
import com.riiablo.net.packet.netty.Netty;
import com.riiablo.net.packet.netty.NettyData;
import com.riiablo.net.reliable.QoS;
public class TestClient extends ApplicationAdapter implements PacketProcessor {
private static final String TAG = "Client";
public static void main(String[] args) throws Exception {
Thread.sleep(1000);
HeadlessApplicationConfiguration config = new HeadlessApplicationConfiguration();
config.renderInterval = Animation.FRAME_DURATION;
new HeadlessApplication(new TestClient(), config);
}
private Endpoint<ByteBuf, Object> endpoint;
@Override
public void create() {
Gdx.app.setLogLevel(Application.LOG_DEBUG);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap()
.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
endpoint = new TcpEndpoint(ch, TestClient.this);
ch.pipeline()
.addLast(new EndpointedChannelHandler<>(ByteBuf.class, endpoint))
.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
init();
ctx.pipeline().remove(this);
}
void init() {
InetSocketAddress remoteAddress = (InetSocketAddress) endpoint.channel().remoteAddress();
Gdx.app.log(TAG, "Sending Connection packet to " + remoteAddress.getHostString() + ":" + remoteAddress.getPort());
FlatBufferBuilder builder = new FlatBufferBuilder();
Connection.startConnection(builder);
int dataOffset = Connection.endConnection(builder);
int offset = Netty.createNetty(builder, NettyData.Connection, dataOffset);
Netty.finishNettyBuffer(builder, offset);
endpoint.sendMessage(QoS.Unreliable, builder.dataBuffer());
}
})
;
}
});
ChannelFuture f = b.connect("localhost", TestServer.PORT).sync();
f.channel().closeFuture().sync();
} catch (Throwable t) {
Gdx.app.error(TAG, t.getMessage(), t);
} finally {
workerGroup.shutdownGracefully();
Gdx.app.exit();
}
}
@Override
public void render() {
endpoint.update(Gdx.graphics.getDeltaTime());
}
@Override
public void processPacket(ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.log(TAG, ByteBufUtil.hexDump(bb));
}
}

View File

@ -0,0 +1,81 @@
package com.riiablo.net.tcp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationAdapter;
import com.badlogic.gdx.Gdx;
import com.badlogic.gdx.backends.headless.HeadlessApplication;
import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration;
import com.riiablo.codec.Animation;
import com.riiablo.net.Endpoint;
import com.riiablo.net.EndpointedChannelHandler;
import com.riiablo.net.PacketProcessor;
public class TestServer extends ApplicationAdapter implements PacketProcessor {
private static final String TAG = "Server";
static final int PORT = 6114;
public static void main(String[] args) throws Exception {
HeadlessApplicationConfiguration config = new HeadlessApplicationConfiguration();
config.renderInterval = Animation.FRAME_DURATION;
new HeadlessApplication(new TestServer(), config);
}
private Endpoint<ByteBuf, Object> endpoint;
@Override
public void create() {
Gdx.app.setLogLevel(Application.LOG_DEBUG);
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
endpoint = new TcpEndpoint(ch, TestServer.this);
ch.pipeline()
.addLast(new EndpointedChannelHandler<>(ByteBuf.class, endpoint))
;
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} catch (Throwable t) {
Gdx.app.error(TAG, t.getMessage(), t);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
Gdx.app.exit();
}
}
@Override
public void render() {
endpoint.update(Gdx.graphics.getDeltaTime());
}
@Override
public void processPacket(ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.log(TAG, " " + ByteBufUtil.hexDump(bb));
}
}