Servermanager and default tcpserver

This commit is contained in:
Bas de Jong
2025-09-15 19:04:21 +02:00
parent e2268a5d71
commit fe17404e80
9 changed files with 4524 additions and 83 deletions

BIN
hs_err_pid54860.jfr Normal file

Binary file not shown.

4301
hs_err_pid54860.log Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -2,65 +2,71 @@ package org.toop;
import org.toop.eventbus.Events; import org.toop.eventbus.Events;
import org.toop.eventbus.GlobalEventBus; import org.toop.eventbus.GlobalEventBus;
import org.toop.server.ServerConnection; import org.toop.server.backend.ServerManager;
import org.toop.server.ServerManager; import org.toop.server.frontend.ConnectionManager;
import org.toop.server.backend.Testsss;
import org.toop.server.backend.TcpServer; import org.toop.server.backend.TcpServer;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
public class Main { public class Main {
private static final Logger logger = LogManager.getLogger(Main.class); private static final Logger logger = LogManager.getLogger(Main.class);
public static void main(String[] args) throws ExecutionException, InterruptedException { public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
TcpServer server = new TcpServer(5001); // TcpServer server = new TcpServer(5001);
Thread serverThread = new Thread(server); // Thread serverThread = new Thread(server);
serverThread.start(); // serverThread.start();
ServerManager serverManager = new ServerManager(); initSystems();
GlobalEventBus.post(new Events.ServerEvents.StartServer("5001"));
CompletableFuture<String> future = new CompletableFuture<>(); CompletableFuture<String> future = new CompletableFuture<>();
GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.1", "5001", future)); GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.1", "5001", future));
String serverId = future.get(); String serverId = future.get();
logger.info("Server ID: " + serverId);
CompletableFuture<String> future2 = new CompletableFuture<>(); // for (int i = 0; i < 1; i++) {
GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.2", "5002", future2)); // Thread thread = new Thread(() -> {
String serverId2 = future2.get(); //// logger.info("Server ID: {}", serverId);
logger.info("Server ID: " + serverId2); // GlobalEventBus.post(new Events.ServerEvents.Command(serverId, "HELP", "TEST"));
// });
// thread.start();
// }
CompletableFuture<String> future3 = new CompletableFuture<>(); GlobalEventBus.post(new Events.ServerEvents.Command(serverId, "HELP", "TEST"));
GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.3", "5003", future3));
String serverId3 = future3.get();
logger.info("Server ID: " + serverId3);
CompletableFuture<String> future4 = new CompletableFuture<>(); GlobalEventBus.post(new Events.ServerEvents.ForceCloseAllConnections());
GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.4", "5004", future4)); GlobalEventBus.post(new Events.ServerEvents.ForceCloseAllServers());
String serverId4 = future4.get();
logger.info("Server ID: " + serverId4);
CompletableFuture<String> future5 = new CompletableFuture<>(); //
GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.5", "5005", future5)); // CompletableFuture<String> future2 = new CompletableFuture<>();
String serverId5 = future5.get(); // GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.1", "5001", future2));
logger.info("Server ID: " + serverId5); // String serverId2 = future.get();
// logger.info("Server ID: {}", serverId2);
// GlobalEventBus.post(new Events.ServerEvents.Command(serverId2, "HELP", "TEST2"));
// GlobalEventBus.post(new Events.ServerEvents.StartConnection("127.0.0.1", "5001")); // GlobalEventBus.post(new Events.ServerEvents.StartConnection("127.0.0.1", "5001"));
// Server.startNew("127.0.0.1", "5001"); // Server.startNew("127.0.0.1", "5001");
// Testsss.start(""); // Used for testing server. // Testsss.start(""); // Used for testing server.
Window.start(""); // Window.start("");
CompletableFuture<String> future6 = new CompletableFuture<>(); // CompletableFuture<String> future6 = new CompletableFuture<>();
GlobalEventBus.post(new Events.ServerEvents.RequestsAllConnections(future6)); // GlobalEventBus.post(new Events.ServerEvents.RequestsAllConnections(future6));
String serverConnections = future6.get(); // String serverConnections = future6.get();
logger.info("Running connections: {}", serverConnections); // logger.info("Running connections: {}", serverConnections);
} }
public static void initSystems() {
new ServerManager();
new ConnectionManager();
}
} }

