Connection client simplified

This commit is contained in:
lieght
2025-09-19 14:13:13 +02:00
parent c2b6aea71e
commit 1d90df4e86
4 changed files with 60 additions and 93 deletions

View File

@@ -26,7 +26,7 @@ import java.util.concurrent.*;
* - Subclasses should consume receivedQueue (or call getNewestCommand()) and * - Subclasses should consume receivedQueue (or call getNewestCommand()) and
* use sendQueue to send messages to all clients (or per-client, if implemented). * use sendQueue to send messages to all clients (or per-client, if implemented).
*/ */
public class TcpServer implements Runnable { public abstract class TcpServer implements Runnable {
protected static final Logger logger = LogManager.getLogger(TcpServer.class); protected static final Logger logger = LogManager.getLogger(TcpServer.class);

View File

@@ -6,6 +6,7 @@ import org.toop.eventbus.GlobalEventBus;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -29,17 +30,21 @@ public class ConnectionManager {
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartConnection.class, this::handleStartConnection); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartConnection.class, this::handleStartConnection);
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.SendCommand.class, this::handleCommand); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.SendCommand.class, this::handleCommand);
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, this::handleReconnect); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, this::handleReconnect);
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, this::handleChangeConnection); // GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, this::handleChangeConnection);
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ForceCloseAllConnections.class, _ -> shutdownAll()); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ForceCloseAllConnections.class, _ -> shutdownAll());
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.RequestsAllConnections.class, this::getAllConnections); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.RequestsAllConnections.class, this::getAllConnections);
} }
private String startConnectionRequest(String ip, String port) { private String startConnectionRequest(String ip, String port) {
String connectionId = UUID.randomUUID().toString(); String connectionId = UUID.randomUUID().toString();
try {
ServerConnection connection = new ServerConnection(connectionId, ip, port); ServerConnection connection = new ServerConnection(connectionId, ip, port);
this.serverConnections.put(connectionId, connection); this.serverConnections.put(connectionId, connection);
new Thread(connection, "Connection-" + connectionId).start(); new Thread(connection, "Connection-" + connectionId).start();
logger.info("Connected to server {} at {}:{}", connectionId, ip, port); logger.info("Connected to server {} at {}:{}", connectionId, ip, port);
} catch (IOException e) {
logger.error("{}", e);
}
return connectionId; return connectionId;
} }
@@ -77,18 +82,18 @@ public class ConnectionManager {
} }
} }
private void handleChangeConnection(Events.ServerEvents.ChangeConnection event) { // private void handleChangeConnection(Events.ServerEvents.ChangeConnection event) {
ServerConnection serverConnection = this.serverConnections.get(event.connectionId()); // ServerConnection serverConnection = this.serverConnections.get(event.connectionId());
if (serverConnection != null) { // if (serverConnection != null) {
try { // try {
serverConnection.connect(event.ip(), event.port()); // serverConnection.connect(event.ip(), event.port());
logger.info("Server {} changed connection to {}:{}", event.connectionId(), event.ip(), event.port()); // logger.info("Server {} changed connection to {}:{}", event.connectionId(), event.ip(), event.port());
} catch (Exception e) { // } catch (Exception e) {
logger.error("Server {} failed to change connection", event.connectionId(), e); // logger.error("Server {} failed to change connection", event.connectionId(), e);
GlobalEventBus.post(new Events.ServerEvents.CouldNotConnect(event.connectionId())); // GlobalEventBus.post(new Events.ServerEvents.CouldNotConnect(event.connectionId()));
} // }
} // }
} // } TODO
private void getAllConnections(Events.ServerEvents.RequestsAllConnections request) { private void getAllConnections(Events.ServerEvents.RequestsAllConnections request) {
List<ServerConnection> a = new ArrayList<>(this.serverConnections.values()); List<ServerConnection> a = new ArrayList<>(this.serverConnections.values());

View File

@@ -1,49 +1,37 @@
package org.toop.server.frontend; package org.toop.server.frontend;
import org.toop.Main;
import org.toop.eventbus.Events; import org.toop.eventbus.Events;
import org.toop.eventbus.GlobalEventBus; import org.toop.eventbus.GlobalEventBus;
import org.toop.server.backend.tictactoe.TicTacToeServerCommand;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.*; import java.util.concurrent.*;
public final class ServerConnection implements Runnable { public final class ServerConnection extends TcpClient implements Runnable {
private static final Logger logger = LogManager.getLogger(ServerConnection.class); private static final Logger logger = LogManager.getLogger(ServerConnection.class);
private final BlockingQueue<String> commandQueue = new LinkedBlockingQueue<>(); private final BlockingQueue<String> receivedQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<String> sendQueue = new LinkedBlockingQueue<>();
private final ExecutorService executor = Executors.newFixedThreadPool(2); private final ExecutorService executor = Executors.newFixedThreadPool(2);
String uuid; String uuid;
String ip;
String port;
TcpClient tcpClient;
volatile boolean running = false; volatile boolean running = false;
public ServerConnection(String uuid, String ip, String port) { public ServerConnection(String uuid, String ip, String port) throws IOException {
super(ip, Integer.parseInt(port)); // TODO: Verify if port is integer first, to avoid crash.
this.uuid = uuid; this.uuid = uuid;
this.ip = ip;
this.port = port;
this.initEvents(); this.initEvents();
} }
public String getIp() {
return this.ip;
}
public String getPort() {
return this.port;
}
/** /**
* *
* Sends a command to the server. * Sends a command to the server.
* *
* @param command The command to send to the server.
* @param args The arguments for the command. * @param args The arguments for the command.
*/ */
public void sendCommandByString(String... args) { public void sendCommandByString(String... args) {
@@ -59,26 +47,19 @@ public final class ServerConnection implements Runnable {
String command = String.join(" ", args); String command = String.join(" ", args);
this.commandQueue.add(command); this.sendQueue.add(command);
logger.info("Command '{}' added to the queue", command); logger.info("Command '{}' added to the queue", command); // TODO: Better log, which uuid?
}
private void addReceivedMessageToQueue(String message) {
try {
receivedQueue.put(message);
} catch (InterruptedException e) {
logger.error("{}", e); // TODO: Make more informative
}
} }
private void initEvents() { private void initEvents() {
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, _ -> {
try {
this.reconnect();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, event -> {
try {
this.connect(event.ip(), event.port());
} catch (IOException e) {
throw new RuntimeException(e);
}
});
} }
private void startWorkers() { private void startWorkers() {
@@ -89,27 +70,24 @@ public final class ServerConnection implements Runnable {
private void stopWorkers() { private void stopWorkers() {
this.running = false; this.running = false;
this.commandQueue.clear(); this.sendQueue.clear();
if (this.tcpClient != null) {
try { try {
this.tcpClient.closeSocket(); this.closeSocket();
} catch (IOException e) { } catch (IOException e) {
logger.warn("Error closing client socket", e); logger.warn("Error closing client socket", e); // TODO: Better log
}
} }
this.executor.shutdownNow(); this.executor.shutdownNow();
} }
private void inputLoop() { private void inputLoop() {
logger.info("Starting {}:{} connection read", this.ip, this.port); logger.info("Starting {}:{} connection read", this.serverAddress, this.serverPort);
try { try {
while (running) { while (running) {
String received = tcpClient.readLine(); // blocks String received = this.readLine(); // blocks
if (received != null) { if (received != null) {
logger.info("Connection: {} received: '{}'", this.uuid, received); logger.info("Connection: {} received: '{}'", this.uuid, received);
GlobalEventBus.post(new Events.ServerEvents.ReceivedMessage(this.uuid, received)); this.addReceivedMessageToQueue(received);
} else { } else {
break; break;
} }
@@ -120,12 +98,12 @@ public final class ServerConnection implements Runnable {
} }
private void outputLoop() { private void outputLoop() {
logger.info("Starting {}:{} connection write", this.ip, this.port); logger.info("Starting {}:{} connection write", this.serverAddress, this.serverPort);
try { try {
while (this.running) { while (this.running) {
String command = this.commandQueue.poll(500, TimeUnit.MILLISECONDS); String command = this.sendQueue.poll(500, TimeUnit.MILLISECONDS);
if (command != null) { if (command != null) {
this.tcpClient.sendMessage(command); this.sendMessage(command);
logger.info("Sent command: '{}'", command); logger.info("Sent command: '{}'", command);
} }
} }
@@ -142,17 +120,14 @@ public final class ServerConnection implements Runnable {
* *
* @param ip The ip to connect to. * @param ip The ip to connect to.
* @param port The port to connect to. * @param port The port to connect to.
* @throws IOException wip
* @throws InterruptedException wip
*/ */
public void connect(String ip, String port) throws IOException { public void connect(InetAddress ip, int port) {
if (this.running) { if (this.running) {
this.closeConnection(); this.closeConnection(); // Also stops workers.
} }
this.ip = ip; this.serverAddress = ip;
this.port = port; this.serverPort = port;
this.tcpClient = new TcpClient(ip, Integer.parseInt(port));
this.startWorkers(); this.startWorkers();
} }
@@ -164,7 +139,7 @@ public final class ServerConnection implements Runnable {
* @throws IOException wip * @throws IOException wip
*/ */
public void reconnect() throws IOException { public void reconnect() throws IOException {
this.connect(this.ip, this.port); this.connect(this.serverAddress, this.serverPort);
} }
/** /**
@@ -174,12 +149,9 @@ public final class ServerConnection implements Runnable {
*/ */
public void closeConnection() { public void closeConnection() {
this.stopWorkers(); this.stopWorkers();
logger.info("Server connection closed"); logger.info("Closed connection: {}, to server {}:{}", this.uuid, this.serverAddress, this.serverPort);
} }
/**
* DO NOT USE, USE startNew INSTEAD.
*/
@Override @Override
public void run() { public void run() {
try { try {
@@ -189,24 +161,11 @@ public final class ServerConnection implements Runnable {
} }
} }
/**
*
* Starts a server thread.
*
* @param ip The address of the server to contact.
* @param port The port of the server.
*/
public static ServerConnection startNew(String uuid, String ip, String port) {
ServerConnection serverConnection = new ServerConnection(uuid, ip, port);
new Thread(serverConnection).start();
return serverConnection;
}
@Override @Override
public String toString() { public String toString() {
return String.format( return String.format(
"Server {ip: \"%s\", port: \"%s\", running: %s}", "Server {ip: \"%s\", port: \"%s\", running: %s}",
this.ip, this.port, this.running this.serverAddress, this.serverPort, this.running
); );
} }

View File

@@ -7,7 +7,10 @@ import java.io.PrintWriter;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
public class TcpClient { /**
* A simple wrapper for creating TCP clients.
*/
public abstract class TcpClient {
InetAddress serverAddress; InetAddress serverAddress;
int serverPort; int serverPort;