Experimental MNet-2 backend

This commit is contained in:
Anuken 2019-08-22 22:49:22 -04:00
parent 6d72c08d63
commit 1880c32f90
15 changed files with 399 additions and 70 deletions

View File

@ -129,8 +129,8 @@ public class AndroidLauncher extends AndroidApplication{
}
config.hideStatusBar = true;
Net.setClientProvider(new ArcNetClient());
Net.setServerProvider(new ArcNetServer());
Net.setClientProvider(new MClient());
Net.setServerProvider(new MServer());
initialize(new Mindustry(), config);
checkFiles(getIntent());
}

View File

@ -229,9 +229,11 @@ project(":core"){
compileJava.dependsOn(preGen)
compile "org.lz4:lz4-java:1.4.1"
compile arcModule("arc-core")
compile arcModule("extensions:freetype")
compile arcModule("extensions:arcnet")
compile arcModule("extensions:mnet")
if(localArc() && debugged()) compile arcModule("extensions:recorder")
compileOnly project(":annotations")
@ -294,7 +296,6 @@ project(":net"){
dependencies{
compile project(":core")
compile "org.lz4:lz4-java:1.4.1"
}
}

View File

@ -67,6 +67,6 @@ public class Mindustry extends ApplicationCore{
super.init();
Log.info("Time to load [total]: {0}", Time.elapsed());
Events.fire(new GameLoadEvent());
Events.fire(new ClientLoadEvent());
}
}

View File

@ -1,7 +1,7 @@
package io.anuke.mindustry;
import io.anuke.arc.*;
import io.anuke.arc.Application.ApplicationType;
import io.anuke.arc.Core;
import io.anuke.arc.files.FileHandle;
import io.anuke.arc.graphics.Color;
import io.anuke.arc.util.Structs;
@ -15,6 +15,7 @@ import io.anuke.mindustry.entities.traits.DrawTrait;
import io.anuke.mindustry.entities.traits.SyncTrait;
import io.anuke.mindustry.entities.type.*;
import io.anuke.mindustry.game.*;
import io.anuke.mindustry.game.EventType.*;
import io.anuke.mindustry.gen.Serialization;
import io.anuke.mindustry.net.Net;
import io.anuke.mindustry.world.blocks.defense.ForceProjector.ShieldEntity;
@ -222,5 +223,7 @@ public class Vars{
customMapDirectory = dataDirectory.child("maps/");
saveDirectory = dataDirectory.child("saves/");
tmpDirectory = dataDirectory.child("tmp/");
Events.fire(new AppLoadEvent());
}
}

View File

@ -27,8 +27,13 @@ public class EventType{
}
}
/** Called when the game is first loaded. */
public static class GameLoadEvent{
/** Called when the client game is first loaded. */
public static class ClientLoadEvent{
}
/** Called when the core app is first loaded. */
public static class AppLoadEvent{
}

View File

@ -1,19 +1,18 @@
package io.anuke.mindustry.net;
import io.anuke.arc.Core;
import io.anuke.arc.*;
import io.anuke.arc.collection.*;
import io.anuke.arc.function.BiConsumer;
import io.anuke.arc.function.Consumer;
import io.anuke.arc.function.*;
import io.anuke.arc.util.*;
import io.anuke.arc.util.pooling.Pools;
import io.anuke.mindustry.core.Platform;
import io.anuke.mindustry.gen.Call;
import io.anuke.arc.util.pooling.*;
import io.anuke.mindustry.core.*;
import io.anuke.mindustry.gen.*;
import io.anuke.mindustry.net.Packets.*;
import io.anuke.mindustry.net.Streamable.StreamBuilder;
import io.anuke.mindustry.net.Streamable.*;
import net.jpountz.lz4.*;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.io.*;
import java.nio.*;
import static io.anuke.mindustry.Vars.*;
@ -28,6 +27,8 @@ public class Net{
private static ClientProvider clientProvider;
private static ServerProvider serverProvider;
private static IntMap<StreamBuilder> streams = new IntMap<>();
private static final LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
private static final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
/** Display a network error. Call on the graphics thread. */
public static void showError(Throwable e){
@ -144,11 +145,11 @@ public class Net{
}
public static byte[] compressSnapshot(byte[] input){
return serverProvider.compressSnapshot(input);
return compressor.compress(input);
}
public static byte[] decompressSnapshot(byte[] input, int size){
return clientProvider.decompressSnapshot(input, size);
return decompressor.decompress(input, size);
}
/**
@ -354,9 +355,6 @@ public class Net{
/** Disconnect from the server. */
void disconnect();
/** Decompress an input snapshot byte array. */
byte[] decompressSnapshot(byte[] input, int size);
/**
* Discover servers. This should run the callback regardless of whether any servers are found. Should not block.
* Callback should be run on libGDX main thread.
@ -428,9 +426,6 @@ public class Net{
/** Close the server connection. */
void close();
/** Compress an input snapshot byte array. */
byte[] compressSnapshot(byte[] input);
/** Return all connected users. */
Iterable<? extends NetConnection> getConnections();

View File

@ -1,4 +1,5 @@
package io.anuke.mindustry.desktop;
import io.anuke.arc.Files.*;
import io.anuke.arc.backends.sdl.*;
import io.anuke.mindustry.*;
@ -11,8 +12,8 @@ public class DesktopLauncher{
try{
Platform.instance = new DesktopPlatform(arg);
Net.setClientProvider(new ArcNetClient());
Net.setServerProvider(new ArcNetServer());
Net.setClientProvider(new MClient());
Net.setServerProvider(new MServer());
new SdlApplication(new Mindustry(), new SdlConfig(){{
title = "Mindustry";

View File

@ -26,8 +26,8 @@ public class IOSLauncher extends IOSApplication.Delegate{
@Override
protected IOSApplication createApplication(){
Net.setClientProvider(new ArcNetClient());
Net.setServerProvider(new ArcNetServer());
Net.setClientProvider(new MClient());
Net.setServerProvider(new MServer());
if(UIDevice.getCurrentDevice().getUserInterfaceIdiom() == UIUserInterfaceIdiom.Pad){
UnitScl.dp.addition = 0.5f;

View File

@ -8,7 +8,6 @@ import io.anuke.arc.util.async.*;
import io.anuke.arc.util.pooling.*;
import io.anuke.mindustry.net.Net.*;
import io.anuke.mindustry.net.Packets.*;
import net.jpountz.lz4.*;
import java.io.*;
import java.net.*;
@ -20,7 +19,6 @@ import static io.anuke.mindustry.Vars.*;
public class ArcNetClient implements ClientProvider{
final Client client;
final Supplier<DatagramPacket> packetSupplier = () -> new DatagramPacket(new byte[256], 256);
final LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
public ArcNetClient(){
client = new Client(8192, 4096, new PacketSerializer());
@ -75,14 +73,6 @@ public class ArcNetClient implements ClientProvider{
}
}
@Override
public byte[] decompressSnapshot(byte[] input, int size){
byte[] result = new byte[size];
decompressor.decompress(input, result);
return result;
}
@Override
public void connect(String ip, int port, Runnable success){
Threads.daemon(() -> {

View File

@ -6,7 +6,6 @@ import io.anuke.arc.util.*;
import io.anuke.arc.util.async.*;
import io.anuke.mindustry.net.Net.*;
import io.anuke.mindustry.net.Packets.*;
import net.jpountz.lz4.*;
import java.io.*;
import java.nio.*;
@ -18,7 +17,6 @@ import static io.anuke.mindustry.Vars.*;
public class ArcNetServer implements ServerProvider{
final Server server;
final CopyOnWriteArrayList<ArcConnection> connections = new CopyOnWriteArrayList<>();
final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
Thread serverThread;
public ArcNetServer(){
@ -87,11 +85,6 @@ public class ArcNetServer implements ServerProvider{
server.addListener(listener);
}
@Override
public byte[] compressSnapshot(byte[] input){
return compressor.compress(input);
}
@Override
public Iterable<ArcConnection> getConnections(){
return connections;

View File

@ -0,0 +1,177 @@
package io.anuke.mindustry.net;
import io.anuke.arc.*;
import io.anuke.arc.function.*;
import io.anuke.arc.util.async.*;
import io.anuke.mindustry.game.EventType.*;
import io.anuke.mindustry.net.Net.*;
import io.anuke.mindustry.net.Packets.*;
import io.anuke.mnet.*;
import io.anuke.mnet.MSocket;
import io.anuke.mnet.MSocketImpl;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.util.*;
import static io.anuke.mindustry.Vars.*;
public class MClient implements ClientProvider, ApplicationListener{
MSocket socket;
public MClient(){
Events.on(AppLoadEvent.class, e -> {
Core.app.addListener(this);
});
}
public void connect(String ip, int port, Runnable success) throws IOException{
socket = new MSocketImpl(InetAddress.getByName(ip), port, new PacketSerializer());
socket.addDcListener((sock, reason) -> Core.app.post(() -> Net.handleClientReceived(new Disconnect())));
socket.connectAsync(null, 2000, response -> {
if(response.getType() == ResponseType.ACCEPTED){
Core.app.post(() -> {
success.run();
Net.handleClientReceived(new Connect());
});
}else if(response.getType() == ResponseType.WRONG_STATE){
Core.app.post(() -> Net.showError(new IOException("alreadyconnected")));
}else{
Core.app.post(() -> Net.showError(new IOException("connection refused")));
}
});
}
@Override
public void updatePing(){
}
@Override
public void dispose(){
disconnect();
}
public void send(Object object, SendMode mode){
if(mode == SendMode.tcp){
socket.send(object);
}else{
socket.sendUnreliable(object);
}
}
public void update(){
if(socket == null) return;
socket.update((sock, object) -> Core.app.post(() -> {
try{
Net.handleClientReceived(object);
}catch(Exception e){
Net.showError(e);
netClient.disconnectQuietly();
}
}));
}
public int getPing(){
return socket == null ? 0 : (int)socket.getPing();
}
public void disconnect(){
if(socket != null) socket.close();
}
public void discover(Consumer<Host> callback, Runnable done){
Threads.daemon(() -> {
byte[] bytes = new byte[512];
ByteBuffer buffer = ByteBuffer.wrap(bytes);
DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
ArrayList<InetAddress> foundAddresses = new ArrayList<>();
try(DatagramSocket socket = new DatagramSocket()){
broadcast(port, socket);
socket.setSoTimeout(4000);
outer:
while(true){
try{
socket.receive(packet);
}catch(SocketTimeoutException ex){
done.run();
return;
}
buffer.position(0);
InetAddress address = ((InetSocketAddress)packet.getSocketAddress()).getAddress();
for(InetAddress other : foundAddresses){
if(other.equals(address) || (isLocal(other) && isLocal(address))){
continue outer;
}
}
Host host = NetworkIO.readServerData(address.getHostName(), buffer);
callback.accept(host);
foundAddresses.add(address);
}
}catch(IOException ex){
done.run();
}
});
}
public void pingHost(String address, int port, Consumer<Host> valid, Consumer<Exception> failed){
Threads.daemon(() -> {
try{
DatagramPacket packet = new DatagramPacket(new byte[512], 512);
DatagramSocket socket = new DatagramSocket();
socket.send(new DatagramPacket(new byte[]{-2}, 1, InetAddress.getByName(address), port));
socket.setSoTimeout(4000);
socket.receive(packet);
ByteBuffer buffer = ByteBuffer.wrap(packet.getData());
Host host = NetworkIO.readServerData(packet.getAddress().getHostAddress(), buffer);
Core.app.post(() -> valid.accept(host));
}catch(Exception e){
Core.app.post(() -> failed.accept(e));
}
});
}
private void broadcast (int udpPort, DatagramSocket socket) throws IOException{
byte[] data = {-2};
for (NetworkInterface iface : Collections.list(NetworkInterface.getNetworkInterfaces())){
for (InetAddress address : Collections.list(iface.getInetAddresses())){
byte[] ip = address.getAddress(); //255.255.255.255
try{
socket.send(new DatagramPacket(data, data.length, InetAddress.getByAddress(ip), udpPort));
}catch (Exception ignored){}
ip[3] = -1; //255.255.255.0
try{
socket.send(new DatagramPacket(data, data.length, InetAddress.getByAddress(ip), udpPort));
}catch (Exception ignored){}
ip[2] = -1; //255.255.0.0
try{
socket.send(new DatagramPacket(data, data.length, InetAddress.getByAddress(ip), udpPort));
}catch (Exception ignored){}
}
}
}
private boolean isLocal(InetAddress addr) {
if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) return true;
try {
return NetworkInterface.getByInetAddress(addr) != null;
} catch (Exception e) {
return false;
}
}
}

View File

@ -0,0 +1,120 @@
package io.anuke.mindustry.net;
import io.anuke.arc.*;
import io.anuke.arc.util.*;
import io.anuke.mindustry.game.EventType.*;
import io.anuke.mindustry.net.Net.*;
import io.anuke.mindustry.net.Packets.*;
import io.anuke.mnet.*;
import io.anuke.mnet.MServerSocket;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.util.concurrent.*;
public class MServer implements ServerProvider, ApplicationListener{
final CopyOnWriteArrayList<MConnectionImpl> connections = new CopyOnWriteArrayList<>();
MServerSocket socket;
public MServer(){
Events.on(AppLoadEvent.class, e -> {
Core.app.addListener(this);
});
}
public void host(int port) throws IOException{
socket = new MServerSocket(port, con -> {
MSocket sock = con.accept(null);
MConnectionImpl kn = new MConnectionImpl(sock);
sock.setUserData(kn);
String ip = sock.getRemoteAddress().getHostAddress();
Connect c = new Connect();
c.id = kn.id;
c.addressTCP = ip;
Log.info("&bRecieved connection: {0} / {1}", c.id, c.addressTCP);
connections.add(kn);
Core.app.post(() -> Net.handleServerReceived(kn.id, c));
sock.addDcListener((socket, message) -> {
Log.info("&bLost connection {0}. Reason: {1}", kn.id, message);
Disconnect dc = new Disconnect();
dc.id = kn.id;
Core.app.post(() -> {
Net.handleServerReceived(kn.id, dc);
connections.remove(kn);
});
});
}, PacketSerializer::new, () -> {
ByteBuffer buf = NetworkIO.writeServerData();
byte[] bytes = buf.array();
return new DatagramPacket(bytes, bytes.length);
});
connections.clear();
}
public void update(){
if(socket == null) return;
socket.update();
for(MSocket socket : socket.getSockets()){
MConnectionImpl c = socket.getUserData();
socket.update((s, msg) -> Core.app.post(() -> {
try{
Net.handleServerReceived(c.id, msg);
}catch(Exception e){
e.printStackTrace();
}
}));
}
}
public void close(){
if(socket != null) socket.close();
}
public Iterable<? extends NetConnection> getConnections(){
return connections;
}
public MConnectionImpl getByID(int id){
for(MConnectionImpl n : connections){
if(n.id == id){
return n;
}
}
return null;
}
class MConnectionImpl extends NetConnection{
private final MSocket sock; //sock.
public MConnectionImpl(MSocket con){
super(con.getRemoteAddress().getHostAddress());
this.sock = con;
}
@Override
public void send(Object object, SendMode mode){
if(mode == SendMode.tcp){
sock.send(object);
}else{
sock.sendUnreliable(object);
}
}
@Override
public void close(){
sock.close();
}
}
}

View File

@ -1,42 +1,85 @@
package io.anuke.mindustry.net;
import io.anuke.arc.function.Supplier;
import io.anuke.arc.net.FrameworkMessage;
import io.anuke.arc.net.*;
import io.anuke.arc.net.FrameworkMessage.*;
import io.anuke.arc.net.NetSerializer;
import io.anuke.arc.util.pooling.Pools;
import io.anuke.arc.util.pooling.*;
import io.anuke.mnet.*;
import java.nio.ByteBuffer;
import java.nio.*;
import java.util.*;
@SuppressWarnings("unchecked")
public class PacketSerializer implements NetSerializer{
public class PacketSerializer implements NetSerializer, MSerializer{
private ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 8);
@Override
public void write(ByteBuffer byteBuffer, Object o){
if(o instanceof FrameworkMessage){
byteBuffer.put((byte)-2); //code for framework message
writeFramework(byteBuffer, (FrameworkMessage)o);
}else{
if(!(o instanceof Packet))
throw new RuntimeException("All sent objects must implement be Packets! Class: " + o.getClass());
byte id = Registrator.getID(o.getClass());
if(id == -1)
throw new RuntimeException("Unregistered class: " + o.getClass());
byteBuffer.put(id);
((Packet)o).write(byteBuffer);
if(o == null){
byteBuffer.put((byte)-1);
return;
}
if (!(o instanceof Packet))
throw new RuntimeException("All sent objects must implement be Packets! Class: " + o.getClass());
byte id = Registrator.getID(o.getClass());
if (id == -1)
throw new RuntimeException("Unregistered class: " + o.getClass());
byteBuffer.put(id);
((Packet) o).write(byteBuffer);
}
@Override
public Object read(ByteBuffer byteBuffer){
byte id = byteBuffer.get();
if(id == -2){
return readFramework(byteBuffer);
}else{
Packet packet = Pools.obtain((Class<Packet>)Registrator.getByID(id).type, (Supplier<Packet>)Registrator.getByID(id).constructor);
packet.read(byteBuffer);
return packet;
if(id == -1){
return null;
}
Packet packet = Pools.obtain((Class<Packet>) Registrator.getByID(id).type, (Supplier<Packet>) Registrator.getByID(id).constructor);
packet.read(byteBuffer);
return packet;
}
@Override
public byte[] serialize(Object o){
buffer.position(0);
write(buffer, o);
return Arrays.copyOfRange(buffer.array(), 0, buffer.position());
}
@Override
public byte[] serialize(Object o, int offset){
buffer.position(0);
write(buffer, o);
int length = buffer.position();
byte[] bytes = new byte[length + offset];
System.arraycopy(buffer.array(), 0, bytes, offset, length);
return bytes;
}
@Override
public int serialize(Object o, byte[] bytes, int offset){
buffer.position(0);
write(buffer, o);
int length = buffer.position();
System.arraycopy(buffer.array(), 0, bytes, offset, length);
return length;
}
@Override
public Object deserialize(byte[] bytes){
buffer.position(0);
buffer.put(bytes);
buffer.position(0);
return read(buffer);
}
@Override
public Object deserialize(byte[] bytes, int offset, int length){
buffer.position(0);
buffer.put(bytes, offset, length);
buffer.position(0);
return read(buffer);
}

View File

@ -8,8 +8,8 @@ public class ServerLauncher{
public static void main(String[] args){
try{
Net.setClientProvider(new ArcNetClient());
Net.setServerProvider(new ArcNetServer());
Net.setClientProvider(new MClient());
Net.setServerProvider(new MServer());
new HeadlessApplication(new MindustryServer(args), null, throwable -> CrashSender.send(throwable, f -> {}));
}catch(Throwable t){
CrashSender.send(t, f -> {});

View File

@ -27,6 +27,7 @@ if(!hasProperty("release")){
use(':Arc:extensions:freetype')
use(':Arc:extensions:recorder')
use(':Arc:extensions:arcnet')
use(':Arc:extensions:mnet')
use(':Arc:extensions:packer')
use(':Arc:backends')
use(':Arc:backends:backend-sdl')