Networking moved to netty. Added a EventPublisher class for easy building of events.

This commit is contained in:
lieght
2025-09-22 04:04:52 +02:00
parent efd485852c
commit 3fa0bae46a
28 changed files with 1429 additions and 483 deletions

View File

@@ -2,6 +2,8 @@
<dictionary name="project"> <dictionary name="project">
<words> <words>
<w>aosp</w> <w>aosp</w>
<w>gamelist</w>
<w>playerlist</w>
<w>vmoptions</w> <w>vmoptions</w>
</words> </words>
</dictionary> </dictionary>

5
.idea/misc.xml generated
View File

@@ -1,5 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="EntryPointsManager">
<list size="1">
<item index="0" class="java.lang.String" itemvalue="com.google.common.eventbus.Subscribe" />
</list>
</component>
<component name="ExternalStorageConfigurationManager" enabled="true" /> <component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager"> <component name="MavenProjectsManager">
<option name="originalFiles"> <option name="originalFiles">

15
pom.xml
View File

@@ -54,13 +54,13 @@
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId> <artifactId>junit-jupiter-api</artifactId>
<version>6.0.0-RC3</version> <version>5.13.4</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId> <artifactId>junit-jupiter-engine</artifactId>
<version>6.0.0-RC3</version> <version>5.13.4</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-surefire-plugin --> <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-surefire-plugin -->
@@ -105,6 +105,14 @@
<artifactId>spotless-maven-plugin</artifactId> <artifactId>spotless-maven-plugin</artifactId>
<version>2.46.1</version> <version>2.46.1</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.2.6.Final</version>
</dependency>
<dependency><groupId>org.lwjgl</groupId><artifactId>lwjgl</artifactId></dependency> <dependency><groupId>org.lwjgl</groupId><artifactId>lwjgl</artifactId></dependency>
<dependency><groupId>org.lwjgl</groupId><artifactId>lwjgl-glfw</artifactId></dependency> <dependency><groupId>org.lwjgl</groupId><artifactId>lwjgl-glfw</artifactId></dependency>
<dependency><groupId>org.lwjgl</groupId><artifactId>lwjgl-opengl</artifactId></dependency> <dependency><groupId>org.lwjgl</groupId><artifactId>lwjgl-opengl</artifactId></dependency>
@@ -119,6 +127,9 @@
<plugin> <plugin>
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<version>3.5.4</version> <version>3.5.4</version>
<configuration>
<excludedGroups>stress</excludedGroups>
</configuration>
</plugin> </plugin>
<plugin> <plugin>
<artifactId>maven-failsafe-plugin</artifactId> <artifactId>maven-failsafe-plugin</artifactId>

View File

@@ -1,214 +1,214 @@
package org.toop; //package org.toop;
//
import java.util.*; //import java.util.*;
import java.util.concurrent.CompletableFuture; //import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; //import java.util.concurrent.ExecutionException;
import org.apache.logging.log4j.LogManager; //import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; //import org.apache.logging.log4j.Logger;
import org.toop.eventbus.Events; //import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus; //import org.toop.eventbus.GlobalEventBus;
import org.toop.game.tictactoe.*; //import org.toop.game.tictactoe.*;
import org.toop.game.tictactoe.ai.MinMaxTicTacToe; //import org.toop.game.tictactoe.ai.MinMaxTicTacToe;
//
public class ConsoleGui { //public class ConsoleGui {
//
private static final Logger logger = LogManager.getLogger(ConsoleGui.class); // private static final Logger logger = LogManager.getLogger(ConsoleGui.class);
//
private Scanner scanner; // private Scanner scanner;
//
private TicTacToe game; // private TicTacToe game;
private MinMaxTicTacToe ai; // private MinMaxTicTacToe ai;
//
private String ai1 = null; // private String ai1 = null;
private String ai2 = null; // private String ai2 = null;
//
private String serverId = null; // private String serverId = null;
private String connectionId = null; // private String connectionId = null;
private String ticTacToeGameId = null; // private String ticTacToeGameId = null;
//
public ConsoleGui() throws ExecutionException, InterruptedException { // public ConsoleGui() throws ExecutionException, InterruptedException {
scanner = new Scanner(System.in); // scanner = new Scanner(System.in);
Random random = new Random(3453498); // Random random = new Random(3453498);
//
int mode = -1; // int mode = -1;
//
System.out.print( // System.out.print(
""" // """
1. player vs player // 1. player vs player
2. player vs ai // 2. player vs ai
3. ai vs player // 3. ai vs player
4. ai v ai // 4. ai v ai
Choose mode (default is 1):\s\ // Choose mode (default is 1):\s\
"""); // """);
String modeString = scanner.nextLine(); // String modeString = scanner.nextLine();
//
try { // try {
mode = Integer.parseInt(modeString); // mode = Integer.parseInt(modeString);
} catch (Exception e) { // } catch (Exception e) {
logger.error(e.getMessage()); // logger.error(e.getMessage());
} // }
//
String player1 = null; // String player1 = null;
String player2 = null; // String player2 = null;
//
switch (mode) { // switch (mode) {
// player vs ai // // player vs ai
case 2: // case 2:
{ // {
System.out.print("Please enter your name: "); // System.out.print("Please enter your name: ");
String name = scanner.nextLine(); // String name = scanner.nextLine();
//
player1 = name; // player1 = name;
ai2 = player2 = "AI#" + random.nextInt(); // ai2 = player2 = "AI#" + random.nextInt();
//
break; // break;
} // }
//
// ai vs player // // ai vs player
case 3: // case 3:
{ // {
System.out.print("Enter your name: "); // System.out.print("Enter your name: ");
String name = scanner.nextLine(); // String name = scanner.nextLine();
//
ai1 = player1 = "AI#" + random.nextInt(); // ai1 = player1 = "AI#" + random.nextInt();
player2 = name; // player2 = name;
//
break; // break;
} // }
//
// ai vs ai // // ai vs ai
case 4: // case 4:
{ // {
ai1 = player1 = "AI#" + random.nextInt(); // ai1 = player1 = "AI#" + random.nextInt();
ai2 = player2 = "AI2" + random.nextInt(); // ai2 = player2 = "AI2" + random.nextInt();
//
break; // break;
} // }
//
// player vs player // // player vs player
case 1: // case 1:
default: // default:
{ // {
System.out.print("Player 1. Please enter your name: "); // System.out.print("Player 1. Please enter your name: ");
String name1 = scanner.nextLine(); // String name1 = scanner.nextLine();
//
System.out.print("Player 2. Please enter your name: "); // System.out.print("Player 2. Please enter your name: ");
String name2 = scanner.nextLine(); // String name2 = scanner.nextLine();
//
player1 = name1; // player1 = name1;
player2 = name2; // player2 = name2;
} // }
} // }
//
game = new TicTacToe(player1, player2); // game = new TicTacToe(player1, player2);
ai = new MinMaxTicTacToe(); // ai = new MinMaxTicTacToe();
//
CompletableFuture<String> serverIdFuture = new CompletableFuture<>(); // CompletableFuture<String> serverIdFuture = new CompletableFuture<>();
GlobalEventBus.post( // GlobalEventBus.post(
new Events.ServerEvents.StartServerRequest("5001", "tictactoe", serverIdFuture)); // new Events.ServerEvents.StartServerRequest("5001", "tictactoe", serverIdFuture));
serverId = serverIdFuture.get(); // serverId = serverIdFuture.get();
//
CompletableFuture<String> connectionIdFuture = new CompletableFuture<>(); // CompletableFuture<String> connectionIdFuture = new CompletableFuture<>();
GlobalEventBus.post( // GlobalEventBus.post(
new Events.ServerEvents.StartConnectionRequest( // new Events.ServerEvents.StartConnectionRequest(
"127.0.0.1", "5001", connectionIdFuture)); // "127.0.0.1", "5001", connectionIdFuture));
connectionId = connectionIdFuture.get(); // connectionId = connectionIdFuture.get();
//
CompletableFuture<String> ticTacToeGame = new CompletableFuture<>(); // CompletableFuture<String> ticTacToeGame = new CompletableFuture<>();
GlobalEventBus.post( // GlobalEventBus.post(
new Events.ServerEvents.CreateTicTacToeGameRequest( // new Events.ServerEvents.CreateTicTacToeGameRequest(
serverId, player1, player2, ticTacToeGame)); // serverId, player1, player2, ticTacToeGame));
ticTacToeGameId = ticTacToeGame.get(); // ticTacToeGameId = ticTacToeGame.get();
GlobalEventBus.post(new Events.ServerEvents.RunTicTacToeGame(serverId, ticTacToeGameId)); // GlobalEventBus.post(new Events.ServerEvents.RunTicTacToeGame(serverId, ticTacToeGameId));
} // }
//
public void print() { // public void print() {
char[] seperator = new char[game.getSize() * 4 - 1]; // char[] seperator = new char[game.getSize() * 4 - 1];
Arrays.fill(seperator, '-'); // Arrays.fill(seperator, '-');
//
for (int i = 0; i < game.getSize(); i++) { // for (int i = 0; i < game.getSize(); i++) {
String buffer = " "; // String buffer = " ";
//
for (int j = 0; j < game.getSize() - 1; j++) { // for (int j = 0; j < game.getSize() - 1; j++) {
buffer += game.getGrid()[i * game.getSize() + j] + " | "; // buffer += game.getGrid()[i * game.getSize() + j] + " | ";
} // }
//
buffer += game.getGrid()[i * game.getSize() + game.getSize() - 1]; // buffer += game.getGrid()[i * game.getSize() + game.getSize() - 1];
System.out.println(buffer); // System.out.println(buffer);
//
if (i < game.getSize() - 1) { // if (i < game.getSize() - 1) {
System.out.println(seperator); // System.out.println(seperator);
} // }
} // }
} // }
//
public boolean next() { // public boolean next() {
Player current = game.getCurrentPlayer(); // Player current = game.getCurrentPlayer();
int move = -1; // int move = -1;
//
if (ai1 != null && current.getName() == ai1 || ai2 != null && current.getName() == ai2) { // if (ai1 != null && current.getName() == ai1 || ai2 != null && current.getName() == ai2) {
move = ai.findBestMove(game); // move = ai.findBestMove(game);
} else { // } else {
System.out.printf( // System.out.printf(
"%s's (%c) turn. Please choose an empty cell between 0-8: ", // "%s's (%c) turn. Please choose an empty cell between 0-8: ",
current.getName(), current.getSymbol()); // current.getName(), current.getSymbol());
String input = scanner.nextLine(); // String input = scanner.nextLine();
//
try { // try {
move = Integer.parseInt(input); // move = Integer.parseInt(input);
} catch (NumberFormatException e) { // } catch (NumberFormatException e) {
} // }
} // }
//
GameBase.State state = game.play(move); // GameBase.State state = game.play(move);
boolean keepRunning = true; // boolean keepRunning = true;
//
switch (state) { // switch (state) {
case INVALID: // case INVALID:
{ // {
System.out.println("Please select an empty cell. Between 0-8"); // System.out.println("Please select an empty cell. Between 0-8");
return true; // return true;
} // }
//
case DRAW: // case DRAW:
{ // {
System.out.println("Game ended in a draw."); // System.out.println("Game ended in a draw.");
keepRunning = false; // keepRunning = false;
break; // break;
} // }
//
case WIN: // case WIN:
{ // {
System.out.printf("%s has won the game.\n", current.getName()); // System.out.printf("%s has won the game.\n", current.getName());
keepRunning = false; // keepRunning = false;
break; // break;
} // }
//
case NORMAL: // case NORMAL:
default: // default:
{ // {
keepRunning = true; // keepRunning = true;
break; // break;
} // }
} // }
//
GlobalEventBus.post( // GlobalEventBus.post(
new Events.ServerEvents.SendCommand( // new Events.ServerEvents.SendCommand(
connectionId, // connectionId,
"gameid " + ticTacToeGameId, // "gameid " + ticTacToeGameId,
"player " + current.getName(), // "player " + current.getName(),
"MOVE", // "MOVE",
String.valueOf(move))); // String.valueOf(move)));
//
if (!keepRunning) { // if (!keepRunning) {
GlobalEventBus.post( // GlobalEventBus.post(
new Events.ServerEvents.EndTicTacToeGame(serverId, ticTacToeGameId)); // new Events.ServerEvents.EndTicTacToeGame(serverId, ticTacToeGameId));
} // }
//
return keepRunning; // return keepRunning;
} // }
//
public GameBase getGame() { // public GameBase getGame() {
return game; // return game;
} // }
} //}