View File

@@ -1,11 +1,7 @@
package org.toop.eventbus; package org.toop.eventbus;
import org.toop.server.ServerConnection;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
/** /**
@@ -79,8 +75,18 @@ public class Events implements IEvents {
public record RequestsAllConnections(CompletableFuture<String> future) {} public record RequestsAllConnections(CompletableFuture<String> future) {}
public record RequestsAllServers(CompletableFuture<String> future) {}
public record ForceCloseAllConnections() {} public record ForceCloseAllConnections() {}
public record ForceCloseAllServers() {}
public record StartServer(String port) {}
public record StartServerRequest(String port, CompletableFuture<String> future) {}
public record ServerStarted(String uuid, String port) {}
/** /**
* *
* Triggers starting a server connection. * Triggers starting a server connection.
@@ -112,7 +118,7 @@ public class Events implements IEvents {
/** /**
* Triggers sending a command to a server. * Triggers sending a command to a server.
*/ */
public record Command(Object connectionId, String command, String... args) { } public record Command(String connectionId, String command, String... args) { }
/** /**
* Triggers when a command is sent to a server. * Triggers when a command is sent to a server.

View File

@@ -0,0 +1,69 @@
package org.toop.server.backend;
import org.toop.eventbus.Events;
import org.toop.eventbus.GlobalEventBus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
// TODO more methods.
public class ServerManager {
private static final Logger logger = LogManager.getLogger(ServerManager.class);
/**
* Map of serverId -> Server instances
*/
private final Map<String, TcpServer> servers = new ConcurrentHashMap<>();
/**
* Starts a server manager, to manage, servers.
*/
public ServerManager() {
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartServerRequest.class, this::handleStartServerRequest);
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartServer.class, this::handleStartServer);
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ForceCloseAllServers.class, _ -> shutdownAll());
}
private String startServer(String port) {
String serverId = UUID.randomUUID().toString();
try {
TcpServer server = new TcpServer(Integer.parseInt(port));
this.servers.put(serverId, server);
new Thread(server, "Server-" + serverId).start();
logger.info("Connected to server {} at {}", serverId, port);
return serverId;
} catch (Exception e) {
logger.error("Failed to start server", e);
return null;
}
}
private void handleStartServerRequest(Events.ServerEvents.StartServerRequest request) {
request.future().complete(this.startServer(request.port())); // TODO: Maybe post StartServer event.
}
private void handleStartServer(Events.ServerEvents.StartServer event) {
GlobalEventBus.post(new Events.ServerEvents.ServerStarted(
this.startServer(event.port()),
event.port()
));
}
private void getAllServers(Events.ServerEvents.RequestsAllServers request) {
ArrayList<TcpServer> a = new ArrayList<>(this.servers.values());
request.future().complete(a.toString());
}
public void shutdownAll() {
this.servers.values().forEach(TcpServer::stop);
this.servers.clear();
logger.info("All servers shut down");
}
}

View File

