diff --git a/src/main/java/org/toop/Main.java b/src/main/java/org/toop/Main.java index 22b074e..ff3f5a2 100644 --- a/src/main/java/org/toop/Main.java +++ b/src/main/java/org/toop/Main.java @@ -2,53 +2,65 @@ package org.toop; import org.toop.eventbus.Events; import org.toop.eventbus.GlobalEventBus; -import org.toop.server.Server; +import org.toop.server.ServerConnection; +import org.toop.server.ServerManager; import org.toop.server.backend.Testsss; import org.toop.server.backend.TcpServer; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + public class Main { private static final Logger logger = LogManager.getLogger(Main.class); - public static void main(String[] args) { - - if (!initEvents()) { - throw new RuntimeException("A event could not be initialized"); - } + public static void main(String[] args) throws ExecutionException, InterruptedException { TcpServer server = new TcpServer(5001); Thread serverThread = new Thread(server); serverThread.start(); - Server.start("127.0.0.1", "5001"); - // Testsss.start(""); // Used for testing server. + + ServerManager serverManager = new ServerManager(); + + CompletableFuture future = new CompletableFuture<>(); + GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.1", "5001", future)); + String serverId = future.get(); + logger.info("Server ID: " + serverId); + + CompletableFuture future2 = new CompletableFuture<>(); + GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.2", "5002", future2)); + String serverId2 = future2.get(); + logger.info("Server ID: " + serverId2); + + CompletableFuture future3 = new CompletableFuture<>(); + GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.3", "5003", future3)); + String serverId3 = future3.get(); + logger.info("Server ID: " + serverId3); + + CompletableFuture future4 = new CompletableFuture<>(); + GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.4", "5004", future4)); + String serverId4 = future4.get(); + logger.info("Server ID: " + serverId4); + + CompletableFuture future5 = new CompletableFuture<>(); + GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.5", "5005", future5)); + String serverId5 = future5.get(); + logger.info("Server ID: " + serverId5); + +// GlobalEventBus.post(new Events.ServerEvents.StartConnection("127.0.0.1", "5001")); + + +// Server.startNew("127.0.0.1", "5001"); +// Testsss.start(""); // Used for testing server. Window.start(""); + CompletableFuture future6 = new CompletableFuture<>(); + GlobalEventBus.post(new Events.ServerEvents.RequestsAllConnections(future6)); + String serverConnections = future6.get(); + logger.info("Running connections: {}", serverConnections); + } - /** - * Returns false if any event could not be initialized. - */ - private static boolean initEvents() { - try { - - GlobalEventBus.subscribeAndRegister(Events.ServerEvents.OnChangingServerIp.class, - event -> - logger.info("Changing server ip to {}", event.ip()) - ); - - GlobalEventBus.subscribeAndRegister(Events.ServerEvents.OnChangingServerPort.class, - event -> - logger.info("Changing server port to {}", event.port()) - ); - - return true; - } - catch (Exception err) { - logger.info("{}", err.getMessage()); - return false; - } - } - } \ No newline at end of file diff --git a/src/main/java/org/toop/eventbus/Events.java b/src/main/java/org/toop/eventbus/Events.java index 30168d0..3459e7c 100644 --- a/src/main/java/org/toop/eventbus/Events.java +++ b/src/main/java/org/toop/eventbus/Events.java @@ -1,9 +1,12 @@ package org.toop.eventbus; -import org.toop.server.Server; +import org.toop.server.ServerConnection; import java.lang.reflect.Constructor; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; /** * Events that are used in the GlobalEventBus class. @@ -74,10 +77,42 @@ public class Events implements IEvents { public static class ServerEvents { + public record RequestsAllConnections(CompletableFuture future) {} + + public record ForceCloseAllConnections() {} + + /** + * + * Triggers starting a server connection. + * + * @param ip + * @param port + */ + public record StartConnection(String ip, String port) {} + + /** + * Triggers starting a server connection, returns a future. + * WARNING: This is a blocking operation. + * + * @param ip + * @param port + * @param future + */ + public record StartConnectionRequest(String ip, String port, CompletableFuture future) {} + + /** + * Triggers when a connection to a server is established. + * + * @param connectionId + * @param ip + * @param port + */ + public record ConnectionEstablished(Object connectionId, String ip, String port) {} + /** * Triggers sending a command to a server. */ - public record command(String command, String... args) {} + public record Command(Object connectionId, String command, String... args) { } /** * Triggers when a command is sent to a server. @@ -102,12 +137,19 @@ public class Events implements IEvents { /** * Triggers reconnecting to previous address. */ - public record Reconnect() {} + public record Reconnect(Object connectionId) {} /** * Triggers changing connection to a new address. */ - public record ChangeConnection(String ip, String port) { } + public record ChangeConnection(Object connectionId, String ip, String port) {} + + /** + * Triggers when the server couldn't connect to the desired address. + */ + public record CouldNotConnect(Object connectionId) {} + + public record ClosedConnection() {} } diff --git a/src/main/java/org/toop/eventbus/GlobalEventBus.java b/src/main/java/org/toop/eventbus/GlobalEventBus.java index 46c06d9..99fff2d 100644 --- a/src/main/java/org/toop/eventbus/GlobalEventBus.java +++ b/src/main/java/org/toop/eventbus/GlobalEventBus.java @@ -8,26 +8,40 @@ import java.util.function.Consumer; /** * A singleton Event Bus to be used for creating, triggering and activating events. */ -public enum GlobalEventBus { - /** - * The instance of the Event Bus. - */ - INSTANCE; +public class GlobalEventBus { /** * Singleton event bus. */ - private final EventBus eventBus = new EventBus("global-bus"); + private static EventBus eventBus = new EventBus("global-bus"); + + private GlobalEventBus() {} /** * Wraps a Consumer into a Guava @Subscribe-compatible listener. * * @return Singleton Event Bus */ - public EventBus get() { + public static EventBus get() { return eventBus; } + /** + * ONLY USE FOR TESTING + * + * @param newBus + */ + public static void set(EventBus newBus) { + eventBus = newBus; + } + + /** + * Reset back to the default global EventBus. + */ + public static void reset() { + eventBus = new EventBus("global-bus"); + } + /** * Wraps a Consumer into a Guava @Subscribe-compatible listener. * TODO @@ -39,8 +53,10 @@ public enum GlobalEventBus { private static Object subscribe(Class type, Consumer action) { return new Object() { @Subscribe - public void handle(T event) { - action.accept(event); + public void handle(Object event) { + if (type.isInstance(event)) { + action.accept(type.cast(event)); + } } }; } @@ -73,7 +89,7 @@ public enum GlobalEventBus { * @param event The ready event to add to register. */ public static void register(EventMeta event) { - GlobalEventBus.INSTANCE.get().register(event.getEvent()); + GlobalEventBus.get().register(event.getEvent()); event.setReady(true); EventRegistry.markReady(event.getType()); } @@ -86,7 +102,7 @@ public enum GlobalEventBus { public static void unregister(EventMeta event) { EventRegistry.markNotReady(event.getType()); event.setReady(false); - GlobalEventBus.INSTANCE.get().unregister(event.getEvent()); + GlobalEventBus.get().unregister(event.getEvent()); } /** @@ -106,7 +122,6 @@ public enum GlobalEventBus { EventRegistry.storeEvent(eventMeta); // post to Guava EventBus - GlobalEventBus.INSTANCE.get().post(event); + GlobalEventBus.get().post(event); } - } \ No newline at end of file diff --git a/src/main/java/org/toop/server/Server.java b/src/main/java/org/toop/server/Server.java deleted file mode 100644 index 71b7e18..0000000 --- a/src/main/java/org/toop/server/Server.java +++ /dev/null @@ -1,285 +0,0 @@ -package org.toop.server; - -import org.toop.Main; -import org.toop.eventbus.Events; -import org.toop.eventbus.GlobalEventBus; -import org.toop.server.backend.TcpClient; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -public class Server extends Thread { - - private static final Logger logger = LogManager.getLogger(Main.class); - - public enum ServerBackend { - LOCAL, - REMOTE, - } - - String ip; - String port; - BlockingQueue commandQueue; // TODO, add a way to close it, for when reconnecting. - TcpClient tcpClient; - volatile boolean running = false; - Thread tcpIn; - Thread tcpOut; - - public Server(String set_ip, String set_port) { - ip = set_ip; - port = set_port; - this.initEvents(); - this.commandQueue = new LinkedBlockingQueue<>(); - try { - this.tcpClient = new TcpClient(this.getIp(), Integer.parseInt(this.getPort())); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public String getIp() { - return ip; - } - - /** - * @param ip The ip to change to. - */ - public void setIp(String ip) { - - this.ip = ip; - GlobalEventBus.post(new Events.ServerEvents.OnChangingServerIp(ip)); - - } - - public String getPort() { - return port; - } - - - /** - * @param port The port to change to. - */ - public void setPort(String port) { - - this.port = port; - GlobalEventBus.post(new Events.ServerEvents.OnChangingServerPort(port)); - - } - - /** - * - * Sends a command to the server. - * - * @param command The command to send to the server. - * @param args The arguments for the command. - */ - private void sendCommandByString(String command, String... args) { - - if (!ServerCommand.isValid(command)) { - logger.error("Invalid command: {}", command); - return; - } - - if (!running) { - logger.warn("Server has been stopped"); - return; - } - - for (int i = 0; i < args.length; i++) { - args[i] = args[i].trim(); - if (args[i].isEmpty()) { - throw new IllegalArgumentException("Empty argument"); // TODO: Error handling, just crashes atm. - } - } - - String[] fullCommand = new String[args.length + 1]; - fullCommand[0] = command; - System.arraycopy(args, 0, fullCommand, 1, args.length); - command = String.join(" ", fullCommand); - - this.commandQueue.add(command); - logger.info("Command '{}' added to the queue", command); - - } - - @Override - public String toString() { - - return String.format( - "Server {ip: \"%s\", port: \"%s\"}", - ip, port - ); - - } - - private void initEvents() { - - GlobalEventBus.subscribeAndRegister(Events.ServerEvents.command.class, event - -> this.sendCommandByString(event.command(), event.args())); - GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, event - -> { - try { - this.reconnect(); // TODO: Terrible error handling and code. Needs cleanup. - } catch (IOException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, event - -> { - try { - this.connect(event.ip(), event.port()); // TODO: Terrible error handling and code. Needs cleanup. - } catch (IOException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - - } - - private void connection() throws InterruptedException { // TODO: Rename - - sleep(1000); // Just wait, because why not - - this.tcpIn = new Thread(() -> { - try { - while (this.running) { - String received = this.tcpClient.readLine(); // blocks until a line is available - if (received != null) { - logger.info("Received: '{}'", received); - GlobalEventBus.post(new Events.ServerEvents.ReceivedMessage(received)); - } else { - break; - } - } - } catch (IOException e) { - logger.error("Error reading from server", e); - try { - this.tcpClient.close(); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - }); - - this.tcpOut = new Thread(() -> { - try { - while (this.running) { - String command = commandQueue.take(); // blocks until a command is available - this.tcpClient.sendMessage(command); - logger.info("Sent command: '{}'", command); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (IOException e) { - try { - tcpClient.close(); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - throw new RuntimeException(e); - } - }); - - this.tcpIn.start(); - this.tcpOut.start(); - - } - - /** - * - * Connect to a new server. - * - * @param ip The ip to connect to. - * @param port The port to connect to. - * @throws IOException wip - * @throws InterruptedException wip - */ - public void connect(String ip, String port) throws IOException, InterruptedException { - - if (running) { - this.close(); - } - - this.ip = ip; - this.port = port; - - running = true; - - this.tcpClient = new TcpClient(ip, Integer.parseInt(port)); - this.connection(); - - } - - /** - * - * Reconnects to previous address. - * - * @throws IOException wip - * @throws InterruptedException wip - */ - public void reconnect() throws IOException, InterruptedException { - this.connect(this.ip, this.port); - } - - /** - * - * Close connection to server. - * - * @throws IOException wip - * @throws InterruptedException wip - */ - public void close() throws IOException, InterruptedException { - - this.commandQueue.clear(); - running = false; - // Thread.currentThread().interrupt(); - - this.tcpClient.closeSocket(); - - this.tcpIn.interrupt(); - this.tcpOut.interrupt(); - this.tcpIn.join(); - this.tcpOut.join(); - - } - - /** - * DO NOT USE, USE START INSTEAD. - */ - public void run() { - - try { - this.reconnect(); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - } - - /** - * - * Starts a server thread. - * - * @param ip The address of the server to contact. - * @param port The port of the server. - */ - public static void start(String ip, String port) { - - try { - new Server(ip, port).start(); - } catch (IllegalArgumentException e) { - new Server("127.0.0.1", "5001").start(); // TODO: Doesn't do anything. - } - } - -} - diff --git a/src/main/java/org/toop/server/ServerConnection.java b/src/main/java/org/toop/server/ServerConnection.java new file mode 100644 index 0000000..6edec26 --- /dev/null +++ b/src/main/java/org/toop/server/ServerConnection.java @@ -0,0 +1,227 @@ +package org.toop.server; + +import org.toop.Main; +import org.toop.eventbus.Events; +import org.toop.eventbus.GlobalEventBus; +import org.toop.server.backend.TcpClient; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.concurrent.*; + +public final class ServerConnection implements Runnable { + + private static final Logger logger = LogManager.getLogger(Main.class); + + private final BlockingQueue commandQueue = new LinkedBlockingQueue<>(); + private final ExecutorService executor = Executors.newFixedThreadPool(2); + + String ip; + String port; + TcpClient tcpClient; + volatile boolean running = false; + + public ServerConnection(String ip, String port) { + this.ip = ip; + this.port = port; + this.initEvents(); + } + + public String getIp() { + return this.ip; + } + + public String getPort() { + return this.port; + } + + /** + * + * Sends a command to the server. + * + * @param command The command to send to the server. + * @param args The arguments for the command. + */ + public void sendCommandByString(String command, String... args) { + if (!ServerCommand.isValid(command)) { + logger.error("Invalid command: {}", command); + return; + } + + if (!this.running) { + logger.warn("Server has been stopped"); + return; + } + + for (int i = 0; i < args.length; i++) { + args[i] = args[i].trim(); + if (args[i].isEmpty()) { + throw new IllegalArgumentException("Empty argument"); // TODO: Error handling, just crashes atm. + } + } + + String[] fullCommand = new String[args.length + 1]; + fullCommand[0] = command; + System.arraycopy(args, 0, fullCommand, 1, args.length); + command = String.join(" ", fullCommand); + + this.commandQueue.add(command); + logger.info("Command '{}' added to the queue", command); + } + + private void initEvents() { + GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Command.class, event + -> this.sendCommandByString(event.command(), event.args())); + + GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, event + -> { + try { + this.reconnect(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, event + -> { + try { + this.connect(event.ip(), event.port()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + private void startWorkers() { + running = true; + this.executor.submit(this::inputLoop); + this.executor.submit(this::outputLoop); + } + + private void stopWorkers() { + running = false; + this.running = false; + this.commandQueue.clear(); + + if (this.tcpClient != null) { + try { + this.tcpClient.closeSocket(); + } catch (IOException e) { + logger.warn("Error closing client socket", e); + } + } + + this.executor.shutdownNow(); + } + + private void inputLoop() { + logger.info("Starting server read"); + try { + while (running) { + String received = tcpClient.readLine(); // blocks + if (received != null) { + logger.info("Received: '{}'", received); + } else { + break; + } + } + } catch (IOException e) { + logger.error("Error reading from server", e); + } + } + + private void outputLoop() { + logger.info("Starting server write"); + try { + while (this.running) { + String command = this.commandQueue.poll(500, TimeUnit.MILLISECONDS); + if (command != null) { + this.tcpClient.sendMessage(command); + logger.info("Sent command: '{}'", command); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (IOException e) { + logger.error("Error sending command", e); + } + } + + /** + * + * Connect to a new server. + * + * @param ip The ip to connect to. + * @param port The port to connect to. + * @throws IOException wip + * @throws InterruptedException wip + */ + public void connect(String ip, String port) throws IOException { + if (this.running) { + this.closeConnection(); + } + + this.ip = ip; + this.port = port; + this.tcpClient = new TcpClient(ip, Integer.parseInt(port)); + + this.startWorkers(); + } + + /** + * + * Reconnects to previous address. + * + * @throws IOException wip + */ + public void reconnect() throws IOException { + this.connect(this.ip, this.port); + } + + /** + * + * Close connection to server. + * + */ + public void closeConnection() { + this.stopWorkers(); + logger.info("Server connection closed"); + } + + /** + * DO NOT USE, USE START INSTEAD. + */ + @Override + public void run() { + try { + reconnect(); + } catch (IOException e) { + logger.error("Initial connection failed", e); + } + } + + /** + * + * Starts a server thread. + * + * @param ip The address of the server to contact. + * @param port The port of the server. + */ + public static ServerConnection startNew(String ip, String port) { + ServerConnection serverConnection = new ServerConnection(ip, port); + new Thread(serverConnection).start(); + return serverConnection; + } + + @Override + public String toString() { + return String.format( + "Server {ip: \"%s\", port: \"%s\", running: %s}", + this.ip, this.port, this.running + ); + } + +} + diff --git a/src/main/java/org/toop/server/ServerManager.java b/src/main/java/org/toop/server/ServerManager.java new file mode 100644 index 0000000..6fe07ef --- /dev/null +++ b/src/main/java/org/toop/server/ServerManager.java @@ -0,0 +1,97 @@ +package org.toop.server; + +import org.toop.eventbus.Events; +import org.toop.eventbus.GlobalEventBus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class ServerManager { + + private static final Logger logger = LogManager.getLogger(ServerManager.class); + + /** + * Map of serverId -> Server instances + */ + private final Map serverConnections = new ConcurrentHashMap<>(); + + public ServerManager() { + GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartConnectionRequest.class, this::handleStartConnectionRequest); + GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartConnection.class, this::handleStartConnection); + GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Command.class, this::handleCommand); + GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, this::handleReconnect); + GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, this::handleChangeConnection); + GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ForceCloseAllConnections.class, _ -> shutdownAll()); + GlobalEventBus.subscribeAndRegister(Events.ServerEvents.RequestsAllConnections.class, this::getAllConnections); + } + + private void handleStartConnectionRequest(Events.ServerEvents.StartConnectionRequest request) { + String connectionId = UUID.randomUUID().toString(); + ServerConnection connection = new ServerConnection(request.ip(), request.port()); + this.serverConnections.put(connectionId, connection); + new Thread(connection, "Connection-" + connectionId).start(); + + request.future().complete(connectionId); + } + + private void handleStartConnection(Events.ServerEvents.StartConnection event) { + String connectionId = UUID.randomUUID().toString(); + ServerConnection connection = new ServerConnection(event.ip(), event.port()); + this.serverConnections.put(connectionId, connection); + new Thread(connection, "Connection-" + connectionId).start(); + logger.info("Connected to server {} at {}:{}", connectionId, event.ip(), event.port()); + GlobalEventBus.post(new Events.ServerEvents.ConnectionEstablished(connectionId, event.ip(), event.port())); + } + + private void handleCommand(Events.ServerEvents.Command event) { + ServerConnection serverConnection = this.serverConnections.get(event.connectionId()); + if (serverConnection != null) { + serverConnection.sendCommandByString(event.command(), event.args()); + } else { + logger.warn("Server {} not found for command '{}'", event.connectionId(), event.command()); + } + } + + private void handleReconnect(Events.ServerEvents.Reconnect event) { + ServerConnection serverConnection = this.serverConnections.get(event.connectionId()); + if (serverConnection != null) { + try { + serverConnection.reconnect(); + logger.info("Server {} reconnected", event.connectionId()); + } catch (Exception e) { + logger.error("Server {} failed to reconnect", event.connectionId(), e); + GlobalEventBus.post(new Events.ServerEvents.CouldNotConnect(event.connectionId())); + } + } + } + + private void handleChangeConnection(Events.ServerEvents.ChangeConnection event) { + ServerConnection serverConnection = this.serverConnections.get(event.connectionId()); + if (serverConnection != null) { + try { + serverConnection.connect(event.ip(), event.port()); + logger.info("Server {} changed connection to {}:{}", event.connectionId(), event.ip(), event.port()); + } catch (Exception e) { + logger.error("Server {} failed to change connection", event.connectionId(), e); + GlobalEventBus.post(new Events.ServerEvents.CouldNotConnect(event.connectionId())); + } + } + } + + private void getAllConnections(Events.ServerEvents.RequestsAllConnections request) { + List a = new ArrayList<>(this.serverConnections.values()); + request.future().complete(a.toString()); + } + + public void shutdownAll() { + this.serverConnections.values().forEach(ServerConnection::closeConnection); + this.serverConnections.clear(); + logger.info("All servers shut down"); + } +} \ No newline at end of file diff --git a/src/main/java/org/toop/server/backend/Testsss.java b/src/main/java/org/toop/server/backend/Testsss.java index c6e1388..af00056 100644 --- a/src/main/java/org/toop/server/backend/Testsss.java +++ b/src/main/java/org/toop/server/backend/Testsss.java @@ -7,18 +7,18 @@ public class Testsss extends Thread { public Testsss() {} - public void run() { - while (true) { - try { - sleep(100); - GlobalEventBus.post(new Events.ServerEvents.command("HELP", "TEST")); - sleep(1000); - GlobalEventBus.post(new Events.ServerEvents.ChangeConnection("127.0.0.1", "5001")); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } +// public void run() { +// while (true) { +// try { +// sleep(100); +// GlobalEventBus.post(new Events.ServerEvents.command("HELP", "TEST")); +// sleep(1000); +// GlobalEventBus.post(new Events.ServerEvents.ChangeConnection("127.0.0.1", "5001")); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// } +// } public static void start(String keepEmpty) { new Testsss().start();