Added networking support for monster entities

Added networking support for dynamic object entities
Fixed issues with multiple packets in stream by prepending packet size
Fixed connection protocol for clients to wait until response received
Created Box2DDisposer system to destroy box2d bodies when an entity has Box2DBody removed
Created EntitySystemAdapter to simplify systems which are aspect event based (insert, removed, etc)
Removed Entity reference from Connection and changed to entity id instead
Renamed transform and alpha flag variables to tFlags and aFlags to remain consistent with elsewhere
Added DS1ObjectWrapperP to network DS1 objects (DS1 object reference is needed only for server-side/host)
This commit is contained in:
Collin Smith
2019-12-24 02:22:50 -08:00
parent 6b729c4350
commit b8a5049adb
10 changed files with 180 additions and 76 deletions

View File

@ -1,5 +1,6 @@
package com.riiablo.server.d2gs;
import com.google.flatbuffers.ByteBufferUtil;
import com.google.flatbuffers.FlatBufferBuilder;
import com.artemis.ComponentMapper;
@ -146,6 +147,7 @@ public class D2GS extends ApplicationAdapter {
ThreadGroup clientThreads;
final Client[] clients = new Client[MAX_CLIENTS];
int numClients = 0;
int connected = 0;
final BlockingQueue<Packet> packets = new ArrayBlockingQueue<>(32);
final Collection<Packet> cache = new ArrayList<>();
@ -162,7 +164,6 @@ public class D2GS extends ApplicationAdapter {
EntityFactory factory;
MapManager mapManager;
NetworkSynchronizer sync;
SerializationManager serializer;
protected ComponentMapper<Networked> mNetworked;
@ -281,6 +282,7 @@ public class D2GS extends ApplicationAdapter {
Gdx.app.log(TAG, "assigned " + socket.getRemoteAddress() + " to " + id);
Client client = clients[id] = new Client(id, socket);
numClients++;
connected |= (1 << id);
client.start();
}
} catch (Throwable t) {
@ -302,6 +304,7 @@ public class D2GS extends ApplicationAdapter {
}
}
numClients = 0;
connected = 0;
}
Gdx.app.log(TAG, "killing thread...");
@ -339,12 +342,12 @@ public class D2GS extends ApplicationAdapter {
for (Packet packet : cache) {
Gdx.app.log(TAG, "dispatching " + D2GSData.name(packet.data.dataType()) + " packet to " + String.format("0x%08X", packet.id));
for (int i = 0, flag = 1; i < MAX_CLIENTS; i++, flag <<= 1) {
if ((packet.id & flag) == flag) {
if ((packet.id & flag) == flag && (connected & flag) == flag) {
Client client = clients[i];
if (client == null) continue;
try {
System.out.println(" dispatching packet to " + i);
client.send(packet.data);
client.send(packet);
} catch (Throwable t) {
Gdx.app.error(TAG, t.getMessage(), t);
}
@ -400,10 +403,8 @@ public class D2GS extends ApplicationAdapter {
Connection.addEntityId(builder, entityId);
int connectionOffset = Connection.endConnection(builder);
int offset = com.riiablo.net.packet.d2gs.D2GS.createD2GS(builder, D2GSData.Connection, connectionOffset);
builder.finish(offset);
ByteBuffer buffer = builder.dataBuffer();
com.riiablo.net.packet.d2gs.D2GS responseData = com.riiablo.net.packet.d2gs.D2GS.getRootAsD2GS(buffer);
Packet response = Packet.obtain(1 << packet.id, responseData);
com.riiablo.net.packet.d2gs.D2GS.finishSizePrefixedD2GSBuffer(builder, offset);
Packet response = Packet.obtain(1 << packet.id, builder.dataBuffer());
outPackets.offer(response);
Synchronize(packet.id, entityId);
@ -439,10 +440,9 @@ public class D2GS extends ApplicationAdapter {
Connection.addCofTransforms(builder, transformsOffset);
int connectionOffset = Connection.endConnection(builder);
int offset = com.riiablo.net.packet.d2gs.D2GS.createD2GS(builder, D2GSData.Connection, connectionOffset);
builder.finish(offset);
com.riiablo.net.packet.d2gs.D2GS data = com.riiablo.net.packet.d2gs.D2GS.getRootAsD2GS(builder.dataBuffer());
com.riiablo.net.packet.d2gs.D2GS.finishSizePrefixedD2GSBuffer(builder, offset);
Packet broadcast = Packet.obtain(~(1 << id), data);
Packet broadcast = Packet.obtain(~(1 << id), builder.dataBuffer());
boolean success = outPackets.offer(broadcast);
assert success;
}
@ -453,10 +453,8 @@ public class D2GS extends ApplicationAdapter {
FlatBufferBuilder builder = new FlatBufferBuilder();
int disconnectOffset = Disconnect.createDisconnect(builder, entityId);
int offset = com.riiablo.net.packet.d2gs.D2GS.createD2GS(builder, D2GSData.Disconnect, disconnectOffset);
builder.finish(offset);
ByteBuffer buffer = builder.dataBuffer();
com.riiablo.net.packet.d2gs.D2GS responseData = com.riiablo.net.packet.d2gs.D2GS.getRootAsD2GS(buffer);
Packet broadcast = Packet.obtain(~(1 << id), responseData);
com.riiablo.net.packet.d2gs.D2GS.finishSizePrefixedD2GSBuffer(builder, offset);
Packet broadcast = Packet.obtain(~(1 << id), builder.dataBuffer());
outPackets.offer(broadcast);
world.delete(entityId);
@ -464,6 +462,7 @@ public class D2GS extends ApplicationAdapter {
synchronized (clients) {
clients[id] = null;
numClients--;
connected &= ~(1 << id);
}
}
@ -492,10 +491,11 @@ public class D2GS extends ApplicationAdapter {
this.socket = socket;
}
public void send(com.riiablo.net.packet.d2gs.D2GS data) throws IOException {
ByteBuffer buffer = data.getByteBuffer();
public void send(Packet packet) throws IOException {
WritableByteChannel out = Channels.newChannel(socket.getOutputStream());
out.write(buffer);
packet.buffer.mark();
out.write(packet.buffer);
packet.buffer.reset();
}
@Override
@ -513,9 +513,9 @@ public class D2GS extends ApplicationAdapter {
buffer.reset();
ByteBuffer copy = (ByteBuffer) ByteBuffer.wrap(new byte[buffer.limit()]).put(buffer).rewind();
com.riiablo.net.packet.d2gs.D2GS data = com.riiablo.net.packet.d2gs.D2GS.getRootAsD2GS(copy);
Gdx.app.log(TAG, "received " + D2GSData.name(data.dataType()) + " packet from " + socket.getRemoteAddress());
boolean success = packets.offer(Packet.obtain(id, data), 5, TimeUnit.MILLISECONDS);
Packet packet = Packet.obtain(id, copy);
Gdx.app.log(TAG, "received " + D2GSData.name(packet.data.dataType()) + " packet from " + socket.getRemoteAddress());
boolean success = packets.offer(packet, 5, TimeUnit.MILLISECONDS);
if (!success) {
Gdx.app.log(TAG, "failed to add to queue -- closing " + socket.getRemoteAddress());
kill = true;
@ -533,13 +533,15 @@ public class D2GS extends ApplicationAdapter {
}
public static class Packet {
int id;
com.riiablo.net.packet.d2gs.D2GS data;
public int id;
public ByteBuffer buffer;
public com.riiablo.net.packet.d2gs.D2GS data;
public static Packet obtain(int id, com.riiablo.net.packet.d2gs.D2GS data) {
public static Packet obtain(int id, ByteBuffer buffer) {
Packet packet = new Packet();
packet.id = id;
packet.data = data;
packet.buffer = buffer;
packet.data = com.riiablo.net.packet.d2gs.D2GS.getRootAsD2GS(ByteBufferUtil.removeSizePrefix(buffer));
return packet;
}
}

View File

@ -12,6 +12,7 @@ import com.riiablo.engine.server.component.Networked;
import com.riiablo.net.packet.d2gs.D2GS;
import com.riiablo.net.packet.d2gs.D2GSData;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
@All(Networked.class)
@ -33,18 +34,19 @@ public class NetworkSynchronizer extends IteratingSystem {
@Override
protected void process(int entityId) {
D2GS sync = sync(entityId);
ByteBuffer sync = sync(entityId);
int id = players.findKey(entityId, -1);
boolean success = outPackets.offer(com.riiablo.server.d2gs.D2GS.Packet.obtain(id != -1 ? ~(1 << id) : 0xFFFFFFFF, sync));
boolean success = outPackets.offer(com.riiablo.server.d2gs.D2GS.Packet
.obtain(id != -1 ? ~(1 << id) : 0xFFFFFFFF, sync));
assert success;
}
public D2GS sync(int entityId) {
public ByteBuffer sync(int entityId) {
FlatBufferBuilder builder = new FlatBufferBuilder(0);
int syncOffset = serializer.serialize(builder, entityId);
int root = D2GS.createD2GS(builder, D2GSData.Sync, syncOffset);
builder.finish(root);
return D2GS.getRootAsD2GS(builder.dataBuffer());
D2GS.finishSizePrefixedD2GSBuffer(builder, root);
return builder.dataBuffer();
}
public void sync(int entityId, D2GS packet) {