@@ -1,64 +1,121 @@
package org.toop.server.backend; package org.toop.server.backend;
import org.toop.Main;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.util.concurrent.*;
import static java.lang.Thread.sleep;
public class TcpServer implements Runnable { public class TcpServer implements Runnable {
private static final Logger logger = LogManager.getLogger(Main.class); private static final Logger logger = LogManager.getLogger(TcpServer.class);
private final ExecutorService executor = Executors.newFixedThreadPool(2);
private final BlockingQueue<String> receivedQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<String> sendQueue = new LinkedBlockingQueue<>();
private final int WAIT_TIME = 500; // MS
private final int RETRY_ATTEMPTS = 3;
private int port; private int port;
private ServerSocket serverSocket = null;
private boolean running = true; private boolean running = true;
public TcpServer(int port) { public TcpServer(int port) throws IOException {
this.port = port; this.port = port;
this.serverSocket = new ServerSocket(port);
} }
@Override @Override
public void run() { public void run() {
try (ServerSocket serverSocket = new ServerSocket(port)) { try {
logger.info("Server listening on port " + port); logger.info("Server listening on port " + port);
while (running) { while (running) {
Socket clientSocket = serverSocket.accept(); Socket clientSocket = this.serverSocket.accept();
logger.info("Connected to client: " + clientSocket.getInetAddress()); logger.info("Connected to client: {}", clientSocket.getInetAddress());
// Handle each client in a new thread new Thread(() -> this.startWorkers(clientSocket)).start();
new Thread(() -> handleClient(clientSocket)).start();
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
private void handleClient(Socket clientSocket) { private void startWorkers(Socket clientSocket) {
try ( running = true;
try {
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true) PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
) {
this.executor.submit(() -> this.inputLoop(in));
this.executor.submit(() -> this.outputLoop(out));
} catch (Exception e) {
logger.error("Server could not start, {}", e);
}
}
private void stopWorkers() {
this.running = false;
this.sendQueue.clear();
this.executor.shutdownNow();
}
private void inputLoop(BufferedReader in) {
logger.info("Starting {} connection read", this.port);
try {
String message; String message;
while ((message = in.readLine()) != null) { while (running && (message = in.readLine()) != null) {
logger.info("Received: " + message); logger.info("Received: '{}'", message);
out.println("Echo: " + message); if (!message.isEmpty()) {
String finalMessage = message;
new Thread(() -> {
for (int i = 0; i < this.RETRY_ATTEMPTS; i++) {
if (this.receivedQueue.offer(finalMessage)) break;
try {
sleep(this.WAIT_TIME);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); logger.error("Error reading from server", e);
} finally { } finally {
try { try {
clientSocket.close(); this.serverSocket.close();
logger.info("Client disconnected."); logger.info("Client disconnected. {}", this.port);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }
private void outputLoop(PrintWriter out) {
logger.info("Starting {} connection write", this.port);
try {
while (this.running) {
String send = this.sendQueue.poll(this.WAIT_TIME, TimeUnit.MILLISECONDS);
if (send != null) {
out.println(send);
logger.info("Sent message from server {}: '{}'", this.port, send);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void stop() { public void stop() {
running = false; stopWorkers();
logger.info("sendQueue: {}", this.sendQueue.toString());
logger.info("receivedQueue: {}", this.receivedQueue.toString());
} }
} }

View File

@@ -1,4 +1,4 @@
package org.toop.server; package org.toop.server.frontend;
import org.toop.eventbus.Events; import org.toop.eventbus.Events;
import org.toop.eventbus.GlobalEventBus; import org.toop.eventbus.GlobalEventBus;
@@ -12,16 +12,19 @@ import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class ServerManager { public class ConnectionManager {
private static final Logger logger = LogManager.getLogger(ServerManager.class); private static final Logger logger = LogManager.getLogger(ConnectionManager.class);
/** /**
* Map of serverId -> Server instances * Map of serverId -> Server instances
*/ */
private final Map<String, ServerConnection> serverConnections = new ConcurrentHashMap<>(); private final Map<String, ServerConnection> serverConnections = new ConcurrentHashMap<>();
public ServerManager() { /**
* Starts a connection manager, to manage, connections.
*/
public ConnectionManager() {
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartConnectionRequest.class, this::handleStartConnectionRequest); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartConnectionRequest.class, this::handleStartConnectionRequest);
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartConnection.class, this::handleStartConnection); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.StartConnection.class, this::handleStartConnection);
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Command.class, this::handleCommand); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Command.class, this::handleCommand);
@@ -31,22 +34,25 @@ public class ServerManager {
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.RequestsAllConnections.class, this::getAllConnections); GlobalEventBus.subscribeAndRegister(Events.ServerEvents.RequestsAllConnections.class, this::getAllConnections);
} }
private void handleStartConnectionRequest(Events.ServerEvents.StartConnectionRequest request) { private String startConnectionRequest(String ip, String port) {
String connectionId = UUID.randomUUID().toString(); String connectionId = UUID.randomUUID().toString();
ServerConnection connection = new ServerConnection(request.ip(), request.port()); ServerConnection connection = new ServerConnection(ip, port);
this.serverConnections.put(connectionId, connection); this.serverConnections.put(connectionId, connection);
new Thread(connection, "Connection-" + connectionId).start(); new Thread(connection, "Connection-" + connectionId).start();
logger.info("Connected to server {} at {}:{}", connectionId, ip, port);
return connectionId;
}
request.future().complete(connectionId); private void handleStartConnectionRequest(Events.ServerEvents.StartConnectionRequest request) {
request.future().complete(this.startConnectionRequest(request.ip(), request.port())); // TODO: Maybe post ConnectionEstablished event.
} }
private void handleStartConnection(Events.ServerEvents.StartConnection event) { private void handleStartConnection(Events.ServerEvents.StartConnection event) {
String connectionId = UUID.randomUUID().toString(); GlobalEventBus.post(new Events.ServerEvents.ConnectionEstablished(
ServerConnection connection = new ServerConnection(event.ip(), event.port()); this.startConnectionRequest(event.ip(), event.port()),
this.serverConnections.put(connectionId, connection); event.ip(),
new Thread(connection, "Connection-" + connectionId).start(); event.port()
logger.info("Connected to server {} at {}:{}", connectionId, event.ip(), event.port()); ));
GlobalEventBus.post(new Events.ServerEvents.ConnectionEstablished(connectionId, event.ip(), event.port()));
} }
private void handleCommand(Events.ServerEvents.Command event) { private void handleCommand(Events.ServerEvents.Command event) {

View File

@@ -1,9 +1,9 @@
package org.toop.server; package org.toop.server.frontend;
import org.toop.Main; import org.toop.Main;
import org.toop.eventbus.Events; import org.toop.eventbus.Events;
import org.toop.eventbus.GlobalEventBus; import org.toop.eventbus.GlobalEventBus;
import org.toop.server.backend.TcpClient; import org.toop.server.ServerCommand;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@@ -50,6 +50,8 @@ public final class ServerConnection implements Runnable {
return; return;
} }
System.out.println();
if (!this.running) { if (!this.running) {
logger.warn("Server has been stopped"); logger.warn("Server has been stopped");
return; return;
@@ -72,11 +74,7 @@ public final class ServerConnection implements Runnable {
} }
private void initEvents() { private void initEvents() {
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Command.class, event GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, _ -> {
-> this.sendCommandByString(event.command(), event.args()));
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, event
-> {
try { try {
this.reconnect(); this.reconnect();
} catch (IOException e) { } catch (IOException e) {
@@ -84,8 +82,7 @@ public final class ServerConnection implements Runnable {
} }
}); });
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, event GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, event -> {
-> {
try { try {
this.connect(event.ip(), event.port()); this.connect(event.ip(), event.port());
} catch (IOException e) { } catch (IOException e) {
@@ -101,7 +98,6 @@ public final class ServerConnection implements Runnable {
} }
private void stopWorkers() { private void stopWorkers() {
running = false;
this.running = false; this.running = false;
this.commandQueue.clear(); this.commandQueue.clear();
@@ -117,7 +113,7 @@ public final class ServerConnection implements Runnable {
} }
private void inputLoop() { private void inputLoop() {
logger.info("Starting server read"); logger.info("Starting {}:{} connection read", this.ip, this.port);
try { try {
while (running) { while (running) {
String received = tcpClient.readLine(); // blocks String received = tcpClient.readLine(); // blocks
@@ -133,7 +129,7 @@ public final class ServerConnection implements Runnable {
} }
private void outputLoop() { private void outputLoop() {
logger.info("Starting server write"); logger.info("Starting {}:{} connection write", this.ip, this.port);
try { try {
while (this.running) { while (this.running) {
String command = this.commandQueue.poll(500, TimeUnit.MILLISECONDS); String command = this.commandQueue.poll(500, TimeUnit.MILLISECONDS);
@@ -191,7 +187,7 @@ public final class ServerConnection implements Runnable {
} }
/** /**
* DO NOT USE, USE START INSTEAD. * DO NOT USE, USE startNew INSTEAD.
*/ */
@Override @Override
public void run() { public void run() {

View File

@@ -1,4 +1,4 @@
package org.toop.server.backend; package org.toop.server.frontend;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;