Improved D2GS client implementation

D2GS clients are now indexed using a fixed-size array
Connecting clients will now use the first available slot
Disconnecting clients will now have their corresponding entity deleted
Shutting down server will now wait for client threads to end
Removed separate iteration method for broadcast packets -- I don't think this distinction is necessary
Refactored AtomicBoolean with volatile boolean
Debug logging has been significantly been improved to indicate clients / packet types / remote addresses
This commit is contained in:
Collin Smith 2019-12-14 03:55:23 -08:00
parent 34eebb41aa
commit 15687b71c6
3 changed files with 61 additions and 50 deletions

View File

@ -174,7 +174,6 @@ public class NetworkedGameScreen extends GameScreen {
int flags2 = Dirty.NONE;
Gdx.app.log(TAG, "syncing " + entityId);
for (int i = 0, len = s.dataTypeLength(); i < len; i++) {
System.out.println(SyncData.name(s.dataType(i)));
switch (s.dataType(i)) {
case SyncData.CofComponents: {
com.riiablo.net.packet.d2gs.CofComponents data = (com.riiablo.net.packet.d2gs.CofComponents) s.data(new com.riiablo.net.packet.d2gs.CofComponents(), i);
@ -197,6 +196,8 @@ public class NetworkedGameScreen extends GameScreen {
}
break;
}
default:
Gdx.app.error(TAG, "Unknown packet type: " + SyncData.name(s.dataType(i)));
}
}

View File

@ -50,6 +50,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.ArrayUtils;
import java.io.IOException;
import java.net.InetAddress;
@ -65,8 +66,7 @@ import java.util.Calendar;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeUnit;
public class D2GS extends ApplicationAdapter {
private static final String TAG = "D2GS";
@ -131,9 +131,10 @@ public class D2GS extends ApplicationAdapter {
ServerSocket server;
Thread connectionListener;
AtomicBoolean kill;
volatile boolean kill = false;
ThreadGroup clientThreads;
CopyOnWriteArrayList<Client> CLIENTS = new CopyOnWriteArrayList<>();
final Client[] clients = new Client[MAX_CLIENTS];
int numClients = 0;
final BlockingQueue<Packet> packets = new ArrayBlockingQueue<>(32);
final Collection<Packet> cache = new ArrayList<>();
@ -230,34 +231,45 @@ public class D2GS extends ApplicationAdapter {
Gdx.app.log(TAG, "Starting server...");
server = Gdx.net.newServerSocket(Net.Protocol.TCP, PORT, null);
kill = new AtomicBoolean(false);
connectionListener = new Thread(new Runnable() {
@Override
public void run() {
while (!kill.get()) {
while (!kill) {
Gdx.app.log(TAG, "waiting...");
Socket socket = server.accept(null);
Gdx.app.log(TAG, "connection from " + socket.getRemoteAddress());
if (CLIENTS.size() >= MAX_CLIENTS) {
if (numClients >= MAX_CLIENTS) {
// TODO: send server is full message
socket.dispose();
} else {
try {
int id = CLIENTS.size();
Gdx.app.log(TAG, "assigned " + id);
Client client = new Client(id, socket);
CLIENTS.add(client);
client.start();
} catch (Throwable ignored) {
synchronized (clients) {
int id = ArrayUtils.indexOf(clients, null);
assert id != ArrayUtils.INDEX_NOT_FOUND : "numClients=" + numClients + " but no index available";
Gdx.app.log(TAG, "assigned " + socket.getRemoteAddress() + " to " + id);
Client client = clients[id] = new Client(id, socket);
numClients++;
client.start();
}
} catch (Throwable t) {
Gdx.app.error(TAG, t.getMessage(), t);
socket.dispose();
}
}
}
Gdx.app.log(TAG, "killing child threads...");
for (Client client : CLIENTS) {
if (client != null) {
client.kill.set(true);
synchronized (clients) {
for (Client client : clients) {
if (client != null) {
client.kill = true;
client.socket.dispose();
try {
client.join();
} catch (Throwable ignored) {}
}
}
numClients = 0;
}
Gdx.app.log(TAG, "killing thread...");
@ -270,7 +282,7 @@ public class D2GS extends ApplicationAdapter {
@Override
public void dispose() {
Gdx.app.log(TAG, "Shutting down...");
kill.set(true);
kill = true;
server.dispose();
try {
connectionListener.join();
@ -293,32 +305,20 @@ public class D2GS extends ApplicationAdapter {
cache.clear();
outPackets.drainTo(cache);
for (Packet packet : cache) {
Gdx.app.log(TAG, "dispatching packet to " + packet.id);
if (packet.id == -1) {
for (Client client : CLIENTS) {
Gdx.app.log(TAG, "dispatching " + D2GSData.name(packet.data.dataType()) + " packet to " + String.format("0x%08X", packet.id));
for (int i = 0, flag = 1; i < MAX_CLIENTS; i++, flag <<= 1) {
if ((packet.id & flag) == flag) {
Client client = clients[i];
if (client == null) continue;
try {
System.out.println(" dispatching packet to " + client.id);
System.out.println(" dispatching packet to " + i);
client.send(packet.data);
} catch (Throwable t) {
Gdx.app.error(TAG, t.getMessage(), t);
}
}
} else {
for (int i = 0, flag = 1; i < MAX_CLIENTS; i++, flag <<= 1) {
if ((packet.id & flag) == flag && i < CLIENTS.size()) {
try {
System.out.println(" dispatching packet to " + i);
CLIENTS.get(i).send(packet.data);
} catch (Throwable t) {
Gdx.app.error(TAG, t.getMessage(), t);
}
}
}
}
}
for (Client client : CLIENTS) {
}
}
private void process(Packet packet) {
@ -338,7 +338,7 @@ public class D2GS extends ApplicationAdapter {
Connection connection = (Connection) packet.data.data(new Connection());
String charName = connection.charName();
int charClass = connection.charClass();
Gdx.app.log(TAG, "Connection from " + CLIENTS.get(packet.id).socket.getRemoteAddress() + " : " + charName);
Gdx.app.log(TAG, "Connection from " + clients[packet.id].socket.getRemoteAddress() + " : " + charName);
byte[] cofComponents = new byte[16];
connection.cofComponentsAsByteBuffer().get(cofComponents);
@ -422,6 +422,13 @@ public class D2GS extends ApplicationAdapter {
com.riiablo.net.packet.d2gs.D2GS responseData = com.riiablo.net.packet.d2gs.D2GS.getRootAsD2GS(buffer);
Packet broadcast = Packet.obtain(~(1 << id), responseData);
outPackets.offer(broadcast);
world.delete(entityId);
player.put(id, Engine.INVALID_ENTITY);
synchronized (clients) {
clients[id] = null;
numClients--;
}
}
private void Synchronize(Packet packet) {
@ -439,13 +446,16 @@ public class D2GS extends ApplicationAdapter {
}
private class Client extends Thread {
final String TAG;
int id;
Socket socket;
ByteBuffer buffer = ByteBuffer.allocate(4096);
AtomicBoolean kill = new AtomicBoolean(false);
volatile boolean kill = false;
Client(int id, Socket socket) {
super(clientThreads, generateClientName());
TAG = D2GS.TAG + "{" + id + "}";
this.id = id;
this.socket = socket;
}
@ -458,13 +468,13 @@ public class D2GS extends ApplicationAdapter {
@Override
public void run() {
while (!kill.get()) {
while (!kill) {
try {
buffer.clear();
buffer.mark();
ReadableByteChannel in = Channels.newChannel(socket.getInputStream());
if (in.read(buffer) == -1) {
kill.set(true);
kill = true;
break;
}
buffer.limit(buffer.position());
@ -472,21 +482,20 @@ public class D2GS extends ApplicationAdapter {
ByteBuffer copy = (ByteBuffer) ByteBuffer.wrap(new byte[buffer.limit()]).put(buffer).rewind();
com.riiablo.net.packet.d2gs.D2GS data = com.riiablo.net.packet.d2gs.D2GS.getRootAsD2GS(copy);
Gdx.app.log(TAG, "packet type " + D2GSData.name(data.dataType()));
boolean success = packets.offer(Packet.obtain(id, data));
Gdx.app.log(TAG, "received " + D2GSData.name(data.dataType()) + " packet from " + socket.getRemoteAddress());
boolean success = packets.offer(Packet.obtain(id, data), 5, TimeUnit.MILLISECONDS);
if (!success) {
Gdx.app.log(TAG, "queue full -- kicking client");
kill.set(true);
Gdx.app.log(TAG, "failed to add to queue -- closing " + socket.getRemoteAddress());
kill = true;
}
} catch (Throwable t) {
Gdx.app.log(TAG, t.getMessage(), t);
kill.set(true);
kill = true;
}
}
Gdx.app.log(TAG, "closing socket...");
Gdx.app.log(TAG, "closing socket to " + socket.getRemoteAddress());
if (socket != null) socket.dispose();
CLIENTS.remove(this);
Disconnect(id);
}
}

View File

@ -41,7 +41,7 @@ public class NetworkSynchronizer extends IteratingSystem {
@Override
protected void process(int entityId) {
com.riiablo.net.packet.d2gs.D2GS sync = sync(entityId);
int id = player.get(entityId, -1);
int id = player.findKey(entityId, -1);
assert id != -1;
boolean success = outPackets.offer(D2GS.Packet.obtain(~(1 << id), sync));
assert success;
@ -89,9 +89,8 @@ public class NetworkSynchronizer extends IteratingSystem {
}
public void sync(int entityId, Sync sync) {
Gdx.app.log(TAG, "syncing " + sync.entityId());
Gdx.app.log(TAG, "syncing " + entityId);
for (int i = 0, len = sync.dataTypeLength(); i < len; i++) {
System.out.println(SyncData.name(sync.dataType(i)));
switch (sync.dataType(i)) {
case SyncData.CofComponents: {
int[] component = mCofComponents.get(entityId).component;
@ -120,6 +119,8 @@ public class NetworkSynchronizer extends IteratingSystem {
Gdx.app.log(TAG, " " + Arrays.toString(alpha));
break;
}
default:
Gdx.app.error(TAG, "Unknown packet type: " + SyncData.name(sync.dataType(i)));
}
}
}