Changed PacketProcessor references to D2GSPacketProcessor

D2GSPacketProcessor can process the packet type directly
Server will process the ByteBuf into the packet type already and pass it
Moved packet cache and queues fields to D2GSPacketProcessor
Created D2GSPacket to store the flatbuffer table
This commit is contained in:
Collin Smith 2020-06-29 00:53:05 -07:00
parent 1af1fd968e
commit e76fe35f49
4 changed files with 35 additions and 17 deletions

View File

@ -0,0 +1,22 @@
package com.riiablo.server.d2gs;
import com.google.flatbuffers.Table;
import java.nio.ByteBuffer;
import com.badlogic.gdx.utils.TimeUtils;
public class D2GSPacket<T extends Table> {
public int id;
public long time;
public ByteBuffer buffer;
public T data;
public static <T extends Table> D2GSPacket<T> obtain(int id, T fb) {
D2GSPacket<T> packet = new D2GSPacket<>();
packet.id = id;
packet.time = TimeUtils.millis();
packet.buffer = fb.getByteBuffer();
packet.data = fb;
return packet;
}
}

View File

@ -4,6 +4,10 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.badlogic.gdx.Gdx;
@ -14,6 +18,10 @@ import com.riiablo.nnet.PacketProcessor;
public class D2GSPacketProcessor implements PacketProcessor {
private static final String TAG = "D2GSPacketProcessor";
final BlockingQueue<D2GSPacket<Netty>> packets = new ArrayBlockingQueue<>(32);
final Collection<D2GSPacket<Netty>> cache = new ArrayList<>(1024);
final BlockingQueue<D2GSPacket<Netty>> outPackets = new ArrayBlockingQueue<>(1024);
@Override
public void processPacket(ChannelHandlerContext ctx, SocketAddress from, ByteBuf bb) {
Gdx.app.debug(TAG, "Processing packet from " + from + "...");
@ -26,8 +34,6 @@ public class D2GSPacketProcessor implements PacketProcessor {
switch (netty.dataType()) {
default:
Gdx.app.debug(TAG, "unknown data type: " + netty.dataType());
Gdx.app.debug(TAG, " " + "closing " + ctx);
ctx.close();
}
}
}

View File

@ -60,7 +60,6 @@ import com.riiablo.map.DT1Loader;
import com.riiablo.map.Map;
import com.riiablo.map.MapManager;
import com.riiablo.mpq.MPQFileHandleResolver;
import com.riiablo.nnet.PacketProcessor;
public class Main extends ApplicationAdapter {
private static final String TAG = "Main";
@ -131,7 +130,7 @@ public class Main extends ApplicationAdapter {
AtomicBoolean kill;
Server server;
PacketProcessor packetProcessor;
D2GSPacketProcessor packetProcessor;
World world;
Map map;
@ -227,7 +226,7 @@ public class Main extends ApplicationAdapter {
.register("map", map)
.register("factory", factory)
.register("player", server.player)
.register("outPackets", server.outPackets)
.register("outPackets", packetProcessor.outPackets)
;
Riiablo.engine = world = new World(config);

View File

@ -19,11 +19,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.Validate;
@ -47,7 +43,7 @@ public class Server implements PacketProcessor {
private final InetAddress address;
private final int port;
private final PacketProcessor packetProcessor;
private final D2GSPacketProcessor packetProcessor;
private ChannelFuture future;
private ServerBootstrap bootstrap;
@ -82,15 +78,9 @@ public class Server implements PacketProcessor {
};
}
// final BlockingQueue<Packet> packets = new ArrayBlockingQueue<>(32);
// final Collection<Packet> cache = new ArrayList<>(1024);
// final BlockingQueue<Packet> outPackets = new ArrayBlockingQueue<>(1024);
final BlockingQueue packets = new ArrayBlockingQueue<>(32);
final Collection cache = new ArrayList<>(1024);
final BlockingQueue outPackets = new ArrayBlockingQueue<>(1024);
final IntIntMap player = new IntIntMap();
public Server(InetAddress address, int port, PacketProcessor packetProcessor) {
public Server(InetAddress address, int port, D2GSPacketProcessor packetProcessor) {
this.address = address;
this.port = port;
this.packetProcessor = packetProcessor;
@ -203,6 +193,7 @@ public class Server implements PacketProcessor {
break;
default:
Gdx.app.debug(TAG, " " + "not connection-related. propagating to " + packetProcessor);
packetProcessor.processPacket(ctx, from, netty);
}
}