diff --git a/src/main/java/org/toop/server/backend/TcpServer.java b/src/main/java/org/toop/server/backend/TcpServer.java index a7f69a6..cc6ecc9 100644 --- a/src/main/java/org/toop/server/backend/TcpServer.java +++ b/src/main/java/org/toop/server/backend/TcpServer.java @@ -26,7 +26,7 @@ import java.util.concurrent.*; * - Subclasses should consume receivedQueue (or call getNewestCommand()) and * use sendQueue to send messages to all clients (or per-client, if implemented). */ -public class TcpServer implements Runnable { +public abstract class TcpServer implements Runnable { protected static final Logger logger = LogManager.getLogger(TcpServer.class); diff --git a/src/main/java/org/toop/server/frontend/ConnectionManager.java b/src/main/java/org/toop/server/frontend/ConnectionManager.java index ad76ceb..84ea2af 100644 --- a/src/main/java/org/toop/server/frontend/ConnectionManager.java +++ b/src/main/java/org/toop/server/frontend/ConnectionManager.java @@ -6,6 +6,7 @@ import org.toop.eventbus.GlobalEventBus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,17 +30,21 @@ public class ConnectionManager { GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartConnection.class, this::handleStartConnection); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.SendCommand.class, this::handleCommand); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, this::handleReconnect); - GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, this::handleChangeConnection); +// GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, this::handleChangeConnection); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ForceCloseAllConnections.class, _ -> shutdownAll()); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.RequestsAllConnections.class, this::getAllConnections); } private String startConnectionRequest(String ip, String port) { String connectionId = UUID.randomUUID().toString(); - ServerConnection connection = new ServerConnection(connectionId, ip, port); - this.serverConnections.put(connectionId, connection); - new Thread(connection, "Connection-" + connectionId).start(); - logger.info("Connected to server {} at {}:{}", connectionId, ip, port); + try { + ServerConnection connection = new ServerConnection(connectionId, ip, port); + this.serverConnections.put(connectionId, connection); + new Thread(connection, "Connection-" + connectionId).start(); + logger.info("Connected to server {} at {}:{}", connectionId, ip, port); + } catch (IOException e) { + logger.error("{}", e); + } return connectionId; } @@ -77,18 +82,18 @@ public class ConnectionManager { } } - private void handleChangeConnection(Events.ServerEvents.ChangeConnection event) { - ServerConnection serverConnection = this.serverConnections.get(event.connectionId()); - if (serverConnection != null) { - try { - serverConnection.connect(event.ip(), event.port()); - logger.info("Server {} changed connection to {}:{}", event.connectionId(), event.ip(), event.port()); - } catch (Exception e) { - logger.error("Server {} failed to change connection", event.connectionId(), e); - GlobalEventBus.post(new Events.ServerEvents.CouldNotConnect(event.connectionId())); - } - } - } +// private void handleChangeConnection(Events.ServerEvents.ChangeConnection event) { +// ServerConnection serverConnection = this.serverConnections.get(event.connectionId()); +// if (serverConnection != null) { +// try { +// serverConnection.connect(event.ip(), event.port()); +// logger.info("Server {} changed connection to {}:{}", event.connectionId(), event.ip(), event.port()); +// } catch (Exception e) { +// logger.error("Server {} failed to change connection", event.connectionId(), e); +// GlobalEventBus.post(new Events.ServerEvents.CouldNotConnect(event.connectionId())); +// } +// } +// } TODO private void getAllConnections(Events.ServerEvents.RequestsAllConnections request) { List a = new ArrayList<>(this.serverConnections.values()); diff --git a/src/main/java/org/toop/server/frontend/ServerConnection.java b/src/main/java/org/toop/server/frontend/ServerConnection.java index ce0ed13..b4ee4c3 100644 --- a/src/main/java/org/toop/server/frontend/ServerConnection.java +++ b/src/main/java/org/toop/server/frontend/ServerConnection.java @@ -1,49 +1,37 @@ package org.toop.server.frontend; -import org.toop.Main; import org.toop.eventbus.Events; import org.toop.eventbus.GlobalEventBus; -import org.toop.server.backend.tictactoe.TicTacToeServerCommand; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; +import java.net.InetAddress; import java.util.concurrent.*; -public final class ServerConnection implements Runnable { +public final class ServerConnection extends TcpClient implements Runnable { private static final Logger logger = LogManager.getLogger(ServerConnection.class); - private final BlockingQueue commandQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue receivedQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue sendQueue = new LinkedBlockingQueue<>(); + private final ExecutorService executor = Executors.newFixedThreadPool(2); String uuid; - String ip; - String port; - TcpClient tcpClient; volatile boolean running = false; - public ServerConnection(String uuid, String ip, String port) { + public ServerConnection(String uuid, String ip, String port) throws IOException { + super(ip, Integer.parseInt(port)); // TODO: Verify if port is integer first, to avoid crash. this.uuid = uuid; - this.ip = ip; - this.port = port; this.initEvents(); } - public String getIp() { - return this.ip; - } - - public String getPort() { - return this.port; - } - /** * * Sends a command to the server. * - * @param command The command to send to the server. * @param args The arguments for the command. */ public void sendCommandByString(String... args) { @@ -59,26 +47,19 @@ public final class ServerConnection implements Runnable { String command = String.join(" ", args); - this.commandQueue.add(command); - logger.info("Command '{}' added to the queue", command); + this.sendQueue.add(command); + logger.info("Command '{}' added to the queue", command); // TODO: Better log, which uuid? + } + + private void addReceivedMessageToQueue(String message) { + try { + receivedQueue.put(message); + } catch (InterruptedException e) { + logger.error("{}", e); // TODO: Make more informative + } } private void initEvents() { - GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, _ -> { - try { - this.reconnect(); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - - GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, event -> { - try { - this.connect(event.ip(), event.port()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); } private void startWorkers() { @@ -89,27 +70,24 @@ public final class ServerConnection implements Runnable { private void stopWorkers() { this.running = false; - this.commandQueue.clear(); - - if (this.tcpClient != null) { + this.sendQueue.clear(); try { - this.tcpClient.closeSocket(); + this.closeSocket(); } catch (IOException e) { - logger.warn("Error closing client socket", e); + logger.warn("Error closing client socket", e); // TODO: Better log } - } this.executor.shutdownNow(); } private void inputLoop() { - logger.info("Starting {}:{} connection read", this.ip, this.port); + logger.info("Starting {}:{} connection read", this.serverAddress, this.serverPort); try { while (running) { - String received = tcpClient.readLine(); // blocks + String received = this.readLine(); // blocks if (received != null) { logger.info("Connection: {} received: '{}'", this.uuid, received); - GlobalEventBus.post(new Events.ServerEvents.ReceivedMessage(this.uuid, received)); + this.addReceivedMessageToQueue(received); } else { break; } @@ -120,12 +98,12 @@ public final class ServerConnection implements Runnable { } private void outputLoop() { - logger.info("Starting {}:{} connection write", this.ip, this.port); + logger.info("Starting {}:{} connection write", this.serverAddress, this.serverPort); try { while (this.running) { - String command = this.commandQueue.poll(500, TimeUnit.MILLISECONDS); + String command = this.sendQueue.poll(500, TimeUnit.MILLISECONDS); if (command != null) { - this.tcpClient.sendMessage(command); + this.sendMessage(command); logger.info("Sent command: '{}'", command); } } @@ -142,17 +120,14 @@ public final class ServerConnection implements Runnable { * * @param ip The ip to connect to. * @param port The port to connect to. - * @throws IOException wip - * @throws InterruptedException wip */ - public void connect(String ip, String port) throws IOException { + public void connect(InetAddress ip, int port) { if (this.running) { - this.closeConnection(); + this.closeConnection(); // Also stops workers. } - this.ip = ip; - this.port = port; - this.tcpClient = new TcpClient(ip, Integer.parseInt(port)); + this.serverAddress = ip; + this.serverPort = port; this.startWorkers(); } @@ -164,7 +139,7 @@ public final class ServerConnection implements Runnable { * @throws IOException wip */ public void reconnect() throws IOException { - this.connect(this.ip, this.port); + this.connect(this.serverAddress, this.serverPort); } /** @@ -174,12 +149,9 @@ public final class ServerConnection implements Runnable { */ public void closeConnection() { this.stopWorkers(); - logger.info("Server connection closed"); + logger.info("Closed connection: {}, to server {}:{}", this.uuid, this.serverAddress, this.serverPort); } - /** - * DO NOT USE, USE startNew INSTEAD. - */ @Override public void run() { try { @@ -189,24 +161,11 @@ public final class ServerConnection implements Runnable { } } - /** - * - * Starts a server thread. - * - * @param ip The address of the server to contact. - * @param port The port of the server. - */ - public static ServerConnection startNew(String uuid, String ip, String port) { - ServerConnection serverConnection = new ServerConnection(uuid, ip, port); - new Thread(serverConnection).start(); - return serverConnection; - } - @Override public String toString() { return String.format( "Server {ip: \"%s\", port: \"%s\", running: %s}", - this.ip, this.port, this.running + this.serverAddress, this.serverPort, this.running ); } diff --git a/src/main/java/org/toop/server/frontend/TcpClient.java b/src/main/java/org/toop/server/frontend/TcpClient.java index 62e3339..ff2bebc 100644 --- a/src/main/java/org/toop/server/frontend/TcpClient.java +++ b/src/main/java/org/toop/server/frontend/TcpClient.java @@ -7,7 +7,10 @@ import java.io.PrintWriter; import java.net.InetAddress; import java.net.Socket; -public class TcpClient { +/** + * A simple wrapper for creating TCP clients. + */ +public abstract class TcpClient { InetAddress serverAddress; int serverPort;