future) {}
+
+ public record CloseClient(String connectionId) {}
+
+ /**
+ * Event to start a new client connection to a server.
+ *
+ * This event is typically posted to the {@code GlobalEventBus} to initiate the creation of
+ * a client connection, and carries all information needed to establish that connection:
+ *
+ * - A factory for creating the Netty handler that will manage the connection
+ *
+ * - The server's IP address and port
+ *
+ * - A unique event identifier for correlation with follow-up events
+ *
+ *
+ *
+ * The {@link #eventId()} allows callers to correlate the {@code StartClient} event
+ * with subsequent success/failure events. For example, a {@code StartClientSuccess}
+ * or {@code StartClientFailure} event may carry the same {@code eventId}.
+ *
+ *
+ * @param handlerFactory Factory for constructing a {@link NetworkingGameClientHandler}.
+ * @param ip The IP address of the server to connect to.
+ * @param port The port number of the server to connect to.
+ * @param eventId A unique identifier for this event, typically injected
+ * automatically by the {@link org.toop.eventbus.EventPublisher}.
+ */
+ public record StartClient(
+ Supplier extends NetworkingGameClientHandler> handlerFactory,
+ String ip,
+ int port,
+ String eventId
+ ) implements EventWithUuid {
+
+ /**
+ * Returns a map representation of this event, where keys are record component names
+ * and values are their corresponding values. Useful for generic logging, debugging,
+ * or serializing events without hardcoding field names.
+ *
+ * @return a {@code Map} containing field names and values
+ */
+ @Override
+ public Map result() {
+ return Stream.of(this.getClass().getRecordComponents())
+ .collect(Collectors.toMap(
+ RecordComponent::getName,
+ rc -> {
+ try {
+ return rc.getAccessor().invoke(this);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ ));
+ }
+
+ /**
+ * Returns the unique event identifier used for correlating this event.
+ *
+ * @return the event ID string
+ */
+ @Override
+ public String eventId() {
+ return this.eventId;
+ }
+ }
+
+ /**
+ * TODO: Update docs new input.
+ * BLOCKING Triggers starting a server connection and returns a future.
+ *
+ * @param ip The IP address of the server to connect to.
+ * @param port The port of the server to connect to.
+ * @param future Returns the UUID of the connection, when connection is established.
+ */
+ public record StartClientRequest(
+ Supplier extends NetworkingGameClientHandler> handlerFactory,
+ String ip, int port, CompletableFuture future) {}
+
+ /**
+ * BLOCKING Triggers starting a server connection and returns a future.
+ *
+ * @param ip The IP address of the server to connect to.
+ * @param port The port of the server to connect to.
+ */
+ public record StartClientSuccess(Object connectionId, String ip, int port, String eventId)
+ implements EventWithUuid {
+ @Override
+ public Map result() {
+ return Stream.of(this.getClass().getRecordComponents())
+ .collect(Collectors.toMap(
+ RecordComponent::getName,
+ rc -> {
+ try {
+ return rc.getAccessor().invoke(this);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ ));
+ }
+
+ @Override
+ public String eventId() {
+ return this.eventId;
+ }
+ }
+
+ /**
+ * Triggers sending a command to a server.
+ *
+ * @param connectionId The UUID of the connection to send the command on.
+ * @param args The command arguments.
+ */
+ public record SendCommand(String connectionId, String... args) {}
+
+ /**
+ * WIP Triggers when a command is sent to a server.
+ *
+ * @param command The TicTacToeServer instance that executed the command.
+ * @param args The command arguments.
+ * @param result The result returned from executing the command.
+ */
+ public record OnCommand(
+ TicTacToeServer command, String[] args, String result) {} // TODO old
+
+ /**
+ * Triggers reconnecting to a previous address.
+ *
+ * @param connectionId The identifier of the connection being reconnected.
+ */
+ public record Reconnect(Object connectionId) {}
+
+
+ /**
+ * Triggers when the server client receives a message.
+ *
+ * @param ConnectionUuid The UUID of the connection that received the message.
+ * @param message The message received.
+ */
+ public record ReceivedMessage(String ConnectionUuid, String message) {}
+
+ /**
+ * Triggers changing connection to a new address.
+ *
+ * @param connectionId The identifier of the connection being changed.
+ * @param ip The new IP address.
+ * @param port The new port.
+ */
+ public record ChangeClient(Object connectionId, String ip, int port) {}
+
+
+ /**
+ * Triggers when the server couldn't connect to the desired address.
+ *
+ * @param connectionId The identifier of the connection that failed.
+ */
+ public record CouldNotConnect(Object connectionId) {}
+
+ /** WIP Triggers when a connection closes. */
+ public record ClosedConnection() {}
+
+}
diff --git a/src/main/java/org/toop/frontend/ConnectionManager.java b/src/main/java/org/toop/frontend/ConnectionManager.java
deleted file mode 100644
index e0ec11f..0000000
--- a/src/main/java/org/toop/frontend/ConnectionManager.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.toop.frontend;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.toop.eventbus.Events;
-import org.toop.eventbus.GlobalEventBus;
-
-public class ConnectionManager {
-
- private static final Logger logger = LogManager.getLogger(ConnectionManager.class);
-
- /** Map of serverId -> Server instances */
- private final Map serverConnections = new ConcurrentHashMap<>();
-
- /** Starts a connection manager, to manage, connections. */
- public ConnectionManager() {
- GlobalEventBus.subscribeAndRegister(
- Events.ServerEvents.StartConnectionRequest.class,
- this::handleStartConnectionRequest);
- GlobalEventBus.subscribeAndRegister(
- Events.ServerEvents.StartConnection.class, this::handleStartConnection);
- GlobalEventBus.subscribeAndRegister(
- Events.ServerEvents.SendCommand.class, this::handleCommand);
- GlobalEventBus.subscribeAndRegister(
- Events.ServerEvents.Reconnect.class, this::handleReconnect);
- // GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class,
- // this::handleChangeConnection);
- GlobalEventBus.subscribeAndRegister(
- Events.ServerEvents.ForceCloseAllConnections.class, _ -> shutdownAll());
- GlobalEventBus.subscribeAndRegister(
- Events.ServerEvents.RequestsAllConnections.class, this::getAllConnections);
- }
-
- private String startConnectionRequest(String ip, String port) {
- String connectionId = UUID.randomUUID().toString();
- try {
- if (!port.matches("[0-9]+")) {
- port = "0000";
- }
- ServerConnection connection = new ServerConnection(connectionId, ip, port);
- this.serverConnections.put(connectionId, connection);
- new Thread(connection, "Connection-" + connectionId).start();
- logger.info("Connected to server {} at {}:{}", connectionId, ip, port);
- } catch (IOException e) {
- logger.error("{}", e);
- }
- return connectionId;
- }
-
- private void handleStartConnectionRequest(Events.ServerEvents.StartConnectionRequest request) {
- request.future()
- .complete(
- this.startConnectionRequest(
- request.ip(),
- request.port())); // TODO: Maybe post ConnectionEstablished event.
- }
-
- private void handleStartConnection(Events.ServerEvents.StartConnection event) {
- GlobalEventBus.post(
- new Events.ServerEvents.ConnectionEstablished(
- this.startConnectionRequest(event.ip(), event.port()),
- event.ip(),
- event.port()));
- }
-
- private void handleCommand(
- Events.ServerEvents.SendCommand
- event) { // TODO: Move this to ServerConnection class, keep it internal.
- ServerConnection serverConnection = this.serverConnections.get(event.connectionId());
- if (serverConnection != null) {
- serverConnection.sendCommandByString(event.args());
- } else {
- logger.warn("Server {} not found for command '{}'", event.connectionId(), event.args());
- }
- }
-
- private void handleReconnect(Events.ServerEvents.Reconnect event) {
- ServerConnection serverConnection = this.serverConnections.get(event.connectionId());
- if (serverConnection != null) {
- try {
- serverConnection.reconnect();
- logger.info("Server {} reconnected", event.connectionId());
- } catch (Exception e) {
- logger.error("Server {} failed to reconnect", event.connectionId(), e);
- GlobalEventBus.post(new Events.ServerEvents.CouldNotConnect(event.connectionId()));
- }
- }
- }
-
- // private void handleChangeConnection(Events.ServerEvents.ChangeConnection event) {
- // ServerConnection serverConnection = this.serverConnections.get(event.connectionId());
- // if (serverConnection != null) {
- // try {
- // serverConnection.connect(event.ip(), event.port());
- // logger.info("Server {} changed connection to {}:{}", event.connectionId(),
- // event.ip(), event.port());
- // } catch (Exception e) {
- // logger.error("Server {} failed to change connection", event.connectionId(),
- // e);
- // GlobalEventBus.post(new
- // Events.ServerEvents.CouldNotConnect(event.connectionId()));
- // }
- // }
- // } TODO
-
- private void getAllConnections(Events.ServerEvents.RequestsAllConnections request) {
- List a = new ArrayList<>(this.serverConnections.values());
- request.future().complete(a.toString());
- }
-
- public void shutdownAll() {
- this.serverConnections.values().forEach(ServerConnection::closeConnection);
- this.serverConnections.clear();
- logger.info("All servers shut down");
- }
-}
diff --git a/src/main/java/org/toop/frontend/UI/RemoteGameSelector.java b/src/main/java/org/toop/frontend/UI/RemoteGameSelector.java
index c06d7fe..ae363f6 100644
--- a/src/main/java/org/toop/frontend/UI/RemoteGameSelector.java
+++ b/src/main/java/org/toop/frontend/UI/RemoteGameSelector.java
@@ -7,9 +7,11 @@ import java.util.concurrent.ExecutionException;
import javax.swing.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.toop.eventbus.Events;
+import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus;
+import org.toop.eventbus.events.NetworkEvents;
import org.toop.frontend.games.LocalTicTacToe;
+import org.toop.frontend.networking.NetworkingGameClientHandler;
public class RemoteGameSelector {
private static final Logger logger = LogManager.getLogger(RemoteGameSelector.class);
@@ -56,7 +58,7 @@ public class RemoteGameSelector {
CompletableFuture serverIdFuture = new CompletableFuture<>();
GlobalEventBus.post(
new Events.ServerEvents.StartServerRequest(
- portTextField.getText(),
+ Integer.parseInt(portTextField.getText()), // TODO: Unsafe parse
Objects.requireNonNull(gameSelectorBox.getSelectedItem())
.toString()
.toLowerCase()
@@ -71,9 +73,10 @@ public class RemoteGameSelector {
CompletableFuture connectionIdFuture = new CompletableFuture<>();
GlobalEventBus.post(
- new Events.ServerEvents.StartConnectionRequest(
+ new NetworkEvents.StartClientRequest(
+ NetworkingGameClientHandler::new,
ipTextField.getText(),
- portTextField.getText(),
+ Integer.parseInt(portTextField.getText()), // TODO: Not safe parsing
connectionIdFuture));
String connectionId;
try {
@@ -83,7 +86,7 @@ public class RemoteGameSelector {
} // TODO: Better error handling to not crash the system.
GlobalEventBus.subscribeAndRegister(
- Events.ServerEvents.ReceivedMessage.class,
+ NetworkEvents.ReceivedMessage.class,
event -> {
if (event.message().equalsIgnoreCase("ok")) {
logger.info("received ok from server.");
@@ -93,7 +96,7 @@ public class RemoteGameSelector {
.toLowerCase()
.replace("gameid ", "");
GlobalEventBus.post(
- new Events.ServerEvents.SendCommand(
+ new NetworkEvents.SendCommand(
"start_game " + gameId));
} else {
logger.info("{}", event.message());
@@ -101,7 +104,7 @@ public class RemoteGameSelector {
});
GlobalEventBus.post(
- new Events.ServerEvents.SendCommand(
+ new NetworkEvents.SendCommand(
connectionId,
"create_game",
nameTextField.getText(),
@@ -127,7 +130,7 @@ public class RemoteGameSelector {
frame.remove(mainMenu);
localTicTacToe =
LocalTicTacToe.createRemote(
- ipTextField.getText(), portTextField.getText());
+ ipTextField.getText(), Integer.parseInt(portTextField.getText())); // TODO: Unsafe parse
UIGameBoard ttt = new UIGameBoard(localTicTacToe, this); // TODO: Fix later
frame.add(ttt.getTTTPanel()); // TODO: Fix later
frame.revalidate();
diff --git a/src/main/java/org/toop/frontend/games/LocalTicTacToe.java b/src/main/java/org/toop/frontend/games/LocalTicTacToe.java
index 9fa5ce6..d14d0bb 100644
--- a/src/main/java/org/toop/frontend/games/LocalTicTacToe.java
+++ b/src/main/java/org/toop/frontend/games/LocalTicTacToe.java
@@ -3,9 +3,11 @@ package org.toop.frontend.games;
import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.toop.eventbus.Events;
+import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus;
+import org.toop.eventbus.events.NetworkEvents;
import org.toop.frontend.UI.UIGameBoard;
+import org.toop.frontend.networking.NetworkingGameClientHandler;
import org.toop.game.tictactoe.GameBase;
import org.toop.game.tictactoe.TicTacToe;
import org.toop.game.tictactoe.ai.MinMaxTicTacToe;
@@ -63,13 +65,12 @@ public class LocalTicTacToe { // TODO: Implement runnable
* @param ip The IP of the server to connect to.
* @param port The port of the server to connect to.
*/
- private LocalTicTacToe(String ip, String port) {
+ private LocalTicTacToe(String ip, int port) {
this.receivedMessageListener =
- GlobalEventBus.subscribe(
- Events.ServerEvents.ReceivedMessage.class, this::receiveMessageAction);
+ GlobalEventBus.subscribe(this::receiveMessageAction);
GlobalEventBus.register(this.receivedMessageListener);
this.connectionId = this.createConnection(ip, port);
- this.createGame(ip, port);
+ this.createGame("X", "O");
this.isLocal = false;
this.executor.submit(this::remoteGameThread);
}
@@ -93,11 +94,11 @@ public class LocalTicTacToe { // TODO: Implement runnable
return new LocalTicTacToe(aiPlayers);
}
- public static LocalTicTacToe createRemote(String ip, String port) {
+ public static LocalTicTacToe createRemote(String ip, int port) {
return new LocalTicTacToe(ip, port);
}
- private String createServer(String port) {
+ private String createServer(int port) {
CompletableFuture serverIdFuture = new CompletableFuture<>();
GlobalEventBus.post(
new Events.ServerEvents.StartServerRequest(port, "tictactoe", serverIdFuture));
@@ -109,10 +110,11 @@ public class LocalTicTacToe { // TODO: Implement runnable
return null;
}
- private String createConnection(String ip, String port) {
+ private String createConnection(String ip, int port) {
CompletableFuture connectionIdFuture = new CompletableFuture<>();
GlobalEventBus.post(
- new Events.ServerEvents.StartConnectionRequest(
+ new NetworkEvents.StartClientRequest(
+ NetworkingGameClientHandler::new,
ip,
port,
connectionIdFuture)); // TODO: what if server couldn't be started with port.
@@ -232,7 +234,7 @@ public class LocalTicTacToe { // TODO: Implement runnable
this.endListeners();
}
- private void receiveMessageAction(Events.ServerEvents.ReceivedMessage receivedMessage) {
+ private void receiveMessageAction(NetworkEvents.ReceivedMessage receivedMessage) {
if (!receivedMessage.ConnectionUuid().equals(this.connectionId)) {
return;
}
@@ -247,7 +249,7 @@ public class LocalTicTacToe { // TODO: Implement runnable
}
private void sendCommand(String... args) {
- GlobalEventBus.post(new Events.ServerEvents.SendCommand(this.connectionId, args));
+ GlobalEventBus.post(new NetworkEvents.SendCommand(this.connectionId, args));
}
private void endListeners() {
diff --git a/src/main/java/org/toop/frontend/graphics/node/NodeManager.java b/src/main/java/org/toop/frontend/graphics/node/NodeManager.java
index 00bb2d7..9bbef84 100644
--- a/src/main/java/org/toop/frontend/graphics/node/NodeManager.java
+++ b/src/main/java/org/toop/frontend/graphics/node/NodeManager.java
@@ -4,6 +4,7 @@ import java.util.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.eventbus.*;
+import org.toop.eventbus.events.Events;
import org.toop.frontend.graphics.Shader;
public class NodeManager {
diff --git a/src/main/java/org/toop/frontend/networking/NetworkingClient.java b/src/main/java/org/toop/frontend/networking/NetworkingClient.java
new file mode 100644
index 0000000..995ae86
--- /dev/null
+++ b/src/main/java/org/toop/frontend/networking/NetworkingClient.java
@@ -0,0 +1,139 @@
+package org.toop.frontend.networking;
+
+import com.google.common.base.Supplier;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioIoHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class NetworkingClient {
+ private static final Logger logger = LogManager.getLogger(NetworkingClient.class);
+
+ final Bootstrap bootstrap = new Bootstrap();
+ final EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
+
+ private String connectionUuid;
+ private Channel channel;
+ private NetworkingGameClientHandler handler;
+
+ public NetworkingClient(
+ Supplier extends NetworkingGameClientHandler> handlerFactory,
+ String host,
+ int port) {
+ try {
+ this.bootstrap.group(this.workerGroup);
+ this.bootstrap.channel(NioSocketChannel.class);
+ this.bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ this.bootstrap.handler(new ChannelInitializer() {
+ @Override
+ public void initChannel(SocketChannel ch) {
+ handler = handlerFactory.get();
+
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(new LineBasedFrameDecoder(1024)); // split at \n
+ pipeline.addLast(new StringDecoder()); // bytes -> String
+ pipeline.addLast(handler);
+ }
+ });
+ ChannelFuture channelFuture = this.bootstrap.connect(host, port).sync();
+ this.channel = channelFuture.channel();
+ } catch (Exception e) {
+ logger.error("Failed to create networking client instance", e);
+ }
+ }
+
+ public NetworkingGameClientHandler getHandler() {
+ return handler;
+ }
+
+ public void setConnectionUuid(String connectionUuid) {
+ this.connectionUuid = connectionUuid;
+ }
+
+ public boolean isChannelActive() {
+ return this.channel != null && this.channel.isActive();
+ }
+
+ public void writeAndFlush(String msg) {
+ String literalMsg = msg.replace("\n", "\\n").replace("\r", "\\r");
+ if (isChannelActive()) {
+ this.channel.writeAndFlush(msg);
+ logger.info("Connection {} sent message: {}", this.channel.remoteAddress(), literalMsg);
+ } else {
+ logger.warn("Cannot send message: {}, connection inactive.", literalMsg);
+ }
+ }
+
+ public void writeAndFlushnl(String msg) {
+ if (isChannelActive()) {
+ this.channel.writeAndFlush(msg + "\n");
+ logger.info("Connection {} sent message: {}", this.channel.remoteAddress(), msg);
+ } else {
+ logger.warn("Cannot send message: {}, connection inactive.", msg);
+ }
+ }
+
+ public void login(String username) {
+ this.writeAndFlush("login " + username + "\n");
+ }
+
+ public void logout() {
+ this.writeAndFlush("logout\n");
+ }
+
+ public void sendMove(int move) {
+ this.writeAndFlush("move " + move + "\n"); // append \n so server receives a full line
+ }
+
+ public void getGamelist() {
+ this.writeAndFlush("get gamelist\n");
+ }
+
+ public void getPlayerlist() {
+ this.writeAndFlush("get playerlist\n");
+ }
+
+ public void subscribe(String gameType) {
+ this.writeAndFlush("subscribe " + gameType + "\n");
+ }
+
+ public void forfeit() {
+ this.writeAndFlush("forfeit\n");
+ }
+
+ public void challenge(String playerName, String gameType) {
+ this.writeAndFlush("challenge " + playerName + " " + gameType + "\n");
+ }
+
+ public void acceptChallenge(String challengeNumber) {
+ this.writeAndFlush("challenge accept " + challengeNumber + "\n");
+ }
+
+ public void sendChatMessage(String message) {
+ this.writeAndFlush("message " + "\"" + message + "\"" + "\n");
+ }
+
+ public void help(String command) {
+ this.writeAndFlush("help " + command + "\n");
+ }
+
+ public void closeConnection() {
+ if (this.channel != null && this.channel.isActive()) {
+ this.channel.close().addListener(future -> {
+ if (future.isSuccess()) {
+ logger.info("Connection {} closed successfully", this.channel.remoteAddress());
+ } else {
+ logger.error("Error closing connection {}. Error: {}",
+ this.channel.remoteAddress(),
+ future.cause().getMessage());
+ }
+ });
+ }
+ }
+
+}
diff --git a/src/main/java/org/toop/frontend/networking/NetworkingClientManager.java b/src/main/java/org/toop/frontend/networking/NetworkingClientManager.java
new file mode 100644
index 0000000..d41c5b8
--- /dev/null
+++ b/src/main/java/org/toop/frontend/networking/NetworkingClientManager.java
@@ -0,0 +1,134 @@
+package org.toop.frontend.networking;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Supplier;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.toop.eventbus.events.Events;
+import org.toop.eventbus.GlobalEventBus;
+import org.toop.eventbus.events.NetworkEvents;
+
+public class NetworkingClientManager {
+
+ private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class);
+
+ /** Map of serverId -> Server instances */
+ private final Map networkClients = new ConcurrentHashMap<>();
+
+ /** Starts a connection manager, to manage, connections. */
+ public NetworkingClientManager() {
+ GlobalEventBus.subscribeAndRegister(this::handleStartClientRequest);
+ GlobalEventBus.subscribeAndRegister(this::handleStartClient);
+ GlobalEventBus.subscribeAndRegister(this::handleCommand);
+ GlobalEventBus.subscribeAndRegister(this::handleCloseClient);
+// GlobalEventBus.subscribeAndRegister(
+// Events.ServerEvents.Reconnect.class, this::handleReconnect);
+ // GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class,
+ // this::handleChangeConnection);
+ GlobalEventBus.subscribeAndRegister(this::shutdownAll);
+ GlobalEventBus.subscribeAndRegister(this::getAllConnections);
+ }
+
+ private String startConnectionRequest(Supplier extends NetworkingGameClientHandler> handlerFactory,
+ String ip,
+ int port) {
+ String connectionUuid = UUID.randomUUID().toString();
+ try {
+ NetworkingClient client = new NetworkingClient(
+ handlerFactory,
+ ip,
+ port);
+ this.networkClients.put(connectionUuid, client);
+ } catch (Exception e) {
+ logger.error(e);
+ }
+ return connectionUuid;
+ }
+
+ private void handleStartClientRequest(NetworkEvents.StartClientRequest request) {
+ request.future()
+ .complete(
+ this.startConnectionRequest(
+ request.handlerFactory(),
+ request.ip(),
+ request.port())); // TODO: Maybe post ConnectionEstablished event.
+ }
+
+ private void handleStartClient(NetworkEvents.StartClient event) {
+ GlobalEventBus.post(
+ new NetworkEvents.StartClientSuccess(
+ this.startConnectionRequest(
+ event.handlerFactory(),
+ event.ip(),
+ event.port()),
+ event.ip(),
+ event.port(),
+ event.eventId()
+ ));
+ }
+
+ private void handleCommand(
+ NetworkEvents.SendCommand
+ event) { // TODO: Move this to ServerConnection class, keep it internal.
+ NetworkingClient client = this.networkClients.get(event.connectionId());
+ logger.info("Preparing to send command: {} to server: {}", event.args(), client);
+ if (client != null) {
+ String args = String.join(" ", event.args()) + "\n";
+ client.writeAndFlush(args);
+ } else {
+ logger.warn("Server {} not found for command '{}'", event.connectionId(), event.args());
+ }
+ }
+
+ private void handleCloseClient(NetworkEvents.CloseClient event) {
+ NetworkingClient client = this.networkClients.get(event.connectionId());
+ client.closeConnection(); // TODO: Check if not blocking, what if error, mb not remove?
+ this.networkClients.remove(event.connectionId());
+ logger.info("Client {} closed successfully.", event.connectionId());
+ }
+
+// private void handleReconnect(Events.ServerEvents.Reconnect event) {
+// NetworkingClient client = this.networkClients.get(event.connectionId());
+// if (client != null) {
+// try {
+// client;
+// logger.info("Server {} reconnected", event.connectionId());
+// } catch (Exception e) {
+// logger.error("Server {} failed to reconnect", event.connectionId(), e);
+// GlobalEventBus.post(new Events.ServerEvents.CouldNotConnect(event.connectionId()));
+// }
+// }
+// } // TODO: Reconnect on disconnect
+
+ // private void handleChangeConnection(Events.ServerEvents.ChangeConnection event) {
+ // ServerConnection serverConnection = this.serverConnections.get(event.connectionId());
+ // if (serverConnection != null) {
+ // try {
+ // serverConnection.connect(event.ip(), event.port());
+ // logger.info("Server {} changed connection to {}:{}", event.connectionId(),
+ // event.ip(), event.port());
+ // } catch (Exception e) {
+ // logger.error("Server {} failed to change connection", event.connectionId(),
+ // e);
+ // GlobalEventBus.post(new
+ // Events.ServerEvents.CouldNotConnect(event.connectionId()));
+ // }
+ // }
+ // } TODO
+
+ private void getAllConnections(NetworkEvents.RequestsAllClients request) {
+ List a = new ArrayList<>(this.networkClients.values());
+ request.future().complete(a.toString());
+ }
+
+ public void shutdownAll(NetworkEvents.ForceCloseAllClients request) {
+ this.networkClients.values().forEach(NetworkingClient::closeConnection);
+ this.networkClients.clear();
+ logger.info("All servers shut down");
+ }
+}
diff --git a/src/main/java/org/toop/frontend/networking/NetworkingGameClientHandler.java b/src/main/java/org/toop/frontend/networking/NetworkingGameClientHandler.java
new file mode 100644
index 0000000..b7362fa
--- /dev/null
+++ b/src/main/java/org/toop/frontend/networking/NetworkingGameClientHandler.java
@@ -0,0 +1,28 @@
+package org.toop.frontend.networking;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.toop.Main;
+
+public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
+ private static final Logger logger = LogManager.getLogger(NetworkingGameClientHandler.class);
+
+ public NetworkingGameClientHandler() {}
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ logger.debug("Received message from server-{}, data: {}", ctx.channel().remoteAddress(), msg);
+
+ // TODO: Handle server messages
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ logger.error(cause.getMessage(), cause);
+ ctx.close();
+ }
+
+}
diff --git a/src/main/java/org/toop/frontend/ServerConnection.java b/src/main/java/org/toop/frontend/networking/ServerConnection.java
similarity index 96%
rename from src/main/java/org/toop/frontend/ServerConnection.java
rename to src/main/java/org/toop/frontend/networking/ServerConnection.java
index 8f37a5e..1f01075 100644
--- a/src/main/java/org/toop/frontend/ServerConnection.java
+++ b/src/main/java/org/toop/frontend/networking/ServerConnection.java
@@ -1,12 +1,13 @@
-package org.toop.frontend;
+package org.toop.frontend.networking;
import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.toop.eventbus.Events;
+import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus;
+import org.toop.eventbus.events.NetworkEvents;
public final class ServerConnection extends TcpClient implements Runnable {
@@ -85,7 +86,7 @@ public final class ServerConnection extends TcpClient implements Runnable {
logger.info("Connection: {} received: '{}'", this.uuid, received);
// this.addReceivedMessageToQueue(received); // TODO: Will never go empty
GlobalEventBus.post(
- new Events.ServerEvents.ReceivedMessage(
+ new NetworkEvents.ReceivedMessage(
this.uuid, received)); // TODO: mb change
} else {
break;
diff --git a/src/main/java/org/toop/frontend/TcpClient.java b/src/main/java/org/toop/frontend/networking/TcpClient.java
similarity index 97%
rename from src/main/java/org/toop/frontend/TcpClient.java
rename to src/main/java/org/toop/frontend/networking/TcpClient.java
index 5daa3bf..c9d62d0 100644
--- a/src/main/java/org/toop/frontend/TcpClient.java
+++ b/src/main/java/org/toop/frontend/networking/TcpClient.java
@@ -1,4 +1,4 @@
-package org.toop.frontend;
+package org.toop.frontend.networking;
import java.io.BufferedReader;
import java.io.IOException;
diff --git a/src/main/java/org/toop/frontend/networking/handlers/NetworkingTicTacToeClientHandler.java b/src/main/java/org/toop/frontend/networking/handlers/NetworkingTicTacToeClientHandler.java
new file mode 100644
index 0000000..e263072
--- /dev/null
+++ b/src/main/java/org/toop/frontend/networking/handlers/NetworkingTicTacToeClientHandler.java
@@ -0,0 +1,12 @@
+//package org.toop.frontend.networking.handlers;
+//
+//import io.netty.channel.ChannelHandlerContext;
+//import org.apache.logging.log4j.LogManager;
+//import org.apache.logging.log4j.Logger;
+//import org.toop.frontend.networking.NetworkingGameClientHandler;
+//
+//public class NetworkingTicTacToeClientHandler extends NetworkingGameClientHandler {
+// static final Logger logger = LogManager.getLogger(NetworkingTicTacToeClientHandler.class);
+//
+//
+//}
diff --git a/src/main/java/org/toop/frontend/platform/core/glfw/GlfwWindow.java b/src/main/java/org/toop/frontend/platform/core/glfw/GlfwWindow.java
index 05f96ef..b83ed52 100644
--- a/src/main/java/org/toop/frontend/platform/core/glfw/GlfwWindow.java
+++ b/src/main/java/org/toop/frontend/platform/core/glfw/GlfwWindow.java
@@ -4,6 +4,7 @@ import org.lwjgl.glfw.*;
import org.lwjgl.system.*;
import org.toop.core.*;
import org.toop.eventbus.*;
+import org.toop.eventbus.events.Events;
public class GlfwWindow extends Window {
private long window;
diff --git a/src/main/java/org/toop/frontend/platform/graphics/opengl/OpenglRenderer.java b/src/main/java/org/toop/frontend/platform/graphics/opengl/OpenglRenderer.java
index 0e587c2..911ec64 100644
--- a/src/main/java/org/toop/frontend/platform/graphics/opengl/OpenglRenderer.java
+++ b/src/main/java/org/toop/frontend/platform/graphics/opengl/OpenglRenderer.java
@@ -3,6 +3,7 @@ package org.toop.frontend.platform.graphics.opengl;
import org.lwjgl.opengl.*;
import org.lwjgl.system.*;
import org.toop.eventbus.*;
+import org.toop.eventbus.events.Events;
import org.toop.frontend.graphics.Renderer;
import org.toop.frontend.graphics.Shader;
diff --git a/src/test/java/org/toop/eventbus/EventPublisherSpeedTest.java b/src/test/java/org/toop/eventbus/EventPublisherSpeedTest.java
new file mode 100644
index 0000000..85d2691
--- /dev/null
+++ b/src/test/java/org/toop/eventbus/EventPublisherSpeedTest.java
@@ -0,0 +1,88 @@
+package org.toop.eventbus;
+
+import org.junit.jupiter.api.Test;
+import org.toop.eventbus.events.EventWithUuid;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class EventPublisherPerformanceTest {
+
+ public record PerfEvent(String name, String eventId) implements EventWithUuid {
+ @Override
+ public java.util.Map result() {
+ return java.util.Map.of("name", name, "eventId", eventId);
+ }
+ }
+
+ @Test
+ void testEventCreationSpeed() {
+ int iterations = 10_000;
+ long start = System.nanoTime();
+
+ for (int i = 0; i < iterations; i++) {
+ new EventPublisher<>(PerfEvent.class, "event-" + i);
+ }
+
+ long end = System.nanoTime();
+ long durationMs = (end - start) / 1_000_000;
+
+ System.out.println("Created " + iterations + " events in " + durationMs + " ms");
+ assertTrue(durationMs < 500, "Event creation too slow");
+ }
+
+ @Test
+ void testEventPostSpeed() {
+ int iterations = 10_000;
+ AtomicInteger counter = new AtomicInteger(0);
+
+ GlobalEventBus.subscribeAndRegister(PerfEvent.class, e -> counter.incrementAndGet());
+
+ long start = System.nanoTime();
+
+ for (int i = 0; i < iterations; i++) {
+ new EventPublisher<>(PerfEvent.class, "event-" + i).postEvent();
+ }
+
+ long end = System.nanoTime();
+ long durationMs = (end - start) / 1_000_000;
+
+ System.out.println("Posted " + iterations + " events in " + durationMs + " ms");
+ assertTrue(counter.get() == iterations, "Not all events were received");
+ assertTrue(durationMs < 1000, "Posting events too slow");
+ }
+
+ @Test
+ void testConcurrentEventPostSpeed() throws InterruptedException {
+ int threads = 20;
+ int eventsPerThread = 5_000;
+ AtomicInteger counter = new AtomicInteger(0);
+
+ GlobalEventBus.subscribeAndRegister(PerfEvent.class, e -> counter.incrementAndGet());
+
+ Thread[] workers = new Thread[threads];
+
+ long start = System.nanoTime();
+
+ for (int t = 0; t < threads; t++) {
+ workers[t] = new Thread(() -> {
+ for (int i = 0; i < eventsPerThread; i++) {
+ new EventPublisher<>(PerfEvent.class, "event-" + i).postEvent();
+ }
+ });
+ workers[t].start();
+ }
+
+ for (Thread worker : workers) {
+ worker.join();
+ }
+
+ long end = System.nanoTime();
+ long durationMs = (end - start) / 1_000_000;
+
+ System.out.println("Posted " + (threads * eventsPerThread) + " events concurrently in " + durationMs + " ms");
+ assertTrue(counter.get() == threads * eventsPerThread, "Some events were lost");
+ assertTrue(durationMs < 5000, "Concurrent posting too slow");
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/toop/eventbus/EventPublisherStressTest.java b/src/test/java/org/toop/eventbus/EventPublisherStressTest.java
new file mode 100644
index 0000000..b1bb392
--- /dev/null
+++ b/src/test/java/org/toop/eventbus/EventPublisherStressTest.java
@@ -0,0 +1,160 @@
+package org.toop.eventbus;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.toop.eventbus.events.EventWithUuid;
+
+import java.math.BigInteger;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class EventPublisherStressTest {
+
+ public record HeavyEvent(String payload, String eventId) implements EventWithUuid {
+ @Override
+ public java.util.Map result() {
+ return java.util.Map.of("payload", payload, "eventId", eventId);
+ }
+ }
+
+ private static final int THREADS = 100;
+ private static final long EVENTS_PER_THREAD = 2_000_000;
+
+ @Tag("stress")
+ @Test
+ void extremeConcurrencyTest_progressWithMemory() throws InterruptedException {
+ AtomicLong counter = new AtomicLong(0); // Big numbers safety
+ ExecutorService executor = Executors.newFixedThreadPool(THREADS);
+
+ GlobalEventBus.subscribeAndRegister(HeavyEvent.class, _ -> counter.incrementAndGet());
+
+ BigInteger totalEvents = BigInteger.valueOf(THREADS)
+ .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
+
+ long startTime = System.currentTimeMillis();
+
+ // Monitor thread for EPS and memory
+ Thread monitor = new Thread(() -> {
+ long lastCount = 0;
+ long lastTime = startTime;
+
+ Runtime runtime = Runtime.getRuntime();
+
+ while (counter.get() < totalEvents.longValue()) {
+ try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
+
+ long now = System.currentTimeMillis();
+ long completed = counter.get();
+ long eventsThisSecond = completed - lastCount;
+ double eps = eventsThisSecond / ((now - lastTime) / 1000.0);
+
+ // Memory usage
+ long usedMemory = runtime.totalMemory() - runtime.freeMemory();
+ long maxMemory = runtime.maxMemory();
+ double usedPercent = usedMemory * 100.0 / maxMemory;
+
+ System.out.printf(
+ "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)\n",
+ completed,
+ totalEvents.longValue(),
+ completed * 100.0 / totalEvents.doubleValue(),
+ eps,
+ usedMemory / 1024.0 / 1024.0,
+ usedPercent
+ );
+
+ lastCount = completed;
+ lastTime = now;
+ }
+ });
+ monitor.setDaemon(true);
+ monitor.start();
+
+ // Submit events
+ for (int t = 0; t < THREADS; t++) {
+ executor.submit(() -> {
+ for (int i = 0; i < EVENTS_PER_THREAD; i++) {
+ new EventPublisher<>(HeavyEvent.class, "payload-" + i).postEvent();
+ }
+ });
+ }
+
+ executor.shutdown();
+ executor.awaitTermination(20, TimeUnit.MINUTES); // allow extra time for huge tests
+
+ long endTime = System.currentTimeMillis();
+ double durationSeconds = (endTime - startTime) / 1000.0;
+
+ System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
+ double averageEps = totalEvents.doubleValue() / durationSeconds;
+ System.out.printf("Average EPS: %.0f%n", averageEps);
+
+ assertEquals(totalEvents.longValue(), counter.get());
+ }
+
+ @Tag("stress")
+ @Test
+ void efficientExtremeConcurrencyTest() throws InterruptedException {
+ final int THREADS = Runtime.getRuntime().availableProcessors(); // threads ≈ CPU cores
+ final int EVENTS_PER_THREAD = 5000;
+
+ ExecutorService executor = Executors.newFixedThreadPool(THREADS);
+ ConcurrentLinkedQueue processedEvents = new ConcurrentLinkedQueue<>();
+
+ long start = System.nanoTime();
+
+ for (int t = 0; t < THREADS; t++) {
+ executor.submit(() -> {
+ for (int i = 0; i < EVENTS_PER_THREAD; i++) {
+ new EventPublisher<>(HeavyEvent.class, "payload-" + i)
+ .onEventById(HeavyEvent.class, processedEvents::add)
+ .postEvent();
+ }
+ });
+ }
+
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.MINUTES);
+
+ long end = System.nanoTime();
+ double durationSeconds = (end - start) / 1_000_000_000.0;
+
+ BigInteger totalEvents = BigInteger.valueOf((long) THREADS).multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
+ double eps = totalEvents.doubleValue() / durationSeconds;
+
+ System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
+ System.out.printf("Throughput: %.0f events/sec%n", eps);
+
+ // Memory snapshot
+ Runtime rt = Runtime.getRuntime();
+ System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0 / 1024.0);
+
+ // Ensure all events were processed
+ assertEquals(totalEvents.intValue(), processedEvents.size());
+ }
+
+ @Tag("stress")
+ @Test
+ void constructorCacheVsReflection() throws Throwable {
+ int iterations = 1_000_000;
+ long startReflect = System.nanoTime();
+ for (int i = 0; i < iterations; i++) {
+ // Reflection every time
+ HeavyEvent.class.getDeclaredConstructors()[0].newInstance("payload", "uuid-" + i);
+ }
+ long endReflect = System.nanoTime();
+
+ long startHandle = System.nanoTime();
+ for (int i = 0; i < iterations; i++) {
+ // Using cached MethodHandle
+ EventPublisher ep = new EventPublisher<>(HeavyEvent.class, "payload-" + i);
+ }
+ long endHandle = System.nanoTime();
+
+ System.out.println("Reflection: " + (endReflect - startReflect) / 1_000_000 + " ms");
+ System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + " ms");
+ }
+}
diff --git a/src/test/java/org/toop/eventbus/EventPublisherTest.java b/src/test/java/org/toop/eventbus/EventPublisherTest.java
new file mode 100644
index 0000000..cfba3f8
--- /dev/null
+++ b/src/test/java/org/toop/eventbus/EventPublisherTest.java
@@ -0,0 +1,122 @@
+package org.toop.eventbus;
+
+import com.google.common.eventbus.EventBus;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.toop.eventbus.events.EventWithUuid;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class EventPublisherTest {
+
+ // Simple test event implementing EventWithUuid
+ public record TestEvent(String name, String eventId) implements EventWithUuid {
+ @Override
+ public Map result() {
+ return Map.of("name", name, "eventId", eventId);
+ }
+ }
+
+ public record TestResponseEvent(String msg, String eventId) implements EventWithUuid {
+ @Override
+ public Map result() {
+ return Map.of("msg", msg, "eventId", eventId);
+ }
+ }
+
+ @Test
+ void testEventPublisherGeneratesUuid() {
+ EventPublisher publisher = new EventPublisher<>(TestEvent.class, "myTest");
+ assertNotNull(publisher.getEventId());
+ assertEquals(publisher.getEventId(), publisher.getEvent().eventId());
+ }
+
+ @Test
+ void testPostEvent() {
+ AtomicBoolean triggered = new AtomicBoolean(false);
+
+ EventPublisher publisher = new EventPublisher<>(TestEvent.class, "myTest");
+ publisher.onEventById(TestEvent.class, event -> triggered.set(true))
+ .postEvent();
+
+ assertTrue(triggered.get(), "Subscriber should have been triggered by postEvent");
+ }
+
+ @Test
+ void testOnEventByIdMatchesUuid() {
+ AtomicBoolean triggered = new AtomicBoolean(false);
+
+ EventPublisher publisher1 = new EventPublisher<>(TestEvent.class, "event1");
+ EventPublisher publisher2 = new EventPublisher<>(TestEvent.class, "event2");
+
+ publisher1.onEventById(TestEvent.class, event -> triggered.set(true));
+ publisher2.postEvent();
+
+ // Only publisher1's subscriber should trigger for its UUID
+ assertFalse(triggered.get(), "Subscriber should not trigger for a different UUID");
+
+ publisher1.postEvent();
+ assertTrue(triggered.get(), "Subscriber should trigger for matching UUID");
+ }
+
+ @Test
+ void testUnregisterAfterSuccess() {
+ AtomicBoolean triggered = new AtomicBoolean(false);
+ AtomicReference