Created experimental Netty impl of a TCP D2GS server

Created experimental Netty impl of a TCP D2GS server
Added salt field to Netty Connection fbs
Created ConnectionState for Connection fbs
This commit is contained in:
Collin Smith 2020-06-27 00:30:15 -07:00
parent 127186f385
commit 1781e46724
5 changed files with 365 additions and 1 deletions

View File

@ -14,8 +14,17 @@ public final class Connection extends Table {
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; vtable_start = bb_pos - bb.getInt(bb_pos); vtable_size = bb.getShort(vtable_start); }
public Connection __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public long salt() { int o = __offset(4); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
public static void startConnection(FlatBufferBuilder builder) { builder.startObject(0); }
public static int createConnection(FlatBufferBuilder builder,
long salt) {
builder.startObject(1);
Connection.addSalt(builder, salt);
return Connection.endConnection(builder);
}
public static void startConnection(FlatBufferBuilder builder) { builder.startObject(1); }
public static void addSalt(FlatBufferBuilder builder, long salt) { builder.addLong(0, salt, 0L); }
public static int endConnection(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;

View File

@ -0,0 +1,17 @@
// automatically generated by the FlatBuffers compiler, do not modify
package com.riiablo.net.packet.netty;
public final class ConnectionState {
private ConnectionState() { }
public static final byte request = 0;
public static final byte waiting_for_challenge = 1;
public static final byte answering_challenge = 2;
public static final byte awaiting_connect = 3;
public static final byte connection_payload = 4;
public static final String[] names = { "request", "waiting_for_challenge", "answering_challenge", "awaiting_connect", "connection_payload", };
public static String name(int e) { return names[e]; }
}

View File

@ -1,9 +1,18 @@
namespace com.riiablo.net.packet.netty;
enum ConnectionState:byte {
request,
waiting_for_challenge,
answering_challenge,
awaiting_connect,
connection_payload
}
table Connection {
// request
// response
salt:int64;
}
table Disconnect {

View File

@ -0,0 +1,219 @@
package com.riiablo.server.d2gs;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationAdapter;
import com.badlogic.gdx.Gdx;
import com.badlogic.gdx.backends.headless.HeadlessApplication;
import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration;
import com.badlogic.gdx.math.MathUtils;
import com.riiablo.Riiablo;
import com.riiablo.codec.Animation;
import com.riiablo.net.Endpoint;
import com.riiablo.net.EndpointedChannelHandler;
import com.riiablo.net.PacketProcessor;
import com.riiablo.net.packet.netty.Connection;
import com.riiablo.net.packet.netty.Netty;
import com.riiablo.net.packet.netty.NettyData;
import com.riiablo.net.tcp.TcpEndpoint;
public class Main extends ApplicationAdapter implements PacketProcessor {
private static final String TAG = "D2GS";
private static final boolean DEBUG = true;
static final int PORT = 6114;
private static final int MAX_CLIENTS = Riiablo.MAX_PLAYERS;
public static void main(String[] args) {
HeadlessApplicationConfiguration config = new HeadlessApplicationConfiguration();
config.renderInterval = Animation.FRAME_DURATION;
new HeadlessApplication(new Main(), config);
}
private Endpoint<?> endpoint;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private final ConnectionLimiter connectionLimiter = new ConnectionLimiter();
private final ConcurrentHashMap<SocketAddress, Integer> connectionIds = new ConcurrentHashMap<>(32);
private final byte[] connectionState = new byte[MAX_CLIENTS];
private final ConcurrentHashMap<SocketAddress, Long> clientSalts = new ConcurrentHashMap<>(32);
private final ConcurrentHashMap<SocketAddress, Long> serverSalts = new ConcurrentHashMap<>(32);
private final ConcurrentHashMap<SocketAddress, Long> xor = new ConcurrentHashMap<>(32);
@Override
public void create() {
Gdx.app.setLogLevel(Application.LOG_DEBUG);
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
TcpEndpoint endpoint = new TcpEndpoint(ch, Main.this);
Main.this.endpoint = endpoint;
ch.pipeline()
.addFirst(connectionLimiter)
.addLast(new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) return;
in.markReaderIndex();
final int length = in.readIntLE();
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
out.add(in.readRetainedSlice(length));
}
})
.addLast(new EndpointedChannelHandler<>(ByteBuf.class, endpoint))
;
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(PORT).sync();
} catch (Throwable t) {
Gdx.app.error(TAG, t.getMessage(), t);
Gdx.app.exit();
}
}
@Override
public void render() {
if (endpoint == null || !endpoint.channel().isActive()) return;
endpoint.update(Gdx.graphics.getDeltaTime());
}
@Override
public void dispose() {
Gdx.app.log(TAG, "Shutting down...");
if (!workerGroup.isShuttingDown()) workerGroup.shutdownGracefully();
if (!bossGroup.isShuttingDown()) bossGroup.shutdownGracefully();
}
@Override
public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.debug(TAG, " " + ByteBufUtil.hexDump(bb));
processPacket(ctx, Netty.getRootAsNetty(bb.nioBuffer()));
}
private int state = 0;
public static final int CLIENT_CONNECTING = 0;
public static final int CLIENT_DISCONNECTED = 0;
public static final int CLIENT_CONNECT = 0;
public void processPacket(ChannelHandlerContext ctx, Netty netty) {
Gdx.app.debug(TAG, " " + "dataType=" + NettyData.name(netty.dataType()));
InetSocketAddress from = (InetSocketAddress) ctx.channel().remoteAddress();
switch (netty.dataType()) {
case NettyData.Connection: {
Connection(from, netty);
break;
}
default:
Gdx.app.debug(TAG, "unknown data type: " + netty.dataType());
ctx.close();
}
}
private void Connection(InetSocketAddress from, Netty netty) {
Gdx.app.debug(TAG, "Connection from " + from);
Connection connection = (Connection) netty.data(new Connection());
boolean generateSalt = true;
long clientSalt = connection.salt();
Gdx.app.debug(TAG, " " + String.format("client salt=%016x", clientSalt));
if (clientSalts.containsKey(from)) {
long storedClientSalt = clientSalts.get(from);
if (storedClientSalt == clientSalt) {
Gdx.app.debug(TAG, " " + "client salt matches server record");
generateSalt = false;
} else {
Gdx.app.debug(TAG, " " + "client salt mismatch with server record");
Gdx.app.debug(TAG, " " + "updating client salt to server record");
clientSalts.put(from, clientSalt);
clientSalt = storedClientSalt;
Gdx.app.debug(TAG, " " + String.format("client salt=%016x", clientSalt));
}
} else {
Gdx.app.debug(TAG, " " + "no server record found matching client salt");
clientSalts.put(from, clientSalt);
}
long serverSalt;
if (generateSalt) {
Gdx.app.debug(TAG, " " + "generating server salt");
serverSalt = MathUtils.random.nextLong();
Long previousValue = serverSalts.put(from, serverSalt);
if (previousValue != null) {
Gdx.app.debug(TAG, " " + String.format("overwriting existing server salt %016x", previousValue));
}
Gdx.app.debug(TAG, " " + String.format("server salt=%016x", serverSalt));
} else {
serverSalt = serverSalts.get(from);
}
long salt = clientSalt ^ serverSalt;
xor.put(from, salt);
Gdx.app.debug(TAG, " " + String.format("salt=%016x", salt));
}
@ChannelHandler.Sharable
static class ConnectionLimiter extends ChannelInboundHandlerAdapter {
static final String TAG = "ConnectionLimiter";
final AtomicInteger connections = new AtomicInteger();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
int count = connections.incrementAndGet();
if (count <= MAX_CLIENTS) {
Gdx.app.debug(TAG, String.format("connection accepted. %d / %d", count, MAX_CLIENTS));
super.channelActive(ctx);
} else {
ctx.close();
Gdx.app.debug(TAG, String.format("closing connection. maximum concurrent connections reached %d / %d", count, MAX_CLIENTS));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
int count = connections.decrementAndGet();
Gdx.app.debug(TAG, String.format("connection closed. %d / %d", count, MAX_CLIENTS));
}
}
}