View File

@@ -1,13 +1,22 @@
package org.toop; package org.toop;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.base.Supplier;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.backend.ServerManager; import org.toop.backend.ServerManager;
import org.toop.eventbus.Events; import org.toop.eventbus.EventPublisher;
import org.toop.eventbus.EventRegistry;
import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus; import org.toop.eventbus.GlobalEventBus;
import org.toop.frontend.ConnectionManager; import org.toop.eventbus.events.NetworkEvents;
import org.toop.frontend.UI.LocalServerSelector; import org.toop.frontend.UI.LocalServerSelector;
import org.toop.frontend.networking.NetworkingClientManager;
import org.toop.frontend.networking.NetworkingGameClientHandler;
public class Main { public class Main {
private static final Logger logger = LogManager.getLogger(Main.class); private static final Logger logger = LogManager.getLogger(Main.class);
@@ -15,14 +24,55 @@ public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException { public static void main(String[] args) throws ExecutionException, InterruptedException {
// Logging.disableAllLogs(); // Logging.disableAllLogs();
// Logging.enableAllLogsForClass(LocalTicTacToe.class); Logging.enableAllLogsForClass(EventRegistry.class);
// Logging.enableLogsForClass(ServerManager.class, Level.ALL); // Logging.enableLogsForClass(ServerManager.class, Level.ALL);
// Logging.enableLogsForClass(TicTacToeServer.class, Level.ALL); // Logging.enableLogsForClass(TicTacToeServer.class, Level.ALL);
// Logging.enableLogsForClass(TcpClient.class, Level.ALL); // Logging.enableLogsForClass(TcpClient.class, Level.ALL);
// Logging.enableLogsForClass(ConnectionManager.class, Level.ALL); // Logging.enableLogsForClass(NetworkingClientManager.class, Level.ALL);
initSystems(); initSystems();
registerEvents(); registerEvents();
CompletableFuture<String> serverIdFuture = new CompletableFuture<>();
GlobalEventBus.post(
new Events.ServerEvents.StartServerRequest(5001, "tictactoe", serverIdFuture));
var serverId = serverIdFuture.get();
// CompletableFuture<String> conIdFuture = new CompletableFuture<>();
// GlobalEventBus.post(
// new NetworkEvents.StartClientRequest(NetworkingGameClientHandler::new,
// "127.0.0.1", 5001, conIdFuture));
// var conId = conIdFuture.get();
int numThreads = 100; // how many EventPublisher tests you want
ExecutorService executor = Executors.newFixedThreadPool(200); // 20 threads in pool
for (int i = 0; i < numThreads; i++) {
executor.submit(() -> {
new EventPublisher<>(
NetworkEvents.StartClient.class,
(Supplier<NetworkingGameClientHandler>) NetworkingGameClientHandler::new,
"127.0.0.1",
5001
).onEventById(
NetworkEvents.StartClientSuccess.class,
event -> GlobalEventBus.post(
new NetworkEvents.CloseClient((String) event.connectionId()))
).unregisterAfterSuccess()
.postEvent();
});
}
// Shutdown after tasks complete
executor.shutdown();
// GlobalEventBus.post(new NetworkEvents.SendCommand(conId, "move", "5"));
// GlobalEventBus.post(new NetworkEvents.ForceCloseAllClients());
// GlobalEventBus.post(new NetworkEvents.StartClient(
// NetworkingGameClientHandler::new, "127.0.0.1", 5001, serverId
// ));
// JFrame frame = new JFrame("Server Settings"); // JFrame frame = new JFrame("Server Settings");
// frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); // frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
// frame.setSize(800, 600); // frame.setSize(800, 600);
@@ -49,7 +99,7 @@ public class Main {
public static void initSystems() { public static void initSystems() {
new ServerManager(); new ServerManager();
new ConnectionManager(); new NetworkingClientManager();
} }
private static void quit() { private static void quit() {

View File

@@ -8,7 +8,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.backend.tictactoe.TicTacToeServer; import org.toop.backend.tictactoe.TicTacToeServer;
import org.toop.eventbus.Events; import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus; import org.toop.eventbus.GlobalEventBus;
// TODO more methods. // TODO more methods.
@@ -37,13 +37,13 @@ public class ServerManager {
Events.ServerEvents.EndTicTacToeGame.class, this::handleEndTicTacToeGameOnAServer); Events.ServerEvents.EndTicTacToeGame.class, this::handleEndTicTacToeGameOnAServer);
} }
private String startServer(String port, String gameType) { private String startServer(int port, String gameType) {
String serverId = UUID.randomUUID().toString(); String serverId = UUID.randomUUID().toString();
gameType = gameType.toLowerCase(); gameType = gameType.toLowerCase();
try { try {
TcpServer server = null; TcpServer server = null;
if (Objects.equals(gameType, "tictactoe")) { if (Objects.equals(gameType, "tictactoe")) {
server = new TicTacToeServer(Integer.parseInt(port)); server = new TicTacToeServer(port);
} else { } else {
logger.error("Manager could not create a server for game type: {}", gameType); logger.error("Manager could not create a server for game type: {}", gameType);
return null; return null;

View File

@@ -0,0 +1,192 @@
package org.toop.eventbus;
import com.google.common.eventbus.EventBus;
import org.toop.eventbus.events.EventWithUuid;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* EventPublisher is a helper class for creating, posting, and optionally subscribing to events
* in a type-safe and chainable manner. It automatically injects a unique UUID into the event
* and supports filtering subscribers by this UUID.
*
* <p>Usage pattern (with chainable API):
* <pre>{@code
* new EventPublisher<>(StartClient.class, handlerFactory, "127.0.0.1", 5001)
* .onEventById(ClientReady.class, clientReadyEvent -> logger.info(clientReadyEvent))
* .unregisterAfterSuccess()
* .postEvent();
* }</pre>
*
* @param <T> the type of event to publish, must extend EventWithUuid
*/
public class EventPublisher<T extends EventWithUuid> {
private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup();
private static final Map<Class<?>, MethodHandle> CONSTRUCTOR_CACHE = new ConcurrentHashMap<>();
/** The UUID automatically assigned to this event */
private final String eventId;
/** The event instance created by this publisher */
private final T event;
/** The listener object returned by the global event bus subscription */
private Object listener;
/** Flag indicating whether to unregister the listener after it is successfully triggered */
private boolean unregisterAfterSuccess = false;
/** Results that came back from the subscribed event */
private Map<String, Object> result = null;
/**
* Constructs a new EventPublisher by instantiating the given event class.
* A unique UUID is automatically generated and passed as the last constructor argument.
*
* @param postEventClass the class of the event to instantiate
* @param args constructor arguments for the event, excluding the UUID
* @throws RuntimeException if instantiation fails
*/
public EventPublisher(Class<T> postEventClass, Object... args) {
this.eventId = UUID.randomUUID().toString();
try {
MethodHandle ctorHandle = CONSTRUCTOR_CACHE.computeIfAbsent(postEventClass, cls -> {
try {
// Build signature dynamically (arg types + String for UUID)
Class<?>[] paramTypes = cls.getDeclaredConstructors()[0].getParameterTypes();
MethodType mt = MethodType.methodType(void.class, paramTypes);
return LOOKUP.findConstructor(cls, mt);
} catch (Exception e) {
throw new RuntimeException("Failed to find constructor handle for " + cls, e);
}
});
// Append UUID to args
Object[] finalArgs = new Object[args.length + 1];
System.arraycopy(args, 0, finalArgs, 0, args.length);
finalArgs[args.length] = this.eventId;
// --------------------
@SuppressWarnings("unchecked")
T instance = (T) ctorHandle.invokeWithArguments(finalArgs);
this.event = instance;
} catch (Throwable e) {
throw new RuntimeException("Failed to instantiate event", e);
}
}
public EventPublisher(EventBus eventbus, Class<T> postEventClass, Object... args) {
this.eventId = UUID.randomUUID().toString();
try {
MethodHandle ctorHandle = CONSTRUCTOR_CACHE.computeIfAbsent(postEventClass, cls -> {
try {
// Build signature dynamically (arg types + String for UUID)
Class<?>[] paramTypes = cls.getDeclaredConstructors()[0].getParameterTypes();
MethodType mt = MethodType.methodType(void.class, paramTypes);
return LOOKUP.findConstructor(cls, mt);
} catch (Exception e) {
throw new RuntimeException("Failed to find constructor handle for " + cls, e);
}
});
// Append UUID to args
Object[] finalArgs = new Object[args.length + 1];
System.arraycopy(args, 0, finalArgs, 0, args.length);
finalArgs[args.length] = this.eventId;
// --------------------
@SuppressWarnings("unchecked")
T instance = (T) ctorHandle.invokeWithArguments(finalArgs);
this.event = instance;
} catch (Throwable e) {
throw new RuntimeException("Failed to instantiate event", e);
}
}
/**
* Subscribes a listener for a specific event type, but only triggers the listener
* if the incoming event's UUID matches this EventPublisher's UUID.
*
* @param eventClass the class of the event to subscribe to
* @param action the action to execute when a matching event is received
* @param <TT> the type of the event to subscribe to; must extend EventWithUuid
* @return this EventPublisher instance, for chainable calls
*/
public <TT extends EventWithUuid> EventPublisher<T> onEventById(
Class<TT> eventClass, Consumer<TT> action) {
this.listener = GlobalEventBus.subscribeAndRegister(eventClass, event -> {
if (event.eventId().equals(this.eventId)) {
action.accept(event);
if (unregisterAfterSuccess && listener != null) {
GlobalEventBus.unregister(listener);
}
this.result = event.result();
}
});
return this;
}
/**
* Posts the event to the global event bus. This should generally be the
* final call in the chain.
*
* @return this EventPublisher instance, for potential chaining
*/
public EventPublisher<T> postEvent() {
GlobalEventBus.post(event);
return this;
}
/**
* Configures the publisher so that any listener registered with
* {@link #onEventById(Class, Consumer)} is automatically unregistered
* after it is successfully triggered.
*
* @return this EventPublisher instance, for chainable calls
*/
public EventPublisher<T> unregisterAfterSuccess() {
this.unregisterAfterSuccess = true;
return this;
}
public Map<String, Object> getResult() {
if (this.result != null) {
return this.result;
}
return null;
// TODO: Why check for null if return is null anyway?
}
/**
* Returns the event instance created by this publisher.
*
* @return the event instance
*/
public T getEvent() {
return event;
}
/**
* Returns the UUID automatically assigned to this event.
*
* @return the UUID of the event
*/
public String getEventId() {
return eventId;
}
}

View File

@@ -2,6 +2,7 @@ package org.toop.eventbus;
import com.google.common.eventbus.EventBus; import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import java.util.function.Consumer; import java.util.function.Consumer;
/** A singleton Event Bus to be used for creating, triggering and activating events. */ /** A singleton Event Bus to be used for creating, triggering and activating events. */
@@ -53,6 +54,18 @@ public class GlobalEventBus {
}; };
} }
@SuppressWarnings("unchecked")
public static <T> Object subscribe(Consumer<T> action) {
return new Object() {
@Subscribe
public void handle(Object event) {
try {
action.accept((T) event); // unchecked cast
} catch (ClassCastException ignored) {}
}
};
}
/** /**
* Wraps a Consumer into a Guava @Subscribe-compatible listener and registers it. * Wraps a Consumer into a Guava @Subscribe-compatible listener and registers it.
* *
@@ -66,10 +79,16 @@ public class GlobalEventBus {
return listener; return listener;
} }
public static <T> Object subscribeAndRegister(Consumer<T> action) {
var listener = subscribe(action);
register(listener);
return listener;
}
/** /**
* Wrapper for registering a listener. * Wrapper for registering a listener.
* *
* @param event The ready event to add to register. * @param listener The listener to register.
*/ */
public static void register(Object listener) { public static void register(Object listener) {
GlobalEventBus.get().register(listener); GlobalEventBus.get().register(listener);
@@ -78,7 +97,7 @@ public class GlobalEventBus {
/** /**
* Wrapper for unregistering a listener. * Wrapper for unregistering a listener.
* *
* @param event The ready event to unregister. * @param listener The listener to unregister.
*/ */
public static void unregister(Object listener) { public static void unregister(Object listener) {
GlobalEventBus.get().unregister(listener); GlobalEventBus.get().unregister(listener);
@@ -90,18 +109,6 @@ public class GlobalEventBus {
* @param event The event to post. * @param event The event to post.
*/ */
public static <T> void post(T event) { public static <T> void post(T event) {
Class<T> type = (Class<T>) event.getClass();
// if (!EventRegistry.isReady(type)) {
// throw new IllegalStateException("Event type not ready: " +
// type.getSimpleName());
// } TODO: Handling non ready events.
// store in registry
EventMeta<T> eventMeta = new EventMeta<>(type, event);
EventRegistry.storeEvent(eventMeta);
// post to Guava EventBus
GlobalEventBus.get().post(event); GlobalEventBus.get().post(event);
} }
} }

View File

@@ -1,3 +0,0 @@
package org.toop.eventbus;
public interface IEvents {}

View File

@@ -0,0 +1,8 @@
package org.toop.eventbus.events;
import java.util.Map;
public interface EventWithUuid {
Map<String, Object> result();
String eventId();
}

View File

@@ -1,9 +1,9 @@
package org.toop.eventbus; package org.toop.eventbus.events;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.toop.backend.tictactoe.TicTacToeServer;
import org.toop.core.Window; import org.toop.core.Window;
/** Events that are used in the GlobalEventBus class. */ /** Events that are used in the GlobalEventBus class. */
@@ -18,7 +18,7 @@ public class Events implements IEvents {
* @throws Exception * @throws Exception
*/ */
public static Object get(String eventName, Object... args) throws Exception { public static Object get(String eventName, Object... args) throws Exception {
Class<?> clazz = Class.forName("org.toop.eventbus.Events$ServerEvents$" + eventName); Class<?> clazz = Class.forName("org.toop.eventbus.events.Events$ServerEvents$" + eventName);
Class<?>[] paramTypes = Arrays.stream(args).map(Object::getClass).toArray(Class<?>[]::new); Class<?>[] paramTypes = Arrays.stream(args).map(Object::getClass).toArray(Class<?>[]::new);
Constructor<?> constructor = clazz.getConstructor(paramTypes); Constructor<?> constructor = clazz.getConstructor(paramTypes);
return constructor.newInstance(args); return constructor.newInstance(args);
@@ -36,7 +36,7 @@ public class Events implements IEvents {
public static Object get(String eventCategory, String eventName, Object... args) public static Object get(String eventCategory, String eventName, Object... args)
throws Exception { throws Exception {
Class<?> clazz = Class<?> clazz =
Class.forName("org.toop.eventbus.Events$" + eventCategory + "$" + eventName); Class.forName("org.toop.eventbus.events.Events$" + eventCategory + "$" + eventName);
Class<?>[] paramTypes = Arrays.stream(args).map(Object::getClass).toArray(Class<?>[]::new); Class<?>[] paramTypes = Arrays.stream(args).map(Object::getClass).toArray(Class<?>[]::new);
Constructor<?> constructor = clazz.getConstructor(paramTypes); Constructor<?> constructor = clazz.getConstructor(paramTypes);
return constructor.newInstance(args); return constructor.newInstance(args);
@@ -72,14 +72,6 @@ public class Events implements IEvents {
public static class ServerEvents { public static class ServerEvents {
/**
* BLOCKING Requests all active connections. The result is returned via the provided
* CompletableFuture.
*
* @param future List of all connections in string form.
*/
public record RequestsAllConnections(CompletableFuture<String> future) {}
/** /**
* BLOCKING Requests all active servers. The result is returned via the provided * BLOCKING Requests all active servers. The result is returned via the provided
* CompletableFuture. * CompletableFuture.
@@ -88,9 +80,6 @@ public class Events implements IEvents {
*/ */
public record RequestsAllServers(CompletableFuture<String> future) {} public record RequestsAllServers(CompletableFuture<String> future) {}
/** Forces closing all active connections immediately. */
public record ForceCloseAllConnections() {}
/** Forces closing all active servers immediately. */ /** Forces closing all active servers immediately. */
public record ForceCloseAllServers() {} public record ForceCloseAllServers() {}
@@ -100,7 +89,7 @@ public class Events implements IEvents {
* @param port The port to open the server. * @param port The port to open the server.
* @param gameType Either "tictactoe" or ... * @param gameType Either "tictactoe" or ...
*/ */
public record StartServer(String port, String gameType) {} public record StartServer(int port, String gameType) {}
/** /**
* BLOCKING Requests starting a server with a specific port and game type, and returns a * BLOCKING Requests starting a server with a specific port and game type, and returns a
@@ -111,7 +100,7 @@ public class Events implements IEvents {
* @param future The uuid of the server. * @param future The uuid of the server.
*/ */
public record StartServerRequest( public record StartServerRequest(
String port, String gameType, CompletableFuture<String> future) {} int port, String gameType, CompletableFuture<String> future) {}
/** /**
* Represents a server that has successfully started. * Represents a server that has successfully started.
@@ -119,7 +108,7 @@ public class Events implements IEvents {
* @param uuid The unique identifier of the server. * @param uuid The unique identifier of the server.
* @param port The port the server is listening on. * @param port The port the server is listening on.
*/ */
public record ServerStarted(String uuid, String port) {} public record ServerStarted(String uuid, int port) {}
/** /**
* BLOCKING Requests creation of a TicTacToe game on a specific server. * BLOCKING Requests creation of a TicTacToe game on a specific server.
@@ -151,61 +140,9 @@ public class Events implements IEvents {
*/ */
public record EndTicTacToeGame(String serverUuid, String gameUuid) {} public record EndTicTacToeGame(String serverUuid, String gameUuid) {}
/**
* Triggers starting a server connection.
*
* @param ip The IP address of the server to connect to.
* @param port The port of the server to connect to.
*/
public record StartConnection(String ip, String port) {}
/**
* BLOCKING Triggers starting a server connection and returns a future.
*
* @param ip The IP address of the server to connect to.
* @param port The port of the server to connect to.
* @param future Returns the UUID of the connection, when connection is established.
*/
public record StartConnectionRequest(
String ip, String port, CompletableFuture<String> future) {}
// public record StartGameConnectionRequest(String ip, String port, // public record StartGameConnectionRequest(String ip, String port,
// CompletableFuture<String> future) {} // CompletableFuture<String> future) {}
/**
* BLOCKING Triggers starting a server connection and returns a future.
*
* @param ip The IP address of the server to connect to.
* @param port The port of the server to connect to.
*/
public record ConnectionEstablished(Object connectionId, String ip, String port) {}
/**
* Triggers sending a command to a server.
*
* @param connectionId The UUID of the connection to send the command on.
* @param args The command arguments.
*/
public record SendCommand(String connectionId, String... args) {}
/**
* WIP Triggers when a command is sent to a server.
*
* @param command The TicTacToeServer instance that executed the command.
* @param args The command arguments.
* @param result The result returned from executing the command.
*/
public record OnCommand(
TicTacToeServer command, String[] args, String result) {} // TODO old
/**
* Triggers when the server client receives a message.
*
* @param ConnectionUuid The UUID of the connection that received the message.
* @param message The message received.
*/
public record ReceivedMessage(String ConnectionUuid, String message) {}
/** /**
* Triggers on changing the server IP. * Triggers on changing the server IP.
* *
@@ -218,33 +155,7 @@ public class Events implements IEvents {
* *
* @param port The new port. * @param port The new port.
*/ */
public record OnChangingServerPort(String port) {} public record OnChangingServerPort(int port) {}
/**
* Triggers reconnecting to a previous address.
*
* @param connectionId The identifier of the connection being reconnected.
*/
public record Reconnect(Object connectionId) {}
/**
* Triggers changing connection to a new address.
*
* @param connectionId The identifier of the connection being changed.
* @param ip The new IP address.
* @param port The new port.
*/
public record ChangeConnection(Object connectionId, String ip, String port) {}
/**
* Triggers when the server couldn't connect to the desired address.
*
* @param connectionId The identifier of the connection that failed.
*/
public record CouldNotConnect(Object connectionId) {}
/** WIP Triggers when a connection closes. */
public record ClosedConnection() {}
/** Triggers when a cell is clicked in one of the game boards. */ /** Triggers when a cell is clicked in one of the game boards. */
public record CellClicked(int cell) {} public record CellClicked(int cell) {}

View File

@@ -0,0 +1,3 @@
package org.toop.eventbus.events;
public interface IEvents {}

View File

@@ -0,0 +1,190 @@
package org.toop.eventbus.events;
import com.google.common.base.Supplier;
import org.toop.backend.tictactoe.TicTacToeServer;
import org.toop.frontend.networking.NetworkingGameClientHandler;
import java.lang.reflect.RecordComponent;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class NetworkEvents extends Events {
/**
* BLOCKING Requests all active connections. The result is returned via the provided
* CompletableFuture.
*
* @param future List of all connections in string form.
*/
public record RequestsAllClients(CompletableFuture<String> future) {}
/** Forces closing all active connections immediately. */
public record ForceCloseAllClients() {}
public record CloseClientRequest(CompletableFuture<String> future) {}
public record CloseClient(String connectionId) {}
/**
* Event to start a new client connection to a server.
* <p>
* This event is typically posted to the {@code GlobalEventBus} to initiate the creation of
* a client connection, and carries all information needed to establish that connection:
* <br>
* - A factory for creating the Netty handler that will manage the connection
* <br>
* - The server's IP address and port
* <br>
* - A unique event identifier for correlation with follow-up events
* </p>
*
* <p>
* The {@link #eventId()} allows callers to correlate the {@code StartClient} event
* with subsequent success/failure events. For example, a {@code StartClientSuccess}
* or {@code StartClientFailure} event may carry the same {@code eventId}.
* </p>
*
* @param handlerFactory Factory for constructing a {@link NetworkingGameClientHandler}.
* @param ip The IP address of the server to connect to.
* @param port The port number of the server to connect to.
* @param eventId A unique identifier for this event, typically injected
* automatically by the {@link org.toop.eventbus.EventPublisher}.
*/
public record StartClient(
Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String ip,
int port,
String eventId
) implements EventWithUuid {
/**
* Returns a map representation of this event, where keys are record component names
* and values are their corresponding values. Useful for generic logging, debugging,
* or serializing events without hardcoding field names.
*
* @return a {@code Map<String, Object>} containing field names and values
*/
@Override
public Map<String, Object> result() {
return Stream.of(this.getClass().getRecordComponents())
.collect(Collectors.toMap(
RecordComponent::getName,
rc -> {
try {
return rc.getAccessor().invoke(this);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
));
}
/**
* Returns the unique event identifier used for correlating this event.
*
* @return the event ID string
*/
@Override
public String eventId() {
return this.eventId;
}
}
/**
* TODO: Update docs new input.
* BLOCKING Triggers starting a server connection and returns a future.
*
* @param ip The IP address of the server to connect to.
* @param port The port of the server to connect to.
* @param future Returns the UUID of the connection, when connection is established.
*/
public record StartClientRequest(
Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String ip, int port, CompletableFuture<String> future) {}
/**
* BLOCKING Triggers starting a server connection and returns a future.
*
* @param ip The IP address of the server to connect to.
* @param port The port of the server to connect to.
*/
public record StartClientSuccess(Object connectionId, String ip, int port, String eventId)
implements EventWithUuid {
@Override
public Map<String, Object> result() {
return Stream.of(this.getClass().getRecordComponents())
.collect(Collectors.toMap(
RecordComponent::getName,
rc -> {
try {
return rc.getAccessor().invoke(this);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
));
}
@Override
public String eventId() {
return this.eventId;
}
}
/**
* Triggers sending a command to a server.
*
* @param connectionId The UUID of the connection to send the command on.
* @param args The command arguments.
*/
public record SendCommand(String connectionId, String... args) {}
/**
* WIP Triggers when a command is sent to a server.
*
* @param command The TicTacToeServer instance that executed the command.
* @param args The command arguments.
* @param result The result returned from executing the command.
*/
public record OnCommand(
TicTacToeServer command, String[] args, String result) {} // TODO old
/**
* Triggers reconnecting to a previous address.
*
* @param connectionId The identifier of the connection being reconnected.
*/
public record Reconnect(Object connectionId) {}
/**
* Triggers when the server client receives a message.
*
* @param ConnectionUuid The UUID of the connection that received the message.
* @param message The message received.
*/
public record ReceivedMessage(String ConnectionUuid, String message) {}
/**
* Triggers changing connection to a new address.
*
* @param connectionId The identifier of the connection being changed.
* @param ip The new IP address.
* @param port The new port.
*/
public record ChangeClient(Object connectionId, String ip, int port) {}
/**
* Triggers when the server couldn't connect to the desired address.
*
* @param connectionId The identifier of the connection that failed.
*/
public record CouldNotConnect(Object connectionId) {}
/** WIP Triggers when a connection closes. */
public record ClosedConnection() {}
}

View File

@@ -1,122 +0,0 @@
package org.toop.frontend;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.eventbus.Events;
import org.toop.eventbus.GlobalEventBus;
public class ConnectionManager {
private static final Logger logger = LogManager.getLogger(ConnectionManager.class);
/** Map of serverId -> Server instances */
private final Map<String, ServerConnection> serverConnections = new ConcurrentHashMap<>();
/** Starts a connection manager, to manage, connections. */
public ConnectionManager() {
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.StartConnectionRequest.class,
this::handleStartConnectionRequest);
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.StartConnection.class, this::handleStartConnection);
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.SendCommand.class, this::handleCommand);
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.Reconnect.class, this::handleReconnect);
// GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class,
// this::handleChangeConnection);
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.ForceCloseAllConnections.class, _ -> shutdownAll());
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.RequestsAllConnections.class, this::getAllConnections);
}
private String startConnectionRequest(String ip, String port) {
String connectionId = UUID.randomUUID().toString();
try {
if (!port.matches("[0-9]+")) {
port = "0000";
}
ServerConnection connection = new ServerConnection(connectionId, ip, port);
this.serverConnections.put(connectionId, connection);
new Thread(connection, "Connection-" + connectionId).start();
logger.info("Connected to server {} at {}:{}", connectionId, ip, port);
} catch (IOException e) {
logger.error("{}", e);
}
return connectionId;
}
private void handleStartConnectionRequest(Events.ServerEvents.StartConnectionRequest request) {
request.future()
.complete(
this.startConnectionRequest(
request.ip(),
request.port())); // TODO: Maybe post ConnectionEstablished event.
}
private void handleStartConnection(Events.ServerEvents.StartConnection event) {
GlobalEventBus.post(
new Events.ServerEvents.ConnectionEstablished(
this.startConnectionRequest(event.ip(), event.port()),
event.ip(),
event.port()));
}
private void handleCommand(
Events.ServerEvents.SendCommand
event) { // TODO: Move this to ServerConnection class, keep it internal.
ServerConnection serverConnection = this.serverConnections.get(event.connectionId());
if (serverConnection != null) {
serverConnection.sendCommandByString(event.args());
} else {
logger.warn("Server {} not found for command '{}'", event.connectionId(), event.args());
}
}
private void handleReconnect(Events.ServerEvents.Reconnect event) {
ServerConnection serverConnection = this.serverConnections.get(event.connectionId());
if (serverConnection != null) {
try {
serverConnection.reconnect();
logger.info("Server {} reconnected", event.connectionId());
} catch (Exception e) {
logger.error("Server {} failed to reconnect", event.connectionId(), e);
GlobalEventBus.post(new Events.ServerEvents.CouldNotConnect(event.connectionId()));
}
}
}
// private void handleChangeConnection(Events.ServerEvents.ChangeConnection event) {
// ServerConnection serverConnection = this.serverConnections.get(event.connectionId());
// if (serverConnection != null) {
// try {
// serverConnection.connect(event.ip(), event.port());
// logger.info("Server {} changed connection to {}:{}", event.connectionId(),
// event.ip(), event.port());
// } catch (Exception e) {
// logger.error("Server {} failed to change connection", event.connectionId(),
// e);
// GlobalEventBus.post(new
// Events.ServerEvents.CouldNotConnect(event.connectionId()));
// }
// }
// } TODO
private void getAllConnections(Events.ServerEvents.RequestsAllConnections request) {
List<ServerConnection> a = new ArrayList<>(this.serverConnections.values());
request.future().complete(a.toString());
}
public void shutdownAll() {
this.serverConnections.values().forEach(ServerConnection::closeConnection);
this.serverConnections.clear();
logger.info("All servers shut down");
}
}

View File

@@ -7,9 +7,11 @@ import java.util.concurrent.ExecutionException;
import javax.swing.*; import javax.swing.*;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.eventbus.Events; import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus; import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.NetworkEvents;
import org.toop.frontend.games.LocalTicTacToe; import org.toop.frontend.games.LocalTicTacToe;
import org.toop.frontend.networking.NetworkingGameClientHandler;
public class RemoteGameSelector { public class RemoteGameSelector {
private static final Logger logger = LogManager.getLogger(RemoteGameSelector.class); private static final Logger logger = LogManager.getLogger(RemoteGameSelector.class);
@@ -56,7 +58,7 @@ public class RemoteGameSelector {
CompletableFuture<String> serverIdFuture = new CompletableFuture<>(); CompletableFuture<String> serverIdFuture = new CompletableFuture<>();
GlobalEventBus.post( GlobalEventBus.post(
new Events.ServerEvents.StartServerRequest( new Events.ServerEvents.StartServerRequest(
portTextField.getText(), Integer.parseInt(portTextField.getText()), // TODO: Unsafe parse
Objects.requireNonNull(gameSelectorBox.getSelectedItem()) Objects.requireNonNull(gameSelectorBox.getSelectedItem())
.toString() .toString()
.toLowerCase() .toLowerCase()
@@ -71,9 +73,10 @@ public class RemoteGameSelector {
CompletableFuture<String> connectionIdFuture = new CompletableFuture<>(); CompletableFuture<String> connectionIdFuture = new CompletableFuture<>();
GlobalEventBus.post( GlobalEventBus.post(
new Events.ServerEvents.StartConnectionRequest( new NetworkEvents.StartClientRequest(
NetworkingGameClientHandler::new,
ipTextField.getText(), ipTextField.getText(),
portTextField.getText(), Integer.parseInt(portTextField.getText()), // TODO: Not safe parsing
connectionIdFuture)); connectionIdFuture));
String connectionId; String connectionId;
try { try {
@@ -83,7 +86,7 @@ public class RemoteGameSelector {
} // TODO: Better error handling to not crash the system. } // TODO: Better error handling to not crash the system.
GlobalEventBus.subscribeAndRegister( GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.ReceivedMessage.class, NetworkEvents.ReceivedMessage.class,
event -> { event -> {
if (event.message().equalsIgnoreCase("ok")) { if (event.message().equalsIgnoreCase("ok")) {
logger.info("received ok from server."); logger.info("received ok from server.");
@@ -93,7 +96,7 @@ public class RemoteGameSelector {
.toLowerCase() .toLowerCase()
.replace("gameid ", ""); .replace("gameid ", "");
GlobalEventBus.post( GlobalEventBus.post(
new Events.ServerEvents.SendCommand( new NetworkEvents.SendCommand(
"start_game " + gameId)); "start_game " + gameId));
} else { } else {
logger.info("{}", event.message()); logger.info("{}", event.message());
@@ -101,7 +104,7 @@ public class RemoteGameSelector {
}); });
GlobalEventBus.post( GlobalEventBus.post(
new Events.ServerEvents.SendCommand( new NetworkEvents.SendCommand(
connectionId, connectionId,
"create_game", "create_game",
nameTextField.getText(), nameTextField.getText(),
@@ -127,7 +130,7 @@ public class RemoteGameSelector {
frame.remove(mainMenu); frame.remove(mainMenu);
localTicTacToe = localTicTacToe =
LocalTicTacToe.createRemote( LocalTicTacToe.createRemote(
ipTextField.getText(), portTextField.getText()); ipTextField.getText(), Integer.parseInt(portTextField.getText())); // TODO: Unsafe parse
UIGameBoard ttt = new UIGameBoard(localTicTacToe, this); // TODO: Fix later UIGameBoard ttt = new UIGameBoard(localTicTacToe, this); // TODO: Fix later
frame.add(ttt.getTTTPanel()); // TODO: Fix later frame.add(ttt.getTTTPanel()); // TODO: Fix later
frame.revalidate(); frame.revalidate();

View File

@@ -3,9 +3,11 @@ package org.toop.frontend.games;
import java.util.concurrent.*; import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.eventbus.Events; import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus; import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.NetworkEvents;
import org.toop.frontend.UI.UIGameBoard; import org.toop.frontend.UI.UIGameBoard;
import org.toop.frontend.networking.NetworkingGameClientHandler;
import org.toop.game.tictactoe.GameBase; import org.toop.game.tictactoe.GameBase;
import org.toop.game.tictactoe.TicTacToe; import org.toop.game.tictactoe.TicTacToe;
import org.toop.game.tictactoe.ai.MinMaxTicTacToe; import org.toop.game.tictactoe.ai.MinMaxTicTacToe;
@@ -63,13 +65,12 @@ public class LocalTicTacToe { // TODO: Implement runnable
* @param ip The IP of the server to connect to. * @param ip The IP of the server to connect to.
* @param port The port of the server to connect to. * @param port The port of the server to connect to.
*/ */
private LocalTicTacToe(String ip, String port) { private LocalTicTacToe(String ip, int port) {
this.receivedMessageListener = this.receivedMessageListener =
GlobalEventBus.subscribe( GlobalEventBus.subscribe(this::receiveMessageAction);
Events.ServerEvents.ReceivedMessage.class, this::receiveMessageAction);
GlobalEventBus.register(this.receivedMessageListener); GlobalEventBus.register(this.receivedMessageListener);
this.connectionId = this.createConnection(ip, port); this.connectionId = this.createConnection(ip, port);
this.createGame(ip, port); this.createGame("X", "O");
this.isLocal = false; this.isLocal = false;
this.executor.submit(this::remoteGameThread); this.executor.submit(this::remoteGameThread);
} }
@@ -93,11 +94,11 @@ public class LocalTicTacToe { // TODO: Implement runnable
return new LocalTicTacToe(aiPlayers); return new LocalTicTacToe(aiPlayers);
} }
public static LocalTicTacToe createRemote(String ip, String port) { public static LocalTicTacToe createRemote(String ip, int port) {
return new LocalTicTacToe(ip, port); return new LocalTicTacToe(ip, port);
} }
private String createServer(String port) { private String createServer(int port) {
CompletableFuture<String> serverIdFuture = new CompletableFuture<>(); CompletableFuture<String> serverIdFuture = new CompletableFuture<>();
GlobalEventBus.post( GlobalEventBus.post(
new Events.ServerEvents.StartServerRequest(port, "tictactoe", serverIdFuture)); new Events.ServerEvents.StartServerRequest(port, "tictactoe", serverIdFuture));
@@ -109,10 +110,11 @@ public class LocalTicTacToe { // TODO: Implement runnable
return null; return null;
} }
private String createConnection(String ip, String port) { private String createConnection(String ip, int port) {
CompletableFuture<String> connectionIdFuture = new CompletableFuture<>(); CompletableFuture<String> connectionIdFuture = new CompletableFuture<>();
GlobalEventBus.post( GlobalEventBus.post(
new Events.ServerEvents.StartConnectionRequest( new NetworkEvents.StartClientRequest(
NetworkingGameClientHandler::new,
ip, ip,
port, port,
connectionIdFuture)); // TODO: what if server couldn't be started with port. connectionIdFuture)); // TODO: what if server couldn't be started with port.
@@ -232,7 +234,7 @@ public class LocalTicTacToe { // TODO: Implement runnable
this.endListeners(); this.endListeners();
} }
private void receiveMessageAction(Events.ServerEvents.ReceivedMessage receivedMessage) { private void receiveMessageAction(NetworkEvents.ReceivedMessage receivedMessage) {
if (!receivedMessage.ConnectionUuid().equals(this.connectionId)) { if (!receivedMessage.ConnectionUuid().equals(this.connectionId)) {
return; return;
} }
@@ -247,7 +249,7 @@ public class LocalTicTacToe { // TODO: Implement runnable
} }
private void sendCommand(String... args) { private void sendCommand(String... args) {
GlobalEventBus.post(new Events.ServerEvents.SendCommand(this.connectionId, args)); GlobalEventBus.post(new NetworkEvents.SendCommand(this.connectionId, args));
} }
private void endListeners() { private void endListeners() {

View File

@@ -4,6 +4,7 @@ import java.util.*;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.eventbus.*; import org.toop.eventbus.*;
import org.toop.eventbus.events.Events;
import org.toop.frontend.graphics.Shader; import org.toop.frontend.graphics.Shader;
public class NodeManager { public class NodeManager {

View File

@@ -0,0 +1,139 @@
package org.toop.frontend.networking;
import com.google.common.base.Supplier;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class NetworkingClient {
private static final Logger logger = LogManager.getLogger(NetworkingClient.class);
final Bootstrap bootstrap = new Bootstrap();
final EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
private String connectionUuid;
private Channel channel;
private NetworkingGameClientHandler handler;
public NetworkingClient(
Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String host,
int port) {
try {
this.bootstrap.group(this.workerGroup);
this.bootstrap.channel(NioSocketChannel.class);
this.bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
handler = handlerFactory.get();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(1024)); // split at \n
pipeline.addLast(new StringDecoder()); // bytes -> String
pipeline.addLast(handler);
}
});
ChannelFuture channelFuture = this.bootstrap.connect(host, port).sync();
this.channel = channelFuture.channel();
} catch (Exception e) {
logger.error("Failed to create networking client instance", e);
}
}
public NetworkingGameClientHandler getHandler() {
return handler;
}
public void setConnectionUuid(String connectionUuid) {
this.connectionUuid = connectionUuid;
}
public boolean isChannelActive() {
return this.channel != null && this.channel.isActive();
}
public void writeAndFlush(String msg) {
String literalMsg = msg.replace("\n", "\\n").replace("\r", "\\r");
if (isChannelActive()) {
this.channel.writeAndFlush(msg);
logger.info("Connection {} sent message: {}", this.channel.remoteAddress(), literalMsg);
} else {
logger.warn("Cannot send message: {}, connection inactive.", literalMsg);
}
}
public void writeAndFlushnl(String msg) {
if (isChannelActive()) {
this.channel.writeAndFlush(msg + "\n");
logger.info("Connection {} sent message: {}", this.channel.remoteAddress(), msg);
} else {
logger.warn("Cannot send message: {}, connection inactive.", msg);
}
}
public void login(String username) {
this.writeAndFlush("login " + username + "\n");
}
public void logout() {
this.writeAndFlush("logout\n");
}
public void sendMove(int move) {
this.writeAndFlush("move " + move + "\n"); // append \n so server receives a full line
}
public void getGamelist() {
this.writeAndFlush("get gamelist\n");
}
public void getPlayerlist() {
this.writeAndFlush("get playerlist\n");
}
public void subscribe(String gameType) {
this.writeAndFlush("subscribe " + gameType + "\n");
}
public void forfeit() {
this.writeAndFlush("forfeit\n");
}
public void challenge(String playerName, String gameType) {
this.writeAndFlush("challenge " + playerName + " " + gameType + "\n");
}
public void acceptChallenge(String challengeNumber) {
this.writeAndFlush("challenge accept " + challengeNumber + "\n");
}
public void sendChatMessage(String message) {
this.writeAndFlush("message " + "\"" + message + "\"" + "\n");
}
public void help(String command) {
this.writeAndFlush("help " + command + "\n");
}
public void closeConnection() {
if (this.channel != null && this.channel.isActive()) {
this.channel.close().addListener(future -> {
if (future.isSuccess()) {
logger.info("Connection {} closed successfully", this.channel.remoteAddress());
} else {
logger.error("Error closing connection {}. Error: {}",
this.channel.remoteAddress(),
future.cause().getMessage());
}
});
}
}
}

View File

@@ -0,0 +1,134 @@
package org.toop.frontend.networking;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.NetworkEvents;
public class NetworkingClientManager {
private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class);
/** Map of serverId -> Server instances */
private final Map<String, NetworkingClient> networkClients = new ConcurrentHashMap<>();
/** Starts a connection manager, to manage, connections. */
public NetworkingClientManager() {
GlobalEventBus.subscribeAndRegister(this::handleStartClientRequest);
GlobalEventBus.subscribeAndRegister(this::handleStartClient);
GlobalEventBus.subscribeAndRegister(this::handleCommand);
GlobalEventBus.subscribeAndRegister(this::handleCloseClient);
// GlobalEventBus.subscribeAndRegister(
// Events.ServerEvents.Reconnect.class, this::handleReconnect);
// GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class,
// this::handleChangeConnection);
GlobalEventBus.subscribeAndRegister(this::shutdownAll);
GlobalEventBus.subscribeAndRegister(this::getAllConnections);
}
private String startConnectionRequest(Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String ip,
int port) {
String connectionUuid = UUID.randomUUID().toString();
try {
NetworkingClient client = new NetworkingClient(
handlerFactory,
ip,
port);
this.networkClients.put(connectionUuid, client);
} catch (Exception e) {
logger.error(e);
}
return connectionUuid;
}
private void handleStartClientRequest(NetworkEvents.StartClientRequest request) {
request.future()
.complete(
this.startConnectionRequest(
request.handlerFactory(),
request.ip(),
request.port())); // TODO: Maybe post ConnectionEstablished event.
}
private void handleStartClient(NetworkEvents.StartClient event) {
GlobalEventBus.post(
new NetworkEvents.StartClientSuccess(
this.startConnectionRequest(
event.handlerFactory(),
event.ip(),
event.port()),
event.ip(),
event.port(),
event.eventId()
));
}
private void handleCommand(
NetworkEvents.SendCommand
event) { // TODO: Move this to ServerConnection class, keep it internal.
NetworkingClient client = this.networkClients.get(event.connectionId());
logger.info("Preparing to send command: {} to server: {}", event.args(), client);
if (client != null) {
String args = String.join(" ", event.args()) + "\n";
client.writeAndFlush(args);
} else {
logger.warn("Server {} not found for command '{}'", event.connectionId(), event.args());
}
}
private void handleCloseClient(NetworkEvents.CloseClient event) {
NetworkingClient client = this.networkClients.get(event.connectionId());
client.closeConnection(); // TODO: Check if not blocking, what if error, mb not remove?
this.networkClients.remove(event.connectionId());
logger.info("Client {} closed successfully.", event.connectionId());
}
// private void handleReconnect(Events.ServerEvents.Reconnect event) {
// NetworkingClient client = this.networkClients.get(event.connectionId());
// if (client != null) {
// try {
// client;
// logger.info("Server {} reconnected", event.connectionId());
// } catch (Exception e) {
// logger.error("Server {} failed to reconnect", event.connectionId(), e);
// GlobalEventBus.post(new Events.ServerEvents.CouldNotConnect(event.connectionId()));
// }
// }
// } // TODO: Reconnect on disconnect
// private void handleChangeConnection(Events.ServerEvents.ChangeConnection event) {
// ServerConnection serverConnection = this.serverConnections.get(event.connectionId());
// if (serverConnection != null) {
// try {
// serverConnection.connect(event.ip(), event.port());
// logger.info("Server {} changed connection to {}:{}", event.connectionId(),
// event.ip(), event.port());
// } catch (Exception e) {
// logger.error("Server {} failed to change connection", event.connectionId(),
// e);
// GlobalEventBus.post(new
// Events.ServerEvents.CouldNotConnect(event.connectionId()));
// }
// }
// } TODO
private void getAllConnections(NetworkEvents.RequestsAllClients request) {
List<NetworkingClient> a = new ArrayList<>(this.networkClients.values());
request.future().complete(a.toString());
}
public void shutdownAll(NetworkEvents.ForceCloseAllClients request) {
this.networkClients.values().forEach(NetworkingClient::closeConnection);
this.networkClients.clear();
logger.info("All servers shut down");
}
}

View File

@@ -0,0 +1,28 @@
package org.toop.frontend.networking;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.Main;
public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(NetworkingGameClientHandler.class);
public NetworkingGameClientHandler() {}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
logger.debug("Received message from server-{}, data: {}", ctx.channel().remoteAddress(), msg);
// TODO: Handle server messages
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error(cause.getMessage(), cause);
ctx.close();
}
}

View File

@@ -1,12 +1,13 @@
package org.toop.frontend; package org.toop.frontend.networking;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.concurrent.*; import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.eventbus.Events; import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus; import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.NetworkEvents;
public final class ServerConnection extends TcpClient implements Runnable { public final class ServerConnection extends TcpClient implements Runnable {
@@ -85,7 +86,7 @@ public final class ServerConnection extends TcpClient implements Runnable {
logger.info("Connection: {} received: '{}'", this.uuid, received); logger.info("Connection: {} received: '{}'", this.uuid, received);
// this.addReceivedMessageToQueue(received); // TODO: Will never go empty // this.addReceivedMessageToQueue(received); // TODO: Will never go empty
GlobalEventBus.post( GlobalEventBus.post(
new Events.ServerEvents.ReceivedMessage( new NetworkEvents.ReceivedMessage(
this.uuid, received)); // TODO: mb change this.uuid, received)); // TODO: mb change
} else { } else {
break; break;

View File

@@ -1,4 +1,4 @@
package org.toop.frontend; package org.toop.frontend.networking;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;

View File

@@ -0,0 +1,12 @@
//package org.toop.frontend.networking.handlers;
//
//import io.netty.channel.ChannelHandlerContext;
//import org.apache.logging.log4j.LogManager;
//import org.apache.logging.log4j.Logger;
//import org.toop.frontend.networking.NetworkingGameClientHandler;
//
//public class NetworkingTicTacToeClientHandler extends NetworkingGameClientHandler {
// static final Logger logger = LogManager.getLogger(NetworkingTicTacToeClientHandler.class);
//
//
//}

View File

@@ -4,6 +4,7 @@ import org.lwjgl.glfw.*;
import org.lwjgl.system.*; import org.lwjgl.system.*;
import org.toop.core.*; import org.toop.core.*;
import org.toop.eventbus.*; import org.toop.eventbus.*;
import org.toop.eventbus.events.Events;
public class GlfwWindow extends Window { public class GlfwWindow extends Window {
private long window; private long window;

View File

@@ -3,6 +3,7 @@ package org.toop.frontend.platform.graphics.opengl;
import org.lwjgl.opengl.*; import org.lwjgl.opengl.*;
import org.lwjgl.system.*; import org.lwjgl.system.*;
import org.toop.eventbus.*; import org.toop.eventbus.*;
import org.toop.eventbus.events.Events;
import org.toop.frontend.graphics.Renderer; import org.toop.frontend.graphics.Renderer;
import org.toop.frontend.graphics.Shader; import org.toop.frontend.graphics.Shader;

View File

@@ -0,0 +1,88 @@
package org.toop.eventbus;
import org.junit.jupiter.api.Test;
import org.toop.eventbus.events.EventWithUuid;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertTrue;
class EventPublisherPerformanceTest {
public record PerfEvent(String name, String eventId) implements EventWithUuid {
@Override
public java.util.Map<String, Object> result() {
return java.util.Map.of("name", name, "eventId", eventId);
}
}
@Test
void testEventCreationSpeed() {
int iterations = 10_000;
long start = System.nanoTime();
for (int i = 0; i < iterations; i++) {
new EventPublisher<>(PerfEvent.class, "event-" + i);
}
long end = System.nanoTime();
long durationMs = (end - start) / 1_000_000;
System.out.println("Created " + iterations + " events in " + durationMs + " ms");
assertTrue(durationMs < 500, "Event creation too slow");
}
@Test
void testEventPostSpeed() {
int iterations = 10_000;
AtomicInteger counter = new AtomicInteger(0);
GlobalEventBus.subscribeAndRegister(PerfEvent.class, e -> counter.incrementAndGet());
long start = System.nanoTime();
for (int i = 0; i < iterations; i++) {
new EventPublisher<>(PerfEvent.class, "event-" + i).postEvent();
}
long end = System.nanoTime();
long durationMs = (end - start) / 1_000_000;
System.out.println("Posted " + iterations + " events in " + durationMs + " ms");
assertTrue(counter.get() == iterations, "Not all events were received");
assertTrue(durationMs < 1000, "Posting events too slow");
}
@Test
void testConcurrentEventPostSpeed() throws InterruptedException {
int threads = 20;
int eventsPerThread = 5_000;
AtomicInteger counter = new AtomicInteger(0);
GlobalEventBus.subscribeAndRegister(PerfEvent.class, e -> counter.incrementAndGet());
Thread[] workers = new Thread[threads];
long start = System.nanoTime();
for (int t = 0; t < threads; t++) {
workers[t] = new Thread(() -> {
for (int i = 0; i < eventsPerThread; i++) {
new EventPublisher<>(PerfEvent.class, "event-" + i).postEvent();
}
});
workers[t].start();
}
for (Thread worker : workers) {
worker.join();
}
long end = System.nanoTime();
long durationMs = (end - start) / 1_000_000;
System.out.println("Posted " + (threads * eventsPerThread) + " events concurrently in " + durationMs + " ms");
assertTrue(counter.get() == threads * eventsPerThread, "Some events were lost");
assertTrue(durationMs < 5000, "Concurrent posting too slow");
}
}

View File

@@ -0,0 +1,160 @@
package org.toop.eventbus;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.toop.eventbus.events.EventWithUuid;
import java.math.BigInteger;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
class EventPublisherStressTest {
public record HeavyEvent(String payload, String eventId) implements EventWithUuid {
@Override
public java.util.Map<String, Object> result() {
return java.util.Map.of("payload", payload, "eventId", eventId);
}
}
private static final int THREADS = 100;
private static final long EVENTS_PER_THREAD = 2_000_000;
@Tag("stress")
@Test
void extremeConcurrencyTest_progressWithMemory() throws InterruptedException {
AtomicLong counter = new AtomicLong(0); // Big numbers safety
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
GlobalEventBus.subscribeAndRegister(HeavyEvent.class, _ -> counter.incrementAndGet());
BigInteger totalEvents = BigInteger.valueOf(THREADS)
.multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
long startTime = System.currentTimeMillis();
// Monitor thread for EPS and memory
Thread monitor = new Thread(() -> {
long lastCount = 0;
long lastTime = startTime;
Runtime runtime = Runtime.getRuntime();
while (counter.get() < totalEvents.longValue()) {
try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
long now = System.currentTimeMillis();
long completed = counter.get();
long eventsThisSecond = completed - lastCount;
double eps = eventsThisSecond / ((now - lastTime) / 1000.0);
// Memory usage
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
long maxMemory = runtime.maxMemory();
double usedPercent = usedMemory * 100.0 / maxMemory;
System.out.printf(
"Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)\n",
completed,
totalEvents.longValue(),
completed * 100.0 / totalEvents.doubleValue(),
eps,
usedMemory / 1024.0 / 1024.0,
usedPercent
);
lastCount = completed;
lastTime = now;
}
});
monitor.setDaemon(true);
monitor.start();
// Submit events
for (int t = 0; t < THREADS; t++) {
executor.submit(() -> {
for (int i = 0; i < EVENTS_PER_THREAD; i++) {
new EventPublisher<>(HeavyEvent.class, "payload-" + i).postEvent();
}
});
}
executor.shutdown();
executor.awaitTermination(20, TimeUnit.MINUTES); // allow extra time for huge tests
long endTime = System.currentTimeMillis();
double durationSeconds = (endTime - startTime) / 1000.0;
System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
double averageEps = totalEvents.doubleValue() / durationSeconds;
System.out.printf("Average EPS: %.0f%n", averageEps);
assertEquals(totalEvents.longValue(), counter.get());
}
@Tag("stress")
@Test
void efficientExtremeConcurrencyTest() throws InterruptedException {
final int THREADS = Runtime.getRuntime().availableProcessors(); // threads ≈ CPU cores
final int EVENTS_PER_THREAD = 5000;
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
ConcurrentLinkedQueue<HeavyEvent> processedEvents = new ConcurrentLinkedQueue<>();
long start = System.nanoTime();
for (int t = 0; t < THREADS; t++) {
executor.submit(() -> {
for (int i = 0; i < EVENTS_PER_THREAD; i++) {
new EventPublisher<>(HeavyEvent.class, "payload-" + i)
.onEventById(HeavyEvent.class, processedEvents::add)
.postEvent();
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
long end = System.nanoTime();
double durationSeconds = (end - start) / 1_000_000_000.0;
BigInteger totalEvents = BigInteger.valueOf((long) THREADS).multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
double eps = totalEvents.doubleValue() / durationSeconds;
System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
System.out.printf("Throughput: %.0f events/sec%n", eps);
// Memory snapshot
Runtime rt = Runtime.getRuntime();
System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0 / 1024.0);
// Ensure all events were processed
assertEquals(totalEvents.intValue(), processedEvents.size());
}
@Tag("stress")
@Test
void constructorCacheVsReflection() throws Throwable {
int iterations = 1_000_000;
long startReflect = System.nanoTime();
for (int i = 0; i < iterations; i++) {
// Reflection every time
HeavyEvent.class.getDeclaredConstructors()[0].newInstance("payload", "uuid-" + i);
}
long endReflect = System.nanoTime();
long startHandle = System.nanoTime();
for (int i = 0; i < iterations; i++) {
// Using cached MethodHandle
EventPublisher<HeavyEvent> ep = new EventPublisher<>(HeavyEvent.class, "payload-" + i);
}
long endHandle = System.nanoTime();
System.out.println("Reflection: " + (endReflect - startReflect) / 1_000_000 + " ms");
System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + " ms");
}
}

View File

@@ -0,0 +1,122 @@
package org.toop.eventbus;
import com.google.common.eventbus.EventBus;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.toop.eventbus.events.EventWithUuid;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.*;
class EventPublisherTest {
// Simple test event implementing EventWithUuid
public record TestEvent(String name, String eventId) implements EventWithUuid {
@Override
public Map<String, Object> result() {
return Map.of("name", name, "eventId", eventId);
}
}
public record TestResponseEvent(String msg, String eventId) implements EventWithUuid {
@Override
public Map<String, Object> result() {
return Map.of("msg", msg, "eventId", eventId);
}
}
@Test
void testEventPublisherGeneratesUuid() {
EventPublisher<TestEvent> publisher = new EventPublisher<>(TestEvent.class, "myTest");
assertNotNull(publisher.getEventId());
assertEquals(publisher.getEventId(), publisher.getEvent().eventId());
}
@Test
void testPostEvent() {
AtomicBoolean triggered = new AtomicBoolean(false);
EventPublisher<TestEvent> publisher = new EventPublisher<>(TestEvent.class, "myTest");
publisher.onEventById(TestEvent.class, event -> triggered.set(true))
.postEvent();
assertTrue(triggered.get(), "Subscriber should have been triggered by postEvent");
}
@Test
void testOnEventByIdMatchesUuid() {
AtomicBoolean triggered = new AtomicBoolean(false);
EventPublisher<TestEvent> publisher1 = new EventPublisher<>(TestEvent.class, "event1");
EventPublisher<TestEvent> publisher2 = new EventPublisher<>(TestEvent.class, "event2");
publisher1.onEventById(TestEvent.class, event -> triggered.set(true));
publisher2.postEvent();
// Only publisher1's subscriber should trigger for its UUID
assertFalse(triggered.get(), "Subscriber should not trigger for a different UUID");
publisher1.postEvent();
assertTrue(triggered.get(), "Subscriber should trigger for matching UUID");
}
@Test
void testUnregisterAfterSuccess() {
AtomicBoolean triggered = new AtomicBoolean(false);
AtomicReference<Object> listenerRef = new AtomicReference<>();
EventPublisher<TestEvent> publisher = new EventPublisher<>(TestEvent.class, "event");
publisher.onEventById(TestEvent.class, event -> triggered.set(true))
.unregisterAfterSuccess()
.postEvent();
// Subscriber should have been removed after first trigger
assertTrue(triggered.get(), "Subscriber should trigger first time");
triggered.set(false);
publisher.postEvent();
assertFalse(triggered.get(), "Subscriber should not trigger after unregister");
}
@Test
void testResultMapPopulated() {
AtomicReference<Map<String, Object>> resultRef = new AtomicReference<>();
EventPublisher<TestEvent> publisher = new EventPublisher<>(TestEvent.class, "myName");
publisher.onEventById(TestEvent.class, event -> resultRef.set(event.result()))
.postEvent();
Map<String, Object> result = resultRef.get();
assertNotNull(result);
assertEquals("myName", result.get("name"));
assertEquals(publisher.getEventId(), result.get("eventId"));
}
@Test
void testMultipleSubscribers() {
AtomicBoolean firstTriggered = new AtomicBoolean(false);
AtomicBoolean secondTriggered = new AtomicBoolean(false);
EventPublisher<TestEvent> publisher = new EventPublisher<>(TestEvent.class, "multi");
publisher.onEventById(TestEvent.class, e -> firstTriggered.set(true))
.onEventById(TestEvent.class, e -> secondTriggered.set(true))
.postEvent();
assertTrue(firstTriggered.get());
assertTrue(secondTriggered.get());
}
@Test
void testEventInstanceCreatedCorrectly() {
EventPublisher<TestEvent> publisher = new EventPublisher<>(TestEvent.class, "hello");
TestEvent event = publisher.getEvent();
assertNotNull(event);
assertEquals("hello", event.name());
assertEquals(publisher.getEventId(), event.eventId());
}
}