diff --git a/core/src/com/riiablo/screen/NetworkedGameScreen.java b/core/src/com/riiablo/screen/NetworkedGameScreen.java index 3944a6b3..eef5c54c 100644 --- a/core/src/com/riiablo/screen/NetworkedGameScreen.java +++ b/core/src/com/riiablo/screen/NetworkedGameScreen.java @@ -174,7 +174,6 @@ public class NetworkedGameScreen extends GameScreen { int flags2 = Dirty.NONE; Gdx.app.log(TAG, "syncing " + entityId); for (int i = 0, len = s.dataTypeLength(); i < len; i++) { - System.out.println(SyncData.name(s.dataType(i))); switch (s.dataType(i)) { case SyncData.CofComponents: { com.riiablo.net.packet.d2gs.CofComponents data = (com.riiablo.net.packet.d2gs.CofComponents) s.data(new com.riiablo.net.packet.d2gs.CofComponents(), i); @@ -197,6 +196,8 @@ public class NetworkedGameScreen extends GameScreen { } break; } + default: + Gdx.app.error(TAG, "Unknown packet type: " + SyncData.name(s.dataType(i))); } } diff --git a/server/d2gs/src/com/riiablo/server/d2gs/D2GS.java b/server/d2gs/src/com/riiablo/server/d2gs/D2GS.java index c1557659..d7496b7c 100644 --- a/server/d2gs/src/com/riiablo/server/d2gs/D2GS.java +++ b/server/d2gs/src/com/riiablo/server/d2gs/D2GS.java @@ -50,6 +50,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; +import org.apache.commons.lang3.ArrayUtils; import java.io.IOException; import java.net.InetAddress; @@ -65,8 +66,7 @@ import java.util.Calendar; import java.util.Collection; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeUnit; public class D2GS extends ApplicationAdapter { private static final String TAG = "D2GS"; @@ -131,9 +131,10 @@ public class D2GS extends ApplicationAdapter { ServerSocket server; Thread connectionListener; - AtomicBoolean kill; + volatile boolean kill = false; ThreadGroup clientThreads; - CopyOnWriteArrayList CLIENTS = new CopyOnWriteArrayList<>(); + final Client[] clients = new Client[MAX_CLIENTS]; + int numClients = 0; final BlockingQueue packets = new ArrayBlockingQueue<>(32); final Collection cache = new ArrayList<>(); @@ -230,34 +231,45 @@ public class D2GS extends ApplicationAdapter { Gdx.app.log(TAG, "Starting server..."); server = Gdx.net.newServerSocket(Net.Protocol.TCP, PORT, null); - kill = new AtomicBoolean(false); connectionListener = new Thread(new Runnable() { @Override public void run() { - while (!kill.get()) { + while (!kill) { Gdx.app.log(TAG, "waiting..."); Socket socket = server.accept(null); Gdx.app.log(TAG, "connection from " + socket.getRemoteAddress()); - if (CLIENTS.size() >= MAX_CLIENTS) { + if (numClients >= MAX_CLIENTS) { + // TODO: send server is full message socket.dispose(); } else { try { - int id = CLIENTS.size(); - Gdx.app.log(TAG, "assigned " + id); - Client client = new Client(id, socket); - CLIENTS.add(client); - client.start(); - } catch (Throwable ignored) { + synchronized (clients) { + int id = ArrayUtils.indexOf(clients, null); + assert id != ArrayUtils.INDEX_NOT_FOUND : "numClients=" + numClients + " but no index available"; + Gdx.app.log(TAG, "assigned " + socket.getRemoteAddress() + " to " + id); + Client client = clients[id] = new Client(id, socket); + numClients++; + client.start(); + } + } catch (Throwable t) { + Gdx.app.error(TAG, t.getMessage(), t); socket.dispose(); } } } Gdx.app.log(TAG, "killing child threads..."); - for (Client client : CLIENTS) { - if (client != null) { - client.kill.set(true); + synchronized (clients) { + for (Client client : clients) { + if (client != null) { + client.kill = true; + client.socket.dispose(); + try { + client.join(); + } catch (Throwable ignored) {} + } } + numClients = 0; } Gdx.app.log(TAG, "killing thread..."); @@ -270,7 +282,7 @@ public class D2GS extends ApplicationAdapter { @Override public void dispose() { Gdx.app.log(TAG, "Shutting down..."); - kill.set(true); + kill = true; server.dispose(); try { connectionListener.join(); @@ -293,32 +305,20 @@ public class D2GS extends ApplicationAdapter { cache.clear(); outPackets.drainTo(cache); for (Packet packet : cache) { - Gdx.app.log(TAG, "dispatching packet to " + packet.id); - if (packet.id == -1) { - for (Client client : CLIENTS) { + Gdx.app.log(TAG, "dispatching " + D2GSData.name(packet.data.dataType()) + " packet to " + String.format("0x%08X", packet.id)); + for (int i = 0, flag = 1; i < MAX_CLIENTS; i++, flag <<= 1) { + if ((packet.id & flag) == flag) { + Client client = clients[i]; + if (client == null) continue; try { - System.out.println(" dispatching packet to " + client.id); + System.out.println(" dispatching packet to " + i); client.send(packet.data); } catch (Throwable t) { Gdx.app.error(TAG, t.getMessage(), t); } } - } else { - for (int i = 0, flag = 1; i < MAX_CLIENTS; i++, flag <<= 1) { - if ((packet.id & flag) == flag && i < CLIENTS.size()) { - try { - System.out.println(" dispatching packet to " + i); - CLIENTS.get(i).send(packet.data); - } catch (Throwable t) { - Gdx.app.error(TAG, t.getMessage(), t); - } - } - } } } - - for (Client client : CLIENTS) { - } } private void process(Packet packet) { @@ -338,7 +338,7 @@ public class D2GS extends ApplicationAdapter { Connection connection = (Connection) packet.data.data(new Connection()); String charName = connection.charName(); int charClass = connection.charClass(); - Gdx.app.log(TAG, "Connection from " + CLIENTS.get(packet.id).socket.getRemoteAddress() + " : " + charName); + Gdx.app.log(TAG, "Connection from " + clients[packet.id].socket.getRemoteAddress() + " : " + charName); byte[] cofComponents = new byte[16]; connection.cofComponentsAsByteBuffer().get(cofComponents); @@ -422,6 +422,13 @@ public class D2GS extends ApplicationAdapter { com.riiablo.net.packet.d2gs.D2GS responseData = com.riiablo.net.packet.d2gs.D2GS.getRootAsD2GS(buffer); Packet broadcast = Packet.obtain(~(1 << id), responseData); outPackets.offer(broadcast); + + world.delete(entityId); + player.put(id, Engine.INVALID_ENTITY); + synchronized (clients) { + clients[id] = null; + numClients--; + } } private void Synchronize(Packet packet) { @@ -439,13 +446,16 @@ public class D2GS extends ApplicationAdapter { } private class Client extends Thread { + final String TAG; + int id; Socket socket; ByteBuffer buffer = ByteBuffer.allocate(4096); - AtomicBoolean kill = new AtomicBoolean(false); + volatile boolean kill = false; Client(int id, Socket socket) { super(clientThreads, generateClientName()); + TAG = D2GS.TAG + "{" + id + "}"; this.id = id; this.socket = socket; } @@ -458,13 +468,13 @@ public class D2GS extends ApplicationAdapter { @Override public void run() { - while (!kill.get()) { + while (!kill) { try { buffer.clear(); buffer.mark(); ReadableByteChannel in = Channels.newChannel(socket.getInputStream()); if (in.read(buffer) == -1) { - kill.set(true); + kill = true; break; } buffer.limit(buffer.position()); @@ -472,21 +482,20 @@ public class D2GS extends ApplicationAdapter { ByteBuffer copy = (ByteBuffer) ByteBuffer.wrap(new byte[buffer.limit()]).put(buffer).rewind(); com.riiablo.net.packet.d2gs.D2GS data = com.riiablo.net.packet.d2gs.D2GS.getRootAsD2GS(copy); - Gdx.app.log(TAG, "packet type " + D2GSData.name(data.dataType())); - boolean success = packets.offer(Packet.obtain(id, data)); + Gdx.app.log(TAG, "received " + D2GSData.name(data.dataType()) + " packet from " + socket.getRemoteAddress()); + boolean success = packets.offer(Packet.obtain(id, data), 5, TimeUnit.MILLISECONDS); if (!success) { - Gdx.app.log(TAG, "queue full -- kicking client"); - kill.set(true); + Gdx.app.log(TAG, "failed to add to queue -- closing " + socket.getRemoteAddress()); + kill = true; } } catch (Throwable t) { Gdx.app.log(TAG, t.getMessage(), t); - kill.set(true); + kill = true; } } - Gdx.app.log(TAG, "closing socket..."); + Gdx.app.log(TAG, "closing socket to " + socket.getRemoteAddress()); if (socket != null) socket.dispose(); - CLIENTS.remove(this); Disconnect(id); } } diff --git a/server/d2gs/src/com/riiablo/server/d2gs/NetworkSynchronizer.java b/server/d2gs/src/com/riiablo/server/d2gs/NetworkSynchronizer.java index 5c7eb978..8622b023 100644 --- a/server/d2gs/src/com/riiablo/server/d2gs/NetworkSynchronizer.java +++ b/server/d2gs/src/com/riiablo/server/d2gs/NetworkSynchronizer.java @@ -41,7 +41,7 @@ public class NetworkSynchronizer extends IteratingSystem { @Override protected void process(int entityId) { com.riiablo.net.packet.d2gs.D2GS sync = sync(entityId); - int id = player.get(entityId, -1); + int id = player.findKey(entityId, -1); assert id != -1; boolean success = outPackets.offer(D2GS.Packet.obtain(~(1 << id), sync)); assert success; @@ -89,9 +89,8 @@ public class NetworkSynchronizer extends IteratingSystem { } public void sync(int entityId, Sync sync) { - Gdx.app.log(TAG, "syncing " + sync.entityId()); + Gdx.app.log(TAG, "syncing " + entityId); for (int i = 0, len = sync.dataTypeLength(); i < len; i++) { - System.out.println(SyncData.name(sync.dataType(i))); switch (sync.dataType(i)) { case SyncData.CofComponents: { int[] component = mCofComponents.get(entityId).component; @@ -120,6 +119,8 @@ public class NetworkSynchronizer extends IteratingSystem { Gdx.app.log(TAG, " " + Arrays.toString(alpha)); break; } + default: + Gdx.app.error(TAG, "Unknown packet type: " + SyncData.name(sync.dataType(i))); } } }