View File

@ -0,0 +1,110 @@
package com.riiablo.server.d2gs;
import com.google.flatbuffers.FlatBufferBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import com.badlogic.gdx.Application;
import com.badlogic.gdx.ApplicationAdapter;
import com.badlogic.gdx.Gdx;
import com.badlogic.gdx.backends.headless.HeadlessApplication;
import com.badlogic.gdx.backends.headless.HeadlessApplicationConfiguration;
import com.badlogic.gdx.math.MathUtils;
import com.riiablo.codec.Animation;
import com.riiablo.net.EndpointedChannelHandler;
import com.riiablo.net.PacketProcessor;
import com.riiablo.net.UnicastEndpoint;
import com.riiablo.net.packet.netty.Connection;
import com.riiablo.net.packet.netty.Netty;
import com.riiablo.net.packet.netty.NettyData;
import com.riiablo.net.reliable.QoS;
import com.riiablo.net.tcp.TcpEndpoint;
public class TestClient extends ApplicationAdapter implements PacketProcessor {
private static final String TAG = "Client";
public static void main(String[] args) throws Exception {
Thread.sleep(1000);
HeadlessApplicationConfiguration config = new HeadlessApplicationConfiguration();
config.renderInterval = Animation.FRAME_DURATION;
new HeadlessApplication(new TestClient(), config);
}
private UnicastEndpoint<?> endpoint;
private EventLoopGroup group;
private long pingSent;
@Override
public void create() {
Gdx.app.setLogLevel(Application.LOG_DEBUG);
group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
UnicastEndpoint<ByteBuf> endpoint = new TcpEndpoint(ch, TestClient.this);
TestClient.this.endpoint = endpoint;
ch.pipeline()
.addLast(new EndpointedChannelHandler<>(ByteBuf.class, endpoint))
;
}
});
ChannelFuture f = b.connect("localhost", Main.PORT).sync();
sendPacket();
Thread.currentThread().sleep(1);
sendPacket();
} catch (Throwable t) {
Gdx.app.error(TAG, t.getMessage(), t);
Gdx.app.exit();
}
}
private void sendPacket() {
InetSocketAddress remoteAddress = (InetSocketAddress) endpoint.channel().remoteAddress();
Gdx.app.log(TAG, "Sending Connection packet to " + remoteAddress.getHostString() + ":" + remoteAddress.getPort());
long salt = MathUtils.random.nextLong();
FlatBufferBuilder builder = new FlatBufferBuilder();
Connection.startConnection(builder);
Connection.addSalt(builder, salt);
int dataOffset = Connection.endConnection(builder);
int offset = Netty.createNetty(builder, salt, NettyData.Connection, dataOffset);
Netty.finishSizePrefixedNettyBuffer(builder, offset);
endpoint.sendMessage(QoS.Unreliable, builder.dataBuffer());
}
@Override
public void render() {
endpoint.update(Gdx.graphics.getDeltaTime());
}
@Override
public void dispose() {
if (!group.isShuttingDown()) group.shutdownGracefully();
}
@Override
public void processPacket(ChannelHandlerContext ctx, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet...");
Gdx.app.log(TAG, ByteBufUtil.hexDump(bb));
}
}