diff --git a/android/src/io/anuke/mindustry/AndroidLauncher.java b/android/src/io/anuke/mindustry/AndroidLauncher.java index 4792bd6d89..e65c3be427 100644 --- a/android/src/io/anuke/mindustry/AndroidLauncher.java +++ b/android/src/io/anuke/mindustry/AndroidLauncher.java @@ -129,8 +129,8 @@ public class AndroidLauncher extends AndroidApplication{ } config.hideStatusBar = true; - Net.setClientProvider(new ArcNetClient()); - Net.setServerProvider(new ArcNetServer()); + Net.setClientProvider(new MClient()); + Net.setServerProvider(new MServer()); initialize(new Mindustry(), config); checkFiles(getIntent()); } diff --git a/build.gradle b/build.gradle index 736131c9e3..e3830aa61f 100644 --- a/build.gradle +++ b/build.gradle @@ -229,9 +229,11 @@ project(":core"){ compileJava.dependsOn(preGen) + compile "org.lz4:lz4-java:1.4.1" compile arcModule("arc-core") compile arcModule("extensions:freetype") compile arcModule("extensions:arcnet") + compile arcModule("extensions:mnet") if(localArc() && debugged()) compile arcModule("extensions:recorder") compileOnly project(":annotations") @@ -294,7 +296,6 @@ project(":net"){ dependencies{ compile project(":core") - compile "org.lz4:lz4-java:1.4.1" } } diff --git a/core/src/io/anuke/mindustry/Mindustry.java b/core/src/io/anuke/mindustry/Mindustry.java index 9ed18ca79d..571ade02a3 100644 --- a/core/src/io/anuke/mindustry/Mindustry.java +++ b/core/src/io/anuke/mindustry/Mindustry.java @@ -67,6 +67,6 @@ public class Mindustry extends ApplicationCore{ super.init(); Log.info("Time to load [total]: {0}", Time.elapsed()); - Events.fire(new GameLoadEvent()); + Events.fire(new ClientLoadEvent()); } } diff --git a/core/src/io/anuke/mindustry/Vars.java b/core/src/io/anuke/mindustry/Vars.java index f995df588b..76c492a3bc 100644 --- a/core/src/io/anuke/mindustry/Vars.java +++ b/core/src/io/anuke/mindustry/Vars.java @@ -1,7 +1,7 @@ package io.anuke.mindustry; +import io.anuke.arc.*; import io.anuke.arc.Application.ApplicationType; -import io.anuke.arc.Core; import io.anuke.arc.files.FileHandle; import io.anuke.arc.graphics.Color; import io.anuke.arc.util.Structs; @@ -15,6 +15,7 @@ import io.anuke.mindustry.entities.traits.DrawTrait; import io.anuke.mindustry.entities.traits.SyncTrait; import io.anuke.mindustry.entities.type.*; import io.anuke.mindustry.game.*; +import io.anuke.mindustry.game.EventType.*; import io.anuke.mindustry.gen.Serialization; import io.anuke.mindustry.net.Net; import io.anuke.mindustry.world.blocks.defense.ForceProjector.ShieldEntity; @@ -222,5 +223,7 @@ public class Vars{ customMapDirectory = dataDirectory.child("maps/"); saveDirectory = dataDirectory.child("saves/"); tmpDirectory = dataDirectory.child("tmp/"); + + Events.fire(new AppLoadEvent()); } } diff --git a/core/src/io/anuke/mindustry/game/EventType.java b/core/src/io/anuke/mindustry/game/EventType.java index c9da8c9c57..84d4262650 100644 --- a/core/src/io/anuke/mindustry/game/EventType.java +++ b/core/src/io/anuke/mindustry/game/EventType.java @@ -27,8 +27,13 @@ public class EventType{ } } - /** Called when the game is first loaded. */ - public static class GameLoadEvent{ + /** Called when the client game is first loaded. */ + public static class ClientLoadEvent{ + + } + + /** Called when the core app is first loaded. */ + public static class AppLoadEvent{ } diff --git a/core/src/io/anuke/mindustry/net/Net.java b/core/src/io/anuke/mindustry/net/Net.java index 19f459ea60..006def1fcf 100644 --- a/core/src/io/anuke/mindustry/net/Net.java +++ b/core/src/io/anuke/mindustry/net/Net.java @@ -1,19 +1,18 @@ package io.anuke.mindustry.net; -import io.anuke.arc.Core; +import io.anuke.arc.*; import io.anuke.arc.collection.*; -import io.anuke.arc.function.BiConsumer; -import io.anuke.arc.function.Consumer; +import io.anuke.arc.function.*; import io.anuke.arc.util.*; -import io.anuke.arc.util.pooling.Pools; -import io.anuke.mindustry.core.Platform; -import io.anuke.mindustry.gen.Call; +import io.anuke.arc.util.pooling.*; +import io.anuke.mindustry.core.*; +import io.anuke.mindustry.gen.*; import io.anuke.mindustry.net.Packets.*; -import io.anuke.mindustry.net.Streamable.StreamBuilder; +import io.anuke.mindustry.net.Streamable.*; +import net.jpountz.lz4.*; -import java.io.IOException; -import java.nio.BufferOverflowException; -import java.nio.BufferUnderflowException; +import java.io.*; +import java.nio.*; import static io.anuke.mindustry.Vars.*; @@ -28,6 +27,8 @@ public class Net{ private static ClientProvider clientProvider; private static ServerProvider serverProvider; private static IntMap streams = new IntMap<>(); + private static final LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor(); + private static final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor(); /** Display a network error. Call on the graphics thread. */ public static void showError(Throwable e){ @@ -144,11 +145,11 @@ public class Net{ } public static byte[] compressSnapshot(byte[] input){ - return serverProvider.compressSnapshot(input); + return compressor.compress(input); } public static byte[] decompressSnapshot(byte[] input, int size){ - return clientProvider.decompressSnapshot(input, size); + return decompressor.decompress(input, size); } /** @@ -354,9 +355,6 @@ public class Net{ /** Disconnect from the server. */ void disconnect(); - /** Decompress an input snapshot byte array. */ - byte[] decompressSnapshot(byte[] input, int size); - /** * Discover servers. This should run the callback regardless of whether any servers are found. Should not block. * Callback should be run on libGDX main thread. @@ -428,9 +426,6 @@ public class Net{ /** Close the server connection. */ void close(); - /** Compress an input snapshot byte array. */ - byte[] compressSnapshot(byte[] input); - /** Return all connected users. */ Iterable getConnections(); diff --git a/desktop/src/io/anuke/mindustry/desktop/DesktopLauncher.java b/desktop/src/io/anuke/mindustry/desktop/DesktopLauncher.java index 3fcd880bf1..5838584185 100644 --- a/desktop/src/io/anuke/mindustry/desktop/DesktopLauncher.java +++ b/desktop/src/io/anuke/mindustry/desktop/DesktopLauncher.java @@ -1,4 +1,5 @@ package io.anuke.mindustry.desktop; + import io.anuke.arc.Files.*; import io.anuke.arc.backends.sdl.*; import io.anuke.mindustry.*; @@ -11,8 +12,8 @@ public class DesktopLauncher{ try{ Platform.instance = new DesktopPlatform(arg); - Net.setClientProvider(new ArcNetClient()); - Net.setServerProvider(new ArcNetServer()); + Net.setClientProvider(new MClient()); + Net.setServerProvider(new MServer()); new SdlApplication(new Mindustry(), new SdlConfig(){{ title = "Mindustry"; diff --git a/ios/src/io/anuke/mindustry/IOSLauncher.java b/ios/src/io/anuke/mindustry/IOSLauncher.java index 4161d44017..c8dcc9b510 100644 --- a/ios/src/io/anuke/mindustry/IOSLauncher.java +++ b/ios/src/io/anuke/mindustry/IOSLauncher.java @@ -26,8 +26,8 @@ public class IOSLauncher extends IOSApplication.Delegate{ @Override protected IOSApplication createApplication(){ - Net.setClientProvider(new ArcNetClient()); - Net.setServerProvider(new ArcNetServer()); + Net.setClientProvider(new MClient()); + Net.setServerProvider(new MServer()); if(UIDevice.getCurrentDevice().getUserInterfaceIdiom() == UIUserInterfaceIdiom.Pad){ UnitScl.dp.addition = 0.5f; diff --git a/net/src/io/anuke/mindustry/net/ArcNetClient.java b/net/src/io/anuke/mindustry/net/ArcNetClient.java index 618aea5eb4..9f8f09459d 100644 --- a/net/src/io/anuke/mindustry/net/ArcNetClient.java +++ b/net/src/io/anuke/mindustry/net/ArcNetClient.java @@ -8,7 +8,6 @@ import io.anuke.arc.util.async.*; import io.anuke.arc.util.pooling.*; import io.anuke.mindustry.net.Net.*; import io.anuke.mindustry.net.Packets.*; -import net.jpountz.lz4.*; import java.io.*; import java.net.*; @@ -20,7 +19,6 @@ import static io.anuke.mindustry.Vars.*; public class ArcNetClient implements ClientProvider{ final Client client; final Supplier packetSupplier = () -> new DatagramPacket(new byte[256], 256); - final LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor(); public ArcNetClient(){ client = new Client(8192, 4096, new PacketSerializer()); @@ -75,14 +73,6 @@ public class ArcNetClient implements ClientProvider{ } } - - @Override - public byte[] decompressSnapshot(byte[] input, int size){ - byte[] result = new byte[size]; - decompressor.decompress(input, result); - return result; - } - @Override public void connect(String ip, int port, Runnable success){ Threads.daemon(() -> { diff --git a/net/src/io/anuke/mindustry/net/ArcNetServer.java b/net/src/io/anuke/mindustry/net/ArcNetServer.java index 08c571c119..219d37b437 100644 --- a/net/src/io/anuke/mindustry/net/ArcNetServer.java +++ b/net/src/io/anuke/mindustry/net/ArcNetServer.java @@ -6,7 +6,6 @@ import io.anuke.arc.util.*; import io.anuke.arc.util.async.*; import io.anuke.mindustry.net.Net.*; import io.anuke.mindustry.net.Packets.*; -import net.jpountz.lz4.*; import java.io.*; import java.nio.*; @@ -18,7 +17,6 @@ import static io.anuke.mindustry.Vars.*; public class ArcNetServer implements ServerProvider{ final Server server; final CopyOnWriteArrayList connections = new CopyOnWriteArrayList<>(); - final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor(); Thread serverThread; public ArcNetServer(){ @@ -87,11 +85,6 @@ public class ArcNetServer implements ServerProvider{ server.addListener(listener); } - @Override - public byte[] compressSnapshot(byte[] input){ - return compressor.compress(input); - } - @Override public Iterable getConnections(){ return connections; diff --git a/net/src/io/anuke/mindustry/net/MClient.java b/net/src/io/anuke/mindustry/net/MClient.java new file mode 100644 index 0000000000..ea961f1090 --- /dev/null +++ b/net/src/io/anuke/mindustry/net/MClient.java @@ -0,0 +1,177 @@ +package io.anuke.mindustry.net; + +import io.anuke.arc.*; +import io.anuke.arc.function.*; +import io.anuke.arc.util.async.*; +import io.anuke.mindustry.game.EventType.*; +import io.anuke.mindustry.net.Net.*; +import io.anuke.mindustry.net.Packets.*; +import io.anuke.mnet.*; +import io.anuke.mnet.MSocket; +import io.anuke.mnet.MSocketImpl; + +import java.io.*; +import java.net.*; +import java.nio.*; +import java.util.*; + +import static io.anuke.mindustry.Vars.*; + +public class MClient implements ClientProvider, ApplicationListener{ + MSocket socket; + + public MClient(){ + Events.on(AppLoadEvent.class, e -> { + Core.app.addListener(this); + }); + } + + public void connect(String ip, int port, Runnable success) throws IOException{ + socket = new MSocketImpl(InetAddress.getByName(ip), port, new PacketSerializer()); + socket.addDcListener((sock, reason) -> Core.app.post(() -> Net.handleClientReceived(new Disconnect()))); + socket.connectAsync(null, 2000, response -> { + if(response.getType() == ResponseType.ACCEPTED){ + Core.app.post(() -> { + success.run(); + Net.handleClientReceived(new Connect()); + }); + }else if(response.getType() == ResponseType.WRONG_STATE){ + Core.app.post(() -> Net.showError(new IOException("alreadyconnected"))); + }else{ + Core.app.post(() -> Net.showError(new IOException("connection refused"))); + } + }); + } + + @Override + public void updatePing(){ + + } + + @Override + public void dispose(){ + disconnect(); + } + + public void send(Object object, SendMode mode){ + if(mode == SendMode.tcp){ + socket.send(object); + }else{ + socket.sendUnreliable(object); + } + } + + public void update(){ + if(socket == null) return; + + socket.update((sock, object) -> Core.app.post(() -> { + try{ + Net.handleClientReceived(object); + }catch(Exception e){ + Net.showError(e); + netClient.disconnectQuietly(); + } + })); + } + + public int getPing(){ + return socket == null ? 0 : (int)socket.getPing(); + } + + public void disconnect(){ + if(socket != null) socket.close(); + } + + public void discover(Consumer callback, Runnable done){ + Threads.daemon(() -> { + byte[] bytes = new byte[512]; + ByteBuffer buffer = ByteBuffer.wrap(bytes); + DatagramPacket packet = new DatagramPacket(bytes, bytes.length); + ArrayList foundAddresses = new ArrayList<>(); + + try(DatagramSocket socket = new DatagramSocket()){ + broadcast(port, socket); + + socket.setSoTimeout(4000); + + outer: + while(true){ + + try{ + socket.receive(packet); + }catch(SocketTimeoutException ex){ + done.run(); + return; + } + + buffer.position(0); + + InetAddress address = ((InetSocketAddress)packet.getSocketAddress()).getAddress(); + + for(InetAddress other : foundAddresses){ + if(other.equals(address) || (isLocal(other) && isLocal(address))){ + continue outer; + } + } + + Host host = NetworkIO.readServerData(address.getHostName(), buffer); + callback.accept(host); + foundAddresses.add(address); + } + }catch(IOException ex){ + done.run(); + } + }); + } + + public void pingHost(String address, int port, Consumer valid, Consumer failed){ + Threads.daemon(() -> { + try{ + DatagramPacket packet = new DatagramPacket(new byte[512], 512); + + DatagramSocket socket = new DatagramSocket(); + socket.send(new DatagramPacket(new byte[]{-2}, 1, InetAddress.getByName(address), port)); + socket.setSoTimeout(4000); + socket.receive(packet); + + ByteBuffer buffer = ByteBuffer.wrap(packet.getData()); + Host host = NetworkIO.readServerData(packet.getAddress().getHostAddress(), buffer); + + Core.app.post(() -> valid.accept(host)); + }catch(Exception e){ + Core.app.post(() -> failed.accept(e)); + } + }); + } + private void broadcast (int udpPort, DatagramSocket socket) throws IOException{ + byte[] data = {-2}; + + for (NetworkInterface iface : Collections.list(NetworkInterface.getNetworkInterfaces())){ + for (InetAddress address : Collections.list(iface.getInetAddresses())){ + + byte[] ip = address.getAddress(); //255.255.255.255 + try{ + socket.send(new DatagramPacket(data, data.length, InetAddress.getByAddress(ip), udpPort)); + }catch (Exception ignored){} + ip[3] = -1; //255.255.255.0 + try{ + socket.send(new DatagramPacket(data, data.length, InetAddress.getByAddress(ip), udpPort)); + }catch (Exception ignored){} + ip[2] = -1; //255.255.0.0 + try{ + socket.send(new DatagramPacket(data, data.length, InetAddress.getByAddress(ip), udpPort)); + }catch (Exception ignored){} + } + } + } + + private boolean isLocal(InetAddress addr) { + if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) return true; + + try { + return NetworkInterface.getByInetAddress(addr) != null; + } catch (Exception e) { + return false; + } + } +} diff --git a/net/src/io/anuke/mindustry/net/MServer.java b/net/src/io/anuke/mindustry/net/MServer.java new file mode 100644 index 0000000000..0e32f97ecb --- /dev/null +++ b/net/src/io/anuke/mindustry/net/MServer.java @@ -0,0 +1,120 @@ +package io.anuke.mindustry.net; + +import io.anuke.arc.*; +import io.anuke.arc.util.*; +import io.anuke.mindustry.game.EventType.*; +import io.anuke.mindustry.net.Net.*; +import io.anuke.mindustry.net.Packets.*; +import io.anuke.mnet.*; +import io.anuke.mnet.MServerSocket; + +import java.io.*; +import java.net.*; +import java.nio.*; +import java.util.concurrent.*; + +public class MServer implements ServerProvider, ApplicationListener{ + final CopyOnWriteArrayList connections = new CopyOnWriteArrayList<>(); + MServerSocket socket; + + public MServer(){ + Events.on(AppLoadEvent.class, e -> { + Core.app.addListener(this); + }); + } + + public void host(int port) throws IOException{ + socket = new MServerSocket(port, con -> { + MSocket sock = con.accept(null); + + MConnectionImpl kn = new MConnectionImpl(sock); + sock.setUserData(kn); + + String ip = sock.getRemoteAddress().getHostAddress(); + + Connect c = new Connect(); + c.id = kn.id; + c.addressTCP = ip; + + Log.info("&bRecieved connection: {0} / {1}", c.id, c.addressTCP); + + connections.add(kn); + Core.app.post(() -> Net.handleServerReceived(kn.id, c)); + + sock.addDcListener((socket, message) -> { + Log.info("&bLost connection {0}. Reason: {1}", kn.id, message); + + Disconnect dc = new Disconnect(); + dc.id = kn.id; + + Core.app.post(() -> { + Net.handleServerReceived(kn.id, dc); + connections.remove(kn); + }); + }); + }, PacketSerializer::new, () -> { + ByteBuffer buf = NetworkIO.writeServerData(); + byte[] bytes = buf.array(); + return new DatagramPacket(bytes, bytes.length); + }); + + connections.clear(); + } + + public void update(){ + if(socket == null) return; + + socket.update(); + for(MSocket socket : socket.getSockets()){ + MConnectionImpl c = socket.getUserData(); + socket.update((s, msg) -> Core.app.post(() -> { + try{ + Net.handleServerReceived(c.id, msg); + }catch(Exception e){ + e.printStackTrace(); + } + })); + } + } + + public void close(){ + if(socket != null) socket.close(); + } + + public Iterable getConnections(){ + return connections; + } + + public MConnectionImpl getByID(int id){ + for(MConnectionImpl n : connections){ + if(n.id == id){ + return n; + } + } + return null; + } + + class MConnectionImpl extends NetConnection{ + private final MSocket sock; //sock. + + public MConnectionImpl(MSocket con){ + super(con.getRemoteAddress().getHostAddress()); + this.sock = con; + } + + @Override + public void send(Object object, SendMode mode){ + if(mode == SendMode.tcp){ + sock.send(object); + }else{ + sock.sendUnreliable(object); + } + + } + + @Override + public void close(){ + sock.close(); + } + } +} diff --git a/net/src/io/anuke/mindustry/net/PacketSerializer.java b/net/src/io/anuke/mindustry/net/PacketSerializer.java index e79d5457aa..b0e19c1b25 100644 --- a/net/src/io/anuke/mindustry/net/PacketSerializer.java +++ b/net/src/io/anuke/mindustry/net/PacketSerializer.java @@ -1,42 +1,85 @@ package io.anuke.mindustry.net; import io.anuke.arc.function.Supplier; -import io.anuke.arc.net.FrameworkMessage; +import io.anuke.arc.net.*; import io.anuke.arc.net.FrameworkMessage.*; -import io.anuke.arc.net.NetSerializer; -import io.anuke.arc.util.pooling.Pools; +import io.anuke.arc.util.pooling.*; +import io.anuke.mnet.*; -import java.nio.ByteBuffer; +import java.nio.*; +import java.util.*; @SuppressWarnings("unchecked") -public class PacketSerializer implements NetSerializer{ +public class PacketSerializer implements NetSerializer, MSerializer{ + private ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 8); @Override public void write(ByteBuffer byteBuffer, Object o){ - if(o instanceof FrameworkMessage){ - byteBuffer.put((byte)-2); //code for framework message - writeFramework(byteBuffer, (FrameworkMessage)o); - }else{ - if(!(o instanceof Packet)) - throw new RuntimeException("All sent objects must implement be Packets! Class: " + o.getClass()); - byte id = Registrator.getID(o.getClass()); - if(id == -1) - throw new RuntimeException("Unregistered class: " + o.getClass()); - byteBuffer.put(id); - ((Packet)o).write(byteBuffer); + if(o == null){ + byteBuffer.put((byte)-1); + return; } + + if (!(o instanceof Packet)) + throw new RuntimeException("All sent objects must implement be Packets! Class: " + o.getClass()); + byte id = Registrator.getID(o.getClass()); + if (id == -1) + throw new RuntimeException("Unregistered class: " + o.getClass()); + byteBuffer.put(id); + ((Packet) o).write(byteBuffer); } @Override public Object read(ByteBuffer byteBuffer){ byte id = byteBuffer.get(); - if(id == -2){ - return readFramework(byteBuffer); - }else{ - Packet packet = Pools.obtain((Class)Registrator.getByID(id).type, (Supplier)Registrator.getByID(id).constructor); - packet.read(byteBuffer); - return packet; + if(id == -1){ + return null; } + Packet packet = Pools.obtain((Class) Registrator.getByID(id).type, (Supplier) Registrator.getByID(id).constructor); + packet.read(byteBuffer); + return packet; + } + + @Override + public byte[] serialize(Object o){ + buffer.position(0); + write(buffer, o); + return Arrays.copyOfRange(buffer.array(), 0, buffer.position()); + } + + @Override + public byte[] serialize(Object o, int offset){ + buffer.position(0); + write(buffer, o); + int length = buffer.position(); + byte[] bytes = new byte[length + offset]; + System.arraycopy(buffer.array(), 0, bytes, offset, length); + return bytes; + } + + @Override + public int serialize(Object o, byte[] bytes, int offset){ + buffer.position(0); + write(buffer, o); + int length = buffer.position(); + System.arraycopy(buffer.array(), 0, bytes, offset, length); + return length; + } + + @Override + public Object deserialize(byte[] bytes){ + buffer.position(0); + buffer.put(bytes); + buffer.position(0); + return read(buffer); + } + + @Override + public Object deserialize(byte[] bytes, int offset, int length){ + buffer.position(0); + buffer.put(bytes, offset, length); + buffer.position(0); + return read(buffer); } diff --git a/server/src/io/anuke/mindustry/server/ServerLauncher.java b/server/src/io/anuke/mindustry/server/ServerLauncher.java index 06f842c418..fdcbd5a61b 100644 --- a/server/src/io/anuke/mindustry/server/ServerLauncher.java +++ b/server/src/io/anuke/mindustry/server/ServerLauncher.java @@ -8,8 +8,8 @@ public class ServerLauncher{ public static void main(String[] args){ try{ - Net.setClientProvider(new ArcNetClient()); - Net.setServerProvider(new ArcNetServer()); + Net.setClientProvider(new MClient()); + Net.setServerProvider(new MServer()); new HeadlessApplication(new MindustryServer(args), null, throwable -> CrashSender.send(throwable, f -> {})); }catch(Throwable t){ CrashSender.send(t, f -> {}); diff --git a/settings.gradle b/settings.gradle index 41b5080058..ea39bf3078 100644 --- a/settings.gradle +++ b/settings.gradle @@ -27,6 +27,7 @@ if(!hasProperty("release")){ use(':Arc:extensions:freetype') use(':Arc:extensions:recorder') use(':Arc:extensions:arcnet') + use(':Arc:extensions:mnet') use(':Arc:extensions:packer') use(':Arc:backends') use(':Arc:backends:backend-sdl')