Fixed unittests. Formatting

This commit is contained in:
lieght
2025-09-28 21:45:58 +02:00
committed by Bas Antonius de Jong
parent c76b7a800e
commit a94d83292e
25 changed files with 1506 additions and 945 deletions

View File

@@ -30,7 +30,7 @@ jobs:
needs: formatting-check needs: formatting-check
strategy: strategy:
matrix: matrix:
os: [ubuntu-latest, windows-latest, macos-latest] os: [ubuntu-latest] #windows-latest, macos-latest
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v5
- uses: actions/setup-java@v5 - uses: actions/setup-java@v5

View File

@@ -13,6 +13,12 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.46.1</version>
</dependency>
<dependency> <dependency>
<groupId>org.toop</groupId> <groupId>org.toop</groupId>
<artifactId>pism_framework</artifactId> <artifactId>pism_framework</artifactId>
@@ -58,6 +64,41 @@
<!-- <fork>true</fork>--> <!-- <fork>true</fork>-->
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.46.1</version>
<configuration>
<!-- optional: limit format enforcement to just the files changed by this feature branch -->
<ratchetFrom>origin/main</ratchetFrom>
<formats>
<!-- you can define as many formats as you want, each is independent -->
<format>
<!-- define the files to apply to -->
<includes>
<include>.gitattributes</include>
<include>.gitignore</include>
</includes>
<!-- define the steps to apply to those files -->
<trimTrailingWhitespace/>
<endWithNewline/>
<indent>
<tabs>true</tabs>
<spacesPerTab>4</spacesPerTab>
</indent>
</format>
</formats>
<!-- define a language-specific format -->
<java>
<googleJavaFormat>
<version>1.28.0</version>
<style>AOSP</style> <!-- GOOGLE (2 indents), AOSP (4 indents) -->
<reflowLongStrings>true</reflowLongStrings>
<formatJavadoc>true</formatJavadoc>
</googleJavaFormat>
</java>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>

View File

@@ -1,57 +1,86 @@
package org.toop; package org.toop;
import java.util.Arrays;
import org.toop.app.gui.LocalServerSelector;
import org.toop.framework.eventbus.EventFlow; import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.networking.events.NetworkEvents;
import org.toop.framework.networking.NetworkingClientManager; import org.toop.framework.networking.NetworkingClientManager;
import org.toop.framework.networking.NetworkingInitializationException; import org.toop.framework.networking.NetworkingInitializationException;
import org.toop.app.gui.LocalServerSelector; import org.toop.framework.networking.events.NetworkEvents;
import java.util.Arrays;
public class Main { public class Main {
static void main(String[] args) { static void main(String[] args) {
initSystems(); initSystems();
EventFlow a = new EventFlow() EventFlow a =
.addPostEvent( new EventFlow()
NetworkEvents.StartClient.class, .addPostEvent(NetworkEvents.StartClient.class, "127.0.0.1", 7789)
"127.0.0.1", .onResponse(Main::login)
7789) // .onResponse(Main::sendCommand)
.onResponse(Main::login) // .onResponse(Main::closeClient)
// .onResponse(Main::sendCommand) .asyncPostEvent();
// .onResponse(Main::closeClient)
.asyncPostEvent();
new Thread(() -> { new Thread(
while (a.getResult() == null) { () -> {
try { while (a.getResult() == null) {
Thread.sleep(2000); try {
} catch (InterruptedException e) {} Thread.sleep(2000);
} } catch (InterruptedException e) {
long clid = (Long) a.getResult().get("clientId"); }
new EventFlow() }
.addPostEvent(new NetworkEvents.SendCommand(clid, "get playerlist")) long clid = (Long) a.getResult().get("clientId");
.listen(NetworkEvents.PlayerListResponse.class, response -> { new EventFlow()
if (response.clientId() == clid) System.out.println(Arrays.toString(response.playerlist())); .addPostEvent(
}) new NetworkEvents.SendSubscribe(clid, "tic-tac-toe"))
.asyncPostEvent(); .listen(
}).start(); NetworkEvents.PlayerlistResponse.class,
response -> {
if (response.clientId() == clid)
System.out.println(
Arrays.toString(response.playerlist()));
})
.listen(
NetworkEvents.ChallengeResponse.class,
response -> {
if (response.clientId() == clid)
System.out.println(response.challengeId());
})
.listen(
NetworkEvents.ChallengeCancelledResponse.class,
response -> {
if (response.clientId() == clid)
System.out.println(response.challengeId());
})
.listen(
NetworkEvents.GamelistResponse.class,
response -> {
if (response.clientId() == clid)
System.out.println(
Arrays.toString(response.gamelist()));
})
.asyncPostEvent();
})
.start();
new Thread(() -> javax.swing.SwingUtilities.invokeLater(LocalServerSelector::new)).start(); new Thread(() -> javax.swing.SwingUtilities.invokeLater(LocalServerSelector::new)).start();
} }
private static void login(NetworkEvents.StartClientResponse event) { private static void login(NetworkEvents.StartClientResponse event) {
new Thread(() -> { new Thread(
try { () -> {
Thread.sleep(1000); try {
new EventFlow() Thread.sleep(1000);
.addPostEvent(new NetworkEvents.SendCommand(event.clientId(), "login bas")) new EventFlow()
.asyncPostEvent(); .addPostEvent(
} catch (InterruptedException e) {} new NetworkEvents.SendCommand(
}).start(); event.clientId(), "login bas"))
} .asyncPostEvent();
} catch (InterruptedException e) {
}
})
.start();
}
private static void initSystems() throws NetworkingInitializationException { private static void initSystems() throws NetworkingInitializationException {
new NetworkingClientManager(); new NetworkingClientManager();
} }
} }

View File

@@ -7,9 +7,9 @@ import javax.swing.*;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow; import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.networking.NetworkingGameClientHandler;
import org.toop.framework.networking.events.NetworkEvents; import org.toop.framework.networking.events.NetworkEvents;
import org.toop.tictactoe.LocalTicTacToe; import org.toop.tictactoe.LocalTicTacToe;
import org.toop.framework.networking.NetworkingGameClientHandler;
import org.toop.tictactoe.gui.UIGameBoard; import org.toop.tictactoe.gui.UIGameBoard;
public class RemoteGameSelector { public class RemoteGameSelector {
@@ -55,36 +55,44 @@ public class RemoteGameSelector {
&& !portTextField.getText().isEmpty()) { && !portTextField.getText().isEmpty()) {
AtomicReference<Long> clientId = new AtomicReference<>(); AtomicReference<Long> clientId = new AtomicReference<>();
new EventFlow().addPostEvent( new EventFlow()
NetworkEvents.StartClient.class, .addPostEvent(
(Supplier<NetworkingGameClientHandler>) NetworkEvents.StartClient.class,
new NetworkingGameClientHandler(clientId.get()), (Supplier<NetworkingGameClientHandler>)
"127.0.0.1", new NetworkingGameClientHandler(clientId.get()),
5001 "127.0.0.1",
).onResponse( 5001)
NetworkEvents.StartClientResponse.class, .onResponse(
(response) -> { NetworkEvents.StartClientResponse.class,
clientId.set(response.clientId()); (response) -> {
} clientId.set(response.clientId());
).asyncPostEvent(); })
.asyncPostEvent();
// GlobalEventBus.subscribeAndRegister( // GlobalEventBus.subscribeAndRegister(
// NetworkEvents.ReceivedMessage.class, // NetworkEvents.ReceivedMessage.class,
// event -> { // event -> {
// if (event.message().equalsIgnoreCase("ok")) { // if
// logger.info("received ok from server."); // (event.message().equalsIgnoreCase("ok")) {
// } else if (event.message().toLowerCase().startsWith("gameid")) { // logger.info("received ok from
// String gameId = // server.");
// event.message() // } else if
// .toLowerCase() // (event.message().toLowerCase().startsWith("gameid")) {
// .replace("gameid ", ""); // String gameId =
// GlobalEventBus.post( // event.message()
// new NetworkEvents.SendCommand( // .toLowerCase()
// "start_game " + gameId)); // .replace("gameid
// } else { // ", "");
// logger.info("{}", event.message()); // GlobalEventBus.post(
// } // new
// }); // NetworkEvents.SendCommand(
// "start_game " +
// gameId));
// } else {
// logger.info("{}",
// event.message());
// }
// });
frame.remove(mainMenu); frame.remove(mainMenu);
UIGameBoard ttt = new UIGameBoard(localTicTacToe, this); UIGameBoard ttt = new UIGameBoard(localTicTacToe, this);
localTicTacToe.startThreads(); localTicTacToe.startThreads();

View File

@@ -1,7 +1,6 @@
package org.toop.tictactoe; package org.toop.tictactoe;
import java.util.concurrent.*; import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow; import org.toop.framework.eventbus.EventFlow;
@@ -10,11 +9,6 @@ import org.toop.game.Game;
import org.toop.game.tictactoe.TicTacToe; import org.toop.game.tictactoe.TicTacToe;
import org.toop.game.tictactoe.TicTacToeAI; import org.toop.game.tictactoe.TicTacToeAI;
import org.toop.tictactoe.gui.UIGameBoard; import org.toop.tictactoe.gui.UIGameBoard;
import org.toop.framework.networking.NetworkingGameClientHandler;
import java.util.function.Supplier;
import static java.lang.Thread.sleep;
/** /**
* A representation of a local tic-tac-toe game. Calls are made to a server for information about * A representation of a local tic-tac-toe game. Calls are made to a server for information about
@@ -71,24 +65,25 @@ public class LocalTicTacToe { // TODO: Implement runnable
* @param port The port of the server to connect to. * @param port The port of the server to connect to.
*/ */
private LocalTicTacToe(String ip, int port) { private LocalTicTacToe(String ip, int port) {
// this.receivedMessageListener = // this.receivedMessageListener =
// GlobalEventBus.subscribe(this::receiveMessageAction); // GlobalEventBus.subscribe(this::receiveMessageAction);
// GlobalEventBus.subscribe(this.receivedMessageListener); // GlobalEventBus.subscribe(this.receivedMessageListener);
// this.connectionId = this.createConnection(ip, port); TODO: Refactor this // this.connectionId = this.createConnection(ip, port); TODO: Refactor this
this.createGame("X", "O"); this.createGame("X", "O");
this.isLocal = false; this.isLocal = false;
//this.executor.submit(this::remoteGameThread); // this.executor.submit(this::remoteGameThread);
} }
private LocalTicTacToe(boolean[] aiFlags) { private LocalTicTacToe(boolean[] aiFlags) {
this.isAiPlayer = aiFlags; // store who is AI this.isAiPlayer = aiFlags; // store who is AI
this.isLocal = true; this.isLocal = true;
//this.executor.submit(this::localGameThread); // this.executor.submit(this::localGameThread);
} }
public void startThreads(){
public void startThreads() {
if (isLocal) { if (isLocal) {
this.executor.submit(this::localGameThread); this.executor.submit(this::localGameThread);
}else { } else {
this.executor.submit(this::remoteGameThread); this.executor.submit(this::remoteGameThread);
} }
} }
@@ -124,10 +119,10 @@ public class LocalTicTacToe { // TODO: Implement runnable
state = this.ticTacToe.play(this.moveQueuePlayerA.take()); state = this.ticTacToe.play(this.moveQueuePlayerA.take());
} else { } else {
Game.Move bestMove = ai.findBestMove(this.ticTacToe, 9); Game.Move bestMove = ai.findBestMove(this.ticTacToe, 9);
assert bestMove != null; assert bestMove != null;
state = this.ticTacToe.play(bestMove); state = this.ticTacToe.play(bestMove);
ui.setCell(bestMove.position(), "X"); ui.setCell(bestMove.position(), "X");
} }
if (state == Game.State.WIN || state == Game.State.DRAW) { if (state == Game.State.WIN || state == Game.State.DRAW) {
ui.setState(state, "X"); ui.setState(state, "X");
@@ -138,9 +133,9 @@ public class LocalTicTacToe { // TODO: Implement runnable
state = this.ticTacToe.play(this.moveQueuePlayerB.take()); state = this.ticTacToe.play(this.moveQueuePlayerB.take());
} else { } else {
Game.Move bestMove = ai.findBestMove(this.ticTacToe, 9); Game.Move bestMove = ai.findBestMove(this.ticTacToe, 9);
assert bestMove != null; assert bestMove != null;
state = this.ticTacToe.play(bestMove); state = this.ticTacToe.play(bestMove);
ui.setCell(bestMove.position(), "O"); ui.setCell(bestMove.position(), "O");
} }
if (state == Game.State.WIN || state == Game.State.DRAW) { if (state == Game.State.WIN || state == Game.State.DRAW) {
ui.setState(state, "O"); ui.setState(state, "O");
@@ -166,8 +161,8 @@ public class LocalTicTacToe { // TODO: Implement runnable
} }
public char[] getCurrentBoard() { public char[] getCurrentBoard() {
//return ticTacToe.getGrid(); // return ticTacToe.getGrid();
return new char[2]; return new char[2];
} }
/** End the current game. */ /** End the current game. */
@@ -206,7 +201,7 @@ public class LocalTicTacToe { // TODO: Implement runnable
private void endTheGame() { private void endTheGame() {
this.sendCommand("end_game", this.gameId); this.sendCommand("end_game", this.gameId);
// this.endListeners(); // this.endListeners();
} }
private void receiveMessageAction(NetworkEvents.ReceivedMessage receivedMessage) { private void receiveMessageAction(NetworkEvents.ReceivedMessage receivedMessage) {
@@ -215,8 +210,7 @@ public class LocalTicTacToe { // TODO: Implement runnable
} }
try { try {
logger.info( logger.info("Received message from {}: {}", this.clientId, receivedMessage.message());
"Received message from {}: {}", this.clientId, receivedMessage.message());
this.receivedQueue.put(receivedMessage.message()); this.receivedQueue.put(receivedMessage.message());
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("Error waiting for received Message", e); logger.error("Error waiting for received Message", e);
@@ -224,12 +218,14 @@ public class LocalTicTacToe { // TODO: Implement runnable
} }
private void sendCommand(String... args) { private void sendCommand(String... args) {
new EventFlow().addPostEvent(NetworkEvents.SendCommand.class, this.clientId, args).asyncPostEvent(); new EventFlow()
.addPostEvent(NetworkEvents.SendCommand.class, this.clientId, args)
.asyncPostEvent();
} }
// private void endListeners() { // private void endListeners() {
// GlobalEventBus.unregister(this.receivedMessageListener); // GlobalEventBus.unregister(this.receivedMessageListener);
// } TODO // } TODO
public void setUIReference(UIGameBoard uiGameBoard) { public void setUIReference(UIGameBoard uiGameBoard) {
this.ui = uiGameBoard; this.ui = uiGameBoard;

View File

@@ -13,6 +13,12 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.46.1</version>
</dependency>
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <artifactId>netty-all</artifactId>
@@ -123,6 +129,41 @@
<!-- <fork>true</fork>--> <!-- <fork>true</fork>-->
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.46.1</version>
<configuration>
<!-- optional: limit format enforcement to just the files changed by this feature branch -->
<ratchetFrom>origin/main</ratchetFrom>
<formats>
<!-- you can define as many formats as you want, each is independent -->
<format>
<!-- define the files to apply to -->
<includes>
<include>.gitattributes</include>
<include>.gitignore</include>
</includes>
<!-- define the steps to apply to those files -->
<trimTrailingWhitespace/>
<endWithNewline/>
<indent>
<tabs>true</tabs>
<spacesPerTab>4</spacesPerTab>
</indent>
</format>
</formats>
<!-- define a language-specific format -->
<java>
<googleJavaFormat>
<version>1.28.0</version>
<style>AOSP</style> <!-- GOOGLE (2 indents), AOSP (4 indents) -->
<reflowLongStrings>true</reflowLongStrings>
<formatJavadoc>true</formatJavadoc>
</googleJavaFormat>
</java>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>

View File

@@ -42,9 +42,14 @@ public class SnowflakeGenerator {
} }
} }
void setTime(long l) {
this.lastTimestamp.set(l);
}
public SnowflakeGenerator() { public SnowflakeGenerator() {
if (machineId < 0 || machineId > MAX_MACHINE_ID) { if (machineId < 0 || machineId > MAX_MACHINE_ID) {
throw new IllegalArgumentException("Machine ID must be between 0 and " + MAX_MACHINE_ID); throw new IllegalArgumentException(
"Machine ID must be between 0 and " + MAX_MACHINE_ID);
} }
} }

View File

@@ -1,9 +1,5 @@
package org.toop.framework.eventbus; package org.toop.framework.eventbus;
import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.EventWithSnowflake;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType; import java.lang.invoke.MethodType;
@@ -13,20 +9,21 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.EventWithSnowflake;
/** /**
* EventFlow is a utility class for creating, posting, and optionally subscribing to events * EventFlow is a utility class for creating, posting, and optionally subscribing to events in a
* in a type-safe and chainable manner. It is designed to work with the {@link GlobalEventBus}. * type-safe and chainable manner. It is designed to work with the {@link GlobalEventBus}.
* *
* <p>This class supports automatic UUID assignment for {@link EventWithSnowflake} events, * <p>This class supports automatic UUID assignment for {@link EventWithSnowflake} events, and
* and allows filtering subscribers so they only respond to events with a specific UUID. * allows filtering subscribers so they only respond to events with a specific UUID. All
* All subscription methods are chainable, and you can configure automatic unsubscription * subscription methods are chainable, and you can configure automatic unsubscription after an event
* after an event has been successfully handled.</p> * has been successfully handled.
*/ */
public class EventFlow { public class EventFlow {
/** Lookup object used for dynamically invoking constructors via MethodHandles. */ /** Lookup object used for dynamically invoking constructors via MethodHandles. */
private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup(); private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup();
@@ -65,15 +62,20 @@ public class EventFlow {
try { try {
boolean isUuidEvent = EventWithSnowflake.class.isAssignableFrom(eventClass); boolean isUuidEvent = EventWithSnowflake.class.isAssignableFrom(eventClass);
MethodHandle ctorHandle = CONSTRUCTOR_CACHE.computeIfAbsent(eventClass, cls -> { MethodHandle ctorHandle =
try { CONSTRUCTOR_CACHE.computeIfAbsent(
Class<?>[] paramTypes = cls.getDeclaredConstructors()[0].getParameterTypes(); eventClass,
MethodType mt = MethodType.methodType(void.class, paramTypes); cls -> {
return LOOKUP.findConstructor(cls, mt); try {
} catch (Exception e) { Class<?>[] paramTypes =
throw new RuntimeException("Failed to find constructor handle for " + cls, e); cls.getDeclaredConstructors()[0].getParameterTypes();
} MethodType mt = MethodType.methodType(void.class, paramTypes);
}); return LOOKUP.findConstructor(cls, mt);
} catch (Exception e) {
throw new RuntimeException(
"Failed to find constructor handle for " + cls, e);
}
});
Object[] finalArgs; Object[] finalArgs;
int expectedParamCount = ctorHandle.type().parameterCount(); int expectedParamCount = ctorHandle.type().parameterCount();
@@ -98,67 +100,69 @@ public class EventFlow {
} }
} }
// public EventFlow addSnowflake() { // public EventFlow addSnowflake() {
// this.eventSnowflake = new SnowflakeGenerator(1).nextId(); // this.eventSnowflake = new SnowflakeGenerator(1).nextId();
// return this; // return this;
// } // }
/** /** Subscribe by ID: only fires if UUID matches this publisher's eventId. */
* Subscribe by ID: only fires if UUID matches this publisher's eventId. public <TT extends EventWithSnowflake> EventFlow onResponse(
*/ Class<TT> eventClass, Consumer<TT> action, boolean unsubscribeAfterSuccess) {
public <TT extends EventWithSnowflake> EventFlow onResponse(Class<TT> eventClass, Consumer<TT> action,
boolean unsubscribeAfterSuccess) {
ListenerHandler[] listenerHolder = new ListenerHandler[1]; ListenerHandler[] listenerHolder = new ListenerHandler[1];
listenerHolder[0] = new ListenerHandler( listenerHolder[0] =
GlobalEventBus.subscribe(eventClass, event -> { new ListenerHandler(
if (event.eventSnowflake() != this.eventSnowflake) return; GlobalEventBus.subscribe(
eventClass,
event -> {
if (event.eventSnowflake() != this.eventSnowflake) return;
action.accept(event); action.accept(event);
if (unsubscribeAfterSuccess && listenerHolder[0] != null) { if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
GlobalEventBus.unsubscribe(listenerHolder[0]); GlobalEventBus.unsubscribe(listenerHolder[0]);
this.listeners.remove(listenerHolder[0]); this.listeners.remove(listenerHolder[0]);
} }
this.result = event.result(); this.result = event.result();
}) }));
);
this.listeners.add(listenerHolder[0]); this.listeners.add(listenerHolder[0]);
return this; return this;
} }
/** /** Subscribe by ID: only fires if UUID matches this publisher's eventId. */
* Subscribe by ID: only fires if UUID matches this publisher's eventId. public <TT extends EventWithSnowflake> EventFlow onResponse(
*/ Class<TT> eventClass, Consumer<TT> action) {
public <TT extends EventWithSnowflake> EventFlow onResponse(Class<TT> eventClass, Consumer<TT> action) {
return this.onResponse(eventClass, action, true); return this.onResponse(eventClass, action, true);
} }
/** /** Subscribe by ID without explicit class. */
* Subscribe by ID without explicit class.
*/
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <TT extends EventWithSnowflake> EventFlow onResponse(Consumer<TT> action, boolean unsubscribeAfterSuccess) { public <TT extends EventWithSnowflake> EventFlow onResponse(
Consumer<TT> action, boolean unsubscribeAfterSuccess) {
ListenerHandler[] listenerHolder = new ListenerHandler[1]; ListenerHandler[] listenerHolder = new ListenerHandler[1];
listenerHolder[0] = new ListenerHandler( listenerHolder[0] =
GlobalEventBus.subscribe(event -> { new ListenerHandler(
if (!(event instanceof EventWithSnowflake uuidEvent)) return; GlobalEventBus.subscribe(
if (uuidEvent.eventSnowflake() == this.eventSnowflake) { event -> {
try { if (!(event instanceof EventWithSnowflake uuidEvent)) return;
TT typedEvent = (TT) uuidEvent; if (uuidEvent.eventSnowflake() == this.eventSnowflake) {
action.accept(typedEvent); try {
if (unsubscribeAfterSuccess && listenerHolder[0] != null) { TT typedEvent = (TT) uuidEvent;
GlobalEventBus.unsubscribe(listenerHolder[0]); action.accept(typedEvent);
this.listeners.remove(listenerHolder[0]); if (unsubscribeAfterSuccess
} && listenerHolder[0] != null) {
this.result = typedEvent.result(); GlobalEventBus.unsubscribe(listenerHolder[0]);
} catch (ClassCastException _) { this.listeners.remove(listenerHolder[0]);
throw new ClassCastException("Cannot cast " + event.getClass().getName() + }
" to EventWithSnowflake"); this.result = typedEvent.result();
} } catch (ClassCastException _) {
} throw new ClassCastException(
}) "Cannot cast "
); + event.getClass().getName()
+ " to EventWithSnowflake");
}
}
}));
this.listeners.add(listenerHolder[0]); this.listeners.add(listenerHolder[0]);
return this; return this;
} }
@@ -167,19 +171,21 @@ public class EventFlow {
return this.onResponse(action, true); return this.onResponse(action, true);
} }
public <TT extends EventType> EventFlow listen(Class<TT> eventClass, Consumer<TT> action, public <TT extends EventType> EventFlow listen(
boolean unsubscribeAfterSuccess) { Class<TT> eventClass, Consumer<TT> action, boolean unsubscribeAfterSuccess) {
ListenerHandler[] listenerHolder = new ListenerHandler[1]; ListenerHandler[] listenerHolder = new ListenerHandler[1];
listenerHolder[0] = new ListenerHandler( listenerHolder[0] =
GlobalEventBus.subscribe(eventClass, event -> { new ListenerHandler(
action.accept(event); GlobalEventBus.subscribe(
eventClass,
event -> {
action.accept(event);
if (unsubscribeAfterSuccess && listenerHolder[0] != null) { if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
GlobalEventBus.unsubscribe(listenerHolder[0]); GlobalEventBus.unsubscribe(listenerHolder[0]);
this.listeners.remove(listenerHolder[0]); this.listeners.remove(listenerHolder[0]);
} }
}) }));
);
this.listeners.add(listenerHolder[0]); this.listeners.add(listenerHolder[0]);
return this; return this;
} }
@@ -189,24 +195,28 @@ public class EventFlow {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <TT extends EventType> EventFlow listen(Consumer<TT> action, boolean unsubscribeAfterSuccess) { public <TT extends EventType> EventFlow listen(
Consumer<TT> action, boolean unsubscribeAfterSuccess) {
ListenerHandler[] listenerHolder = new ListenerHandler[1]; ListenerHandler[] listenerHolder = new ListenerHandler[1];
listenerHolder[0] = new ListenerHandler( listenerHolder[0] =
GlobalEventBus.subscribe(event -> { new ListenerHandler(
if (!(event instanceof EventType nonUuidEvent)) return; GlobalEventBus.subscribe(
try { event -> {
TT typedEvent = (TT) nonUuidEvent; if (!(event instanceof EventType nonUuidEvent)) return;
action.accept(typedEvent); try {
if (unsubscribeAfterSuccess && listenerHolder[0] != null) { TT typedEvent = (TT) nonUuidEvent;
GlobalEventBus.unsubscribe(listenerHolder[0]); action.accept(typedEvent);
this.listeners.remove(listenerHolder[0]); if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
} GlobalEventBus.unsubscribe(listenerHolder[0]);
} catch (ClassCastException _) { this.listeners.remove(listenerHolder[0]);
throw new ClassCastException("Cannot cast " + event.getClass().getName() + }
" to EventWithSnowflake"); } catch (ClassCastException _) {
} throw new ClassCastException(
}) "Cannot cast "
); + event.getClass().getName()
+ " to EventWithSnowflake");
}
}));
this.listeners.add(listenerHolder[0]); this.listeners.add(listenerHolder[0]);
return this; return this;
} }

View File

@@ -3,26 +3,26 @@ package org.toop.framework.eventbus;
import com.lmax.disruptor.*; import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.dsl.ProducerType;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.EventWithSnowflake;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.EventWithSnowflake;
/** /**
* GlobalEventBus backed by the LMAX Disruptor for ultra-low latency, * GlobalEventBus backed by the LMAX Disruptor for ultra-low latency, high-throughput event
* high-throughput event publishing. * publishing.
*/ */
public final class GlobalEventBus { public final class GlobalEventBus {
/** Map of event class to type-specific listeners. */ /** Map of event class to type-specific listeners. */
private static final Map<Class<?>, CopyOnWriteArrayList<Consumer<? super EventType>>> LISTENERS = private static final Map<Class<?>, CopyOnWriteArrayList<Consumer<? super EventType>>>
new ConcurrentHashMap<>(); LISTENERS = new ConcurrentHashMap<>();
/** Map of event class to Snowflake-ID-specific listeners. */ /** Map of event class to Snowflake-ID-specific listeners. */
private static final Map<Class<?>, ConcurrentHashMap<Long, Consumer<? extends EventWithSnowflake>>> UUID_LISTENERS = private static final Map<
new ConcurrentHashMap<>(); Class<?>, ConcurrentHashMap<Long, Consumer<? extends EventWithSnowflake>>>
UUID_LISTENERS = new ConcurrentHashMap<>();
/** Disruptor ring buffer size (must be power of two). */ /** Disruptor ring buffer size (must be power of two). */
private static final int RING_BUFFER_SIZE = 1024 * 64; private static final int RING_BUFFER_SIZE = 1024 * 64;
@@ -34,27 +34,29 @@ public final class GlobalEventBus {
private static final RingBuffer<EventHolder> RING_BUFFER; private static final RingBuffer<EventHolder> RING_BUFFER;
static { static {
ThreadFactory threadFactory = r -> { ThreadFactory threadFactory =
Thread t = new Thread(r, "EventBus-Disruptor"); r -> {
t.setDaemon(true); Thread t = new Thread(r, "EventBus-Disruptor");
return t; t.setDaemon(true);
}; return t;
};
DISRUPTOR = new Disruptor<>( DISRUPTOR =
EventHolder::new, new Disruptor<>(
RING_BUFFER_SIZE, EventHolder::new,
threadFactory, RING_BUFFER_SIZE,
ProducerType.MULTI, threadFactory,
new BusySpinWaitStrategy() ProducerType.MULTI,
); new BusySpinWaitStrategy());
// Single consumer that dispatches to subscribers // Single consumer that dispatches to subscribers
DISRUPTOR.handleEventsWith((holder, seq, endOfBatch) -> { DISRUPTOR.handleEventsWith(
if (holder.event != null) { (holder, seq, endOfBatch) -> {
dispatchEvent(holder.event); if (holder.event != null) {
holder.event = null; dispatchEvent(holder.event);
} holder.event = null;
}); }
});
DISRUPTOR.start(); DISRUPTOR.start();
RING_BUFFER = DISRUPTOR.getRingBuffer(); RING_BUFFER = DISRUPTOR.getRingBuffer();
@@ -71,17 +73,21 @@ public final class GlobalEventBus {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Subscription // Subscription
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
public static <T extends EventType> Consumer<T> subscribe(Class<T> eventClass, Consumer<T> listener) { public static <T extends EventType> Consumer<? super EventType> subscribe(
Class<T> eventClass, Consumer<T> listener) {
CopyOnWriteArrayList<Consumer<? super EventType>> list = CopyOnWriteArrayList<Consumer<? super EventType>> list =
LISTENERS.computeIfAbsent(eventClass, k -> new CopyOnWriteArrayList<>()); LISTENERS.computeIfAbsent(eventClass, k -> new CopyOnWriteArrayList<>());
list.add(event -> listener.accept(eventClass.cast(event)));
return listener; Consumer<? super EventType> wrapper = event -> listener.accept(eventClass.cast(event));
list.add(wrapper);
return wrapper;
} }
public static Consumer<Object> subscribe(Consumer<Object> listener) { public static Consumer<? super EventType> subscribe(Consumer<Object> listener) {
LISTENERS.computeIfAbsent(Object.class, _ -> new CopyOnWriteArrayList<>()) Consumer<? super EventType> wrapper = event -> listener.accept(event);
.add(listener); LISTENERS.computeIfAbsent(Object.class, _ -> new CopyOnWriteArrayList<>()).add(wrapper);
return listener; return wrapper;
} }
public static <T extends EventWithSnowflake> void subscribeById( public static <T extends EventWithSnowflake> void subscribeById(
@@ -95,7 +101,8 @@ public final class GlobalEventBus {
LISTENERS.values().forEach(list -> list.remove(listener)); LISTENERS.values().forEach(list -> list.remove(listener));
} }
public static <T extends EventWithSnowflake> void unsubscribeById(Class<T> eventClass, long eventId) { public static <T extends EventWithSnowflake> void unsubscribeById(
Class<T> eventClass, long eventId) {
Map<Long, Consumer<? extends EventWithSnowflake>> map = UUID_LISTENERS.get(eventClass); Map<Long, Consumer<? extends EventWithSnowflake>> map = UUID_LISTENERS.get(eventClass);
if (map != null) map.remove(eventId); if (map != null) map.remove(eventId);
} }
@@ -125,15 +132,22 @@ public final class GlobalEventBus {
CopyOnWriteArrayList<Consumer<? super EventType>> classListeners = LISTENERS.get(clazz); CopyOnWriteArrayList<Consumer<? super EventType>> classListeners = LISTENERS.get(clazz);
if (classListeners != null) { if (classListeners != null) {
for (Consumer<? super EventType> listener : classListeners) { for (Consumer<? super EventType> listener : classListeners) {
try { listener.accept(event); } catch (Throwable ignored) {} try {
listener.accept(event);
} catch (Throwable ignored) {
}
} }
} }
// generic listeners // generic listeners
CopyOnWriteArrayList<Consumer<? super EventType>> genericListeners = LISTENERS.get(Object.class); CopyOnWriteArrayList<Consumer<? super EventType>> genericListeners =
LISTENERS.get(Object.class);
if (genericListeners != null) { if (genericListeners != null) {
for (Consumer<? super EventType> listener : genericListeners) { for (Consumer<? super EventType> listener : genericListeners) {
try { listener.accept(event); } catch (Throwable ignored) {} try {
listener.accept(event);
} catch (Throwable ignored) {
}
} }
} }
@@ -144,7 +158,10 @@ public final class GlobalEventBus {
Consumer<EventWithSnowflake> listener = Consumer<EventWithSnowflake> listener =
(Consumer<EventWithSnowflake>) map.remove(snowflakeEvent.eventSnowflake()); (Consumer<EventWithSnowflake>) map.remove(snowflakeEvent.eventSnowflake());
if (listener != null) { if (listener != null) {
try { listener.accept(snowflakeEvent); } catch (Throwable ignored) {} try {
listener.accept(snowflakeEvent);
} catch (Throwable ignored) {
}
} }
} }
} }

View File

@@ -1,15 +1,14 @@
package org.toop.framework.eventbus; package org.toop.framework.eventbus;
import org.toop.framework.eventbus.events.EventType;
public class ListenerHandler { public class ListenerHandler {
private Object listener = null; private Object listener = null;
// private boolean unsubscribeAfterSuccess = true;
// public ListenerHandler(Object listener, boolean unsubAfterSuccess) { // private boolean unsubscribeAfterSuccess = true;
// this.listener = listener;
// this.unsubscribeAfterSuccess = unsubAfterSuccess; // public ListenerHandler(Object listener, boolean unsubAfterSuccess) {
// } // this.listener = listener;
// this.unsubscribeAfterSuccess = unsubAfterSuccess;
// }
public ListenerHandler(Object listener) { public ListenerHandler(Object listener) {
this.listener = listener; this.listener = listener;
@@ -19,8 +18,8 @@ public class ListenerHandler {
return this.listener; return this.listener;
} }
// public boolean isUnsubscribeAfterSuccess() { // public boolean isUnsubscribeAfterSuccess() {
// return this.unsubscribeAfterSuccess; // return this.unsubscribeAfterSuccess;
// } // }
} }

View File

@@ -9,13 +9,12 @@ import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow; import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.networking.events.NetworkEvents; import org.toop.framework.networking.events.NetworkEvents;
import java.util.function.Supplier;
public class NetworkingClient { public class NetworkingClient {
private static final Logger logger = LogManager.getLogger(NetworkingClient.class); private static final Logger logger = LogManager.getLogger(NetworkingClient.class);
@@ -37,18 +36,20 @@ public class NetworkingClient {
bootstrap.group(workerGroup); bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class); bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() { bootstrap.handler(
@Override new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) { @Override
handler = handlerFactory.get(); public void initChannel(SocketChannel ch) {
handler = handlerFactory.get();
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(1024)); // split at \n pipeline.addLast(new LineBasedFrameDecoder(1024)); // split at \n
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // bytes -> String pipeline.addLast(
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); new StringDecoder(CharsetUtil.UTF_8)); // bytes -> String
pipeline.addLast(handler); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
} pipeline.addLast(handler);
}); }
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
this.channel = channelFuture.channel(); this.channel = channelFuture.channel();
this.host = host; this.host = host;
@@ -82,7 +83,8 @@ public class NetworkingClient {
String literalMsg = msg.replace("\n", "\\n").replace("\r", "\\r"); String literalMsg = msg.replace("\n", "\\n").replace("\r", "\\r");
if (isChannelActive()) { if (isChannelActive()) {
this.channel.writeAndFlush(msg); this.channel.writeAndFlush(msg);
logger.info("Connection {} sent message: '{}'", this.channel.remoteAddress(), literalMsg); logger.info(
"Connection {} sent message: '{}'", this.channel.remoteAddress(), literalMsg);
} else { } else {
logger.warn("Cannot send message: '{}', connection inactive.", literalMsg); logger.warn("Cannot send message: '{}', connection inactive.", literalMsg);
} }
@@ -99,23 +101,30 @@ public class NetworkingClient {
public void closeConnection() { public void closeConnection() {
if (this.channel != null && this.channel.isActive()) { if (this.channel != null && this.channel.isActive()) {
this.channel.close().addListener(future -> { this.channel
if (future.isSuccess()) { .close()
logger.info("Connection {} closed successfully", this.channel.remoteAddress()); .addListener(
new EventFlow() future -> {
.addPostEvent(new NetworkEvents.ClosedConnection(this.connectionId)) if (future.isSuccess()) {
.asyncPostEvent(); logger.info(
} else { "Connection {} closed successfully",
logger.error("Error closing connection {}. Error: {}", this.channel.remoteAddress());
this.channel.remoteAddress(), new EventFlow()
future.cause().getMessage()); .addPostEvent(
} new NetworkEvents.ClosedConnection(
}); this.connectionId))
.asyncPostEvent();
} else {
logger.error(
"Error closing connection {}. Error: {}",
this.channel.remoteAddress(),
future.cause().getMessage());
}
});
} }
} }
public long getId() { public long getId() {
return this.connectionId; return this.connectionId;
} }
} }

View File

@@ -2,11 +2,10 @@ package org.toop.framework.networking;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.SnowflakeGenerator; import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.networking.events.NetworkEvents; import org.toop.framework.networking.events.NetworkEvents;
public class NetworkingClientManager { public class NetworkingClientManager {
@@ -14,7 +13,7 @@ public class NetworkingClientManager {
private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class); private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class);
/** Map of serverId -> Server instances */ /** Map of serverId -> Server instances */
private final Map<Long, NetworkingClient> networkClients = new ConcurrentHashMap<>(); final Map<Long, NetworkingClient> networkClients = new ConcurrentHashMap<>();
/** Starts a connection manager, to manage, connections. */ /** Starts a connection manager, to manage, connections. */
public NetworkingClientManager() throws NetworkingInitializationException { public NetworkingClientManager() throws NetworkingInitializationException {
@@ -39,20 +38,21 @@ public class NetworkingClientManager {
.listen(this::handleGetAllConnections) .listen(this::handleGetAllConnections)
.listen(this::handleShutdownAll); .listen(this::handleShutdownAll);
logger.info("NetworkingClientManager initialized"); logger.info("NetworkingClientManager initialized");
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to initialize the client manager", e); logger.error("Failed to initialize the client manager", e);
throw e; throw e;
} }
} }
private long startClientRequest(String ip, int port) { long startClientRequest(String ip, int port) {
long connectionId = new SnowflakeGenerator().nextId(); // TODO: Maybe use the one generated long connectionId = new SnowflakeGenerator().nextId(); // TODO: Maybe use the one generated
try { // With EventFlow try { // With EventFlow
NetworkingClient client = new NetworkingClient( NetworkingClient client =
() -> new NetworkingGameClientHandler(connectionId), new NetworkingClient(
ip, () -> new NetworkingGameClientHandler(connectionId),
port, ip,
connectionId); port,
connectionId);
client.setConnectionId(connectionId); client.setConnectionId(connectionId);
this.networkClients.put(connectionId, client); this.networkClients.put(connectionId, client);
logger.info("New client started successfully for {}:{}", ip, port); logger.info("New client started successfully for {}:{}", ip, port);
@@ -63,15 +63,14 @@ public class NetworkingClientManager {
} }
private long startClientRequest(String ip, int port, long clientId) { private long startClientRequest(String ip, int port, long clientId) {
try { // With EventFlow try { // With EventFlow
NetworkingClient client = new NetworkingClient( NetworkingClient client =
() -> new NetworkingGameClientHandler(clientId), new NetworkingClient(
ip, () -> new NetworkingGameClientHandler(clientId), ip, port, clientId);
port,
clientId);
client.setConnectionId(clientId); client.setConnectionId(clientId);
this.networkClients.replace(clientId, client); this.networkClients.replace(clientId, client);
logger.info("New client started successfully for {}:{}, replaced: {}", ip, port, clientId); logger.info(
"New client started successfully for {}:{}, replaced: {}", ip, port, clientId);
} catch (Exception e) { } catch (Exception e) {
logger.error(e); logger.error(e);
} }
@@ -79,21 +78,26 @@ public class NetworkingClientManager {
return clientId; return clientId;
} }
private void handleStartClient(NetworkEvents.StartClient event) { void handleStartClient(NetworkEvents.StartClient event) {
long id = this.startClientRequest(event.ip(), event.port()); long id = this.startClientRequest(event.ip(), event.port());
new Thread(() -> { new Thread(
try { () -> {
Thread.sleep(100); // TODO: Is this a good idea? try {
new EventFlow().addPostEvent(NetworkEvents.StartClientResponse.class, Thread.sleep(100); // TODO: Is this a good idea?
id, event.eventSnowflake() new EventFlow()
).asyncPostEvent(); .addPostEvent(
} catch (InterruptedException e) { NetworkEvents.StartClientResponse.class,
throw new RuntimeException(e); id,
} event.eventSnowflake())
}).start(); .asyncPostEvent();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.start();
} }
private void handleCommand( void handleCommand(
NetworkEvents.SendCommand NetworkEvents.SendCommand
event) { // TODO: Move this to ServerConnection class, keep it internal. event) { // TODO: Move this to ServerConnection class, keep it internal.
NetworkingClient client = this.networkClients.get(event.clientId()); NetworkingClient client = this.networkClients.get(event.clientId());
@@ -101,7 +105,7 @@ public class NetworkingClientManager {
sendCommand(client, args); sendCommand(client, args);
} }
private void handleSendLogin(NetworkEvents.SendLogin event) { void handleSendLogin(NetworkEvents.SendLogin event) {
NetworkingClient client = this.networkClients.get(event.clientId()); NetworkingClient client = this.networkClients.get(event.clientId());
sendCommand(client, String.format("LOGIN %s", event.username())); sendCommand(client, String.format("LOGIN %s", event.username()));
} }
@@ -133,7 +137,9 @@ public class NetworkingClientManager {
private void handleSendChallenge(NetworkEvents.SendChallenge event) { private void handleSendChallenge(NetworkEvents.SendChallenge event) {
NetworkingClient client = this.networkClients.get(event.clientId()); NetworkingClient client = this.networkClients.get(event.clientId());
sendCommand(client, String.format("CHALLENGE %s %s", event.usernameToChallenge(), event.gameType())); sendCommand(
client,
String.format("CHALLENGE %s %s", event.usernameToChallenge(), event.gameType()));
} }
private void handleSendAcceptChallenge(NetworkEvents.SendAcceptChallenge event) { private void handleSendAcceptChallenge(NetworkEvents.SendAcceptChallenge event) {
@@ -162,8 +168,12 @@ public class NetworkingClientManager {
} }
private void sendCommand(NetworkingClient client, String command) { private void sendCommand(NetworkingClient client, String command) {
logger.info("Preparing to send command: {} to server: {}:{}. clientId: {}", logger.info(
command.trim(), client.getHost(), client.getPort(), client.getId()); "Preparing to send command: {} to server: {}:{}. clientId: {}",
command.trim(),
client.getHost(),
client.getPort(),
client.getId());
client.writeAndFlushnl(command); client.writeAndFlushnl(command);
} }
@@ -173,14 +183,14 @@ public class NetworkingClientManager {
startClientRequest(event.ip(), event.port(), event.clientId()); startClientRequest(event.ip(), event.port(), event.clientId());
} }
private void handleCloseClient(NetworkEvents.CloseClient event) { void handleCloseClient(NetworkEvents.CloseClient event) {
NetworkingClient client = this.networkClients.get(event.clientId()); NetworkingClient client = this.networkClients.get(event.clientId());
client.closeConnection(); // TODO: Check if not blocking, what if error, mb not remove? client.closeConnection(); // TODO: Check if not blocking, what if error, mb not remove?
this.networkClients.remove(event.clientId()); this.networkClients.remove(event.clientId());
logger.info("Client {} closed successfully.", event.clientId()); logger.info("Client {} closed successfully.", event.clientId());
} }
private void handleGetAllConnections(NetworkEvents.RequestsAllClients request) { void handleGetAllConnections(NetworkEvents.RequestsAllClients request) {
List<NetworkingClient> a = new ArrayList<>(this.networkClients.values()); List<NetworkingClient> a = new ArrayList<>(this.networkClients.values());
request.future().complete(a); request.future().complete(a);
} }

View File

@@ -2,14 +2,13 @@ package org.toop.framework.networking;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow; import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.networking.events.NetworkEvents; import org.toop.framework.networking.events.NetworkEvents;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter { public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(NetworkingGameClientHandler.class); private static final Logger logger = LogManager.getLogger(NetworkingGameClientHandler.class);
@@ -28,17 +27,27 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
return; return;
} }
if (rec.equalsIgnoreCase("ok")) { if (rec.equalsIgnoreCase("ok")) {
logger.info("Received OK message from server-{}, data: {}", ctx.channel().remoteAddress(), msg); logger.info(
"Received OK message from server-{}, data: {}",
ctx.channel().remoteAddress(),
msg);
return; return;
} }
if (rec.toLowerCase().startsWith("svr")) { if (rec.toLowerCase().startsWith("svr")) {
logger.info("Received SVR message from server-{}, data: {}", ctx.channel().remoteAddress(), msg); logger.info(
new EventFlow().addPostEvent(new NetworkEvents.ServerResponse(this.connectionId)).asyncPostEvent(); "Received SVR message from server-{}, data: {}",
ctx.channel().remoteAddress(),
msg);
new EventFlow()
.addPostEvent(new NetworkEvents.ServerResponse(this.connectionId))
.asyncPostEvent();
parseServerReturn(rec); parseServerReturn(rec);
return; return;
} }
logger.info("Received unparsed message from server-{}, data: {}", ctx.channel().remoteAddress(), msg); logger.info(
"Received unparsed message from server-{}, data: {}",
ctx.channel().remoteAddress(),
msg);
} }
private void parseServerReturn(String rec) { private void parseServerReturn(String rec) {
@@ -48,27 +57,43 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
Matcher gameMatch = gamePattern.matcher(recSrvRemoved); Matcher gameMatch = gamePattern.matcher(recSrvRemoved);
if (gameMatch.find()) { if (gameMatch.find()) {
switch(gameMatch.group(1)) { switch (gameMatch.group(1)) {
case "YOURTURN": gameYourTurnHandler(recSrvRemoved); return; case "YOURTURN":
case "MOVE": gameMoveHandler(recSrvRemoved); return; gameYourTurnHandler(recSrvRemoved);
case "MATCH": gameMatchHandler(recSrvRemoved); return; return;
case "CHALLENGE": gameChallengeHandler(recSrvRemoved); return; case "MOVE":
case "WIN", gameMoveHandler(recSrvRemoved);
"DRAW", return;
"LOSE": gameWinConditionHandler(recSrvRemoved); return; case "MATCH":
default: return; gameMatchHandler(recSrvRemoved);
return;
case "CHALLENGE":
gameChallengeHandler(recSrvRemoved);
return;
case "WIN", "DRAW", "LOSE":
gameWinConditionHandler(recSrvRemoved);
return;
default:
return;
} }
} else { } else {
Pattern getPattern = Pattern.compile("(\\w+)", Pattern.CASE_INSENSITIVE); Pattern getPattern = Pattern.compile("(\\w+)", Pattern.CASE_INSENSITIVE);
Matcher getMatch = getPattern.matcher(recSrvRemoved); Matcher getMatch = getPattern.matcher(recSrvRemoved);
if (getMatch.find()) { if (getMatch.find()) {
switch(getMatch.group(1)) { switch (getMatch.group(1)) {
case "PLAYERLIST": playerlistHandler(recSrvRemoved); return; case "PLAYERLIST":
case "GAMELIST": gamelistHandler(recSrvRemoved); return; playerlistHandler(recSrvRemoved);
case "HELP": helpHandler(recSrvRemoved); return; return;
default: return; case "GAMELIST":
gamelistHandler(recSrvRemoved);
return;
case "HELP":
helpHandler(recSrvRemoved);
return;
default:
return;
} }
} else { } else {
return; // TODO: Should be an error. return; // TODO: Should be an error.
@@ -77,25 +102,29 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
} }
private void gameMoveHandler(String rec) { private void gameMoveHandler(String rec) {
String[] msg = Pattern String[] msg =
.compile("(?:player|details|move):\\s*\"?([^\",}]+)\"?", Pattern.CASE_INSENSITIVE) Pattern.compile(
.matcher(rec) "(?:player|details|move):\\s*\"?([^\",}]+)\"?",
.results() Pattern.CASE_INSENSITIVE)
.map(m -> m.group(1).trim()) .matcher(rec)
.toArray(String[]::new); .results()
.map(m -> m.group(1).trim())
.toArray(String[]::new);
new EventFlow() new EventFlow()
.addPostEvent(new NetworkEvents.GameMoveResponse(this.connectionId, msg[0], msg[1], msg[2])) .addPostEvent(
new NetworkEvents.GameMoveResponse(
this.connectionId, msg[0], msg[1], msg[2]))
.asyncPostEvent(); .asyncPostEvent();
} }
private void gameWinConditionHandler(String rec) { private void gameWinConditionHandler(String rec) {
String condition = Pattern String condition =
.compile("\\b(win|draw|lose)\\b", Pattern.CASE_INSENSITIVE) Pattern.compile("\\b(win|draw|lose)\\b", Pattern.CASE_INSENSITIVE)
.matcher(rec) .matcher(rec)
.results() .results()
.toString() .toString()
.trim(); .trim();
new EventFlow() new EventFlow()
.addPostEvent(new NetworkEvents.GameResultResponse(this.connectionId, condition)) .addPostEvent(new NetworkEvents.GameResultResponse(this.connectionId, condition))
@@ -105,19 +134,26 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
private void gameChallengeHandler(String rec) { private void gameChallengeHandler(String rec) {
boolean isCancelled = rec.toLowerCase().startsWith("challenge accepted"); boolean isCancelled = rec.toLowerCase().startsWith("challenge accepted");
try { try {
String[] msg = Pattern String[] msg =
.compile("(?:CHALLENGER|GAMETYPE|CHALLENGENUMBER):\\s*\"?(.*?)\"?\\s*(?:,|})") Pattern.compile(
.matcher(rec) "(?:CHALLENGER|GAMETYPE|CHALLENGENUMBER):\\s*\"?(.*?)\"?\\s*(?:,|})")
.results() .matcher(rec)
.map(m -> m.group().trim()) .results()
.toArray(String[]::new); .map(m -> m.group().trim())
.toArray(String[]::new);
if (isCancelled) new EventFlow() if (isCancelled)
.addPostEvent(new NetworkEvents.ChallengeCancelledResponse(this.connectionId, msg[0])) new EventFlow()
.asyncPostEvent(); .addPostEvent(
else new EventFlow() new NetworkEvents.ChallengeCancelledResponse(
.addPostEvent(new NetworkEvents.ChallengeResponse(this.connectionId, msg[0], msg[1], msg[2])) this.connectionId, msg[0]))
.asyncPostEvent(); .asyncPostEvent();
else
new EventFlow()
.addPostEvent(
new NetworkEvents.ChallengeResponse(
this.connectionId, msg[0], msg[1], msg[2]))
.asyncPostEvent();
} catch (ArrayIndexOutOfBoundsException e) { } catch (ArrayIndexOutOfBoundsException e) {
logger.error("Array out of bounds for: {}", rec, e); logger.error("Array out of bounds for: {}", rec, e);
} }
@@ -125,30 +161,31 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
private void gameMatchHandler(String rec) { private void gameMatchHandler(String rec) {
try { try {
String[] msg = Pattern String[] msg =
.compile("\"([^\"]*)\"") Pattern.compile("\"([^\"]*)\"")
.matcher(rec) .matcher(rec)
.results() .results()
.map(m -> m.group(1).trim()) .map(m -> m.group(1).trim())
.toArray(String[]::new); .toArray(String[]::new);
// [0] playerToMove, [1] gameType, [2] opponent // [0] playerToMove, [1] gameType, [2] opponent
new EventFlow() new EventFlow()
.addPostEvent(new NetworkEvents.GameMatchResponse(this.connectionId, msg[0], msg[1], msg[2])) .addPostEvent(
new NetworkEvents.GameMatchResponse(
this.connectionId, msg[0], msg[1], msg[2]))
.asyncPostEvent(); .asyncPostEvent();
} catch (ArrayIndexOutOfBoundsException e) { } catch (ArrayIndexOutOfBoundsException e) {
logger.error("Array out of bounds for: {}", rec, e); logger.error("Array out of bounds for: {}", rec, e);
} }
} }
private void gameYourTurnHandler(String rec) { private void gameYourTurnHandler(String rec) {
String msg = Pattern String msg =
.compile("TURNMESSAGE:\\s*\"([^\"]*)\"") Pattern.compile("TURNMESSAGE:\\s*\"([^\"]*)\"")
.matcher(rec) .matcher(rec)
.results() .results()
.toString() .toString()
.trim(); .trim();
new EventFlow() new EventFlow()
.addPostEvent(new NetworkEvents.YourTurnResponse(this.connectionId, msg)) .addPostEvent(new NetworkEvents.YourTurnResponse(this.connectionId, msg))
@@ -156,12 +193,12 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
} }
private void playerlistHandler(String rec) { private void playerlistHandler(String rec) {
String[] players = Pattern String[] players =
.compile("\"([^\"]+)\"") Pattern.compile("\"([^\"]+)\"")
.matcher(rec) .matcher(rec)
.results() .results()
.map(m -> m.group(1).trim()) .map(m -> m.group(1).trim())
.toArray(String[]::new); .toArray(String[]::new);
new EventFlow() new EventFlow()
.addPostEvent(new NetworkEvents.PlayerlistResponse(this.connectionId, players)) .addPostEvent(new NetworkEvents.PlayerlistResponse(this.connectionId, players))
@@ -169,16 +206,16 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
} }
private void gamelistHandler(String rec) { private void gamelistHandler(String rec) {
String[] gameTypes = Pattern String[] gameTypes =
.compile("\"([^\"]+)\"") Pattern.compile("\"([^\"]+)\"")
.matcher(rec) .matcher(rec)
.results() .results()
.map(m -> m.group(1).trim()) .map(m -> m.group(1).trim())
.toArray(String[]::new); .toArray(String[]::new);
new EventFlow() new EventFlow()
.addPostEvent(new NetworkEvents.GamelistResponse(this.connectionId, gameTypes)) .addPostEvent(new NetworkEvents.GamelistResponse(this.connectionId, gameTypes))
.asyncPostEvent(); .asyncPostEvent();
} }
private void helpHandler(String rec) { private void helpHandler(String rec) {
@@ -190,5 +227,4 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
logger.error(cause.getMessage(), cause); logger.error(cause.getMessage(), cause);
ctx.close(); ctx.close();
} }
} }

View File

@@ -1,66 +1,73 @@
package org.toop.framework.networking.events; package org.toop.framework.networking.events;
import org.toop.framework.eventbus.events.EventWithSnowflake;
import org.toop.framework.eventbus.events.EventWithoutSnowflake;
import org.toop.framework.eventbus.events.EventsBase;
import org.toop.framework.networking.NetworkingClient;
import org.toop.framework.networking.NetworkingGameClientHandler;
import java.lang.reflect.RecordComponent; import java.lang.reflect.RecordComponent;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.toop.framework.eventbus.events.EventWithSnowflake;
import org.toop.framework.eventbus.events.EventWithoutSnowflake;
import org.toop.framework.eventbus.events.EventsBase;
import org.toop.framework.networking.NetworkingClient;
/** /**
* A collection of networking-related event records for use with the {@link org.toop.framework.eventbus.GlobalEventBus}. * A collection of networking-related event records for use with the {@link
* <p> * org.toop.framework.eventbus.GlobalEventBus}.
* This class defines all the events that can be posted or listened to in the networking subsystem. *
* Events are separated into those with unique IDs (EventWithSnowflake) and those without (EventWithoutSnowflake). * <p>This class defines all the events that can be posted or listened to in the networking
* </p> * subsystem. Events are separated into those with unique IDs (EventWithSnowflake) and those without
* (EventWithoutSnowflake).
*/ */
public class NetworkEvents extends EventsBase { public class NetworkEvents extends EventsBase {
/** /**
* Requests all active client connections. * Requests all active client connections.
* <p>
* This is a blocking event. The result will be delivered via the provided {@link CompletableFuture}.
* </p>
* *
* @param future CompletableFuture to receive the list of active {@link NetworkingClient} instances. * <p>This is a blocking event. The result will be delivered via the provided {@link
* CompletableFuture}.
*
* @param future CompletableFuture to receive the list of active {@link NetworkingClient}
* instances.
*/ */
public record RequestsAllClients(CompletableFuture<List<NetworkingClient>> future) implements EventWithoutSnowflake {} public record RequestsAllClients(CompletableFuture<List<NetworkingClient>> future)
implements EventWithoutSnowflake {}
/** Forces all active client connections to close immediately. */ /** Forces all active client connections to close immediately. */
public record ForceCloseAllClients() implements EventWithoutSnowflake {} public record ForceCloseAllClients() implements EventWithoutSnowflake {}
/** Response indicating a challenge was cancelled. */ /** Response indicating a challenge was cancelled. */
public record ChallengeCancelledResponse(long clientId, String challengeId) implements EventWithoutSnowflake {} public record ChallengeCancelledResponse(long clientId, String challengeId)
implements EventWithoutSnowflake {}
/** Response indicating a challenge was received. */ /** Response indicating a challenge was received. */
public record ChallengeResponse(long clientId, String challengerName, String gameType, String challengeId) public record ChallengeResponse(
long clientId, String challengerName, String gameType, String challengeId)
implements EventWithoutSnowflake {} implements EventWithoutSnowflake {}
/** Response containing a list of players for a client. */ /** Response containing a list of players for a client. */
public record PlayerlistResponse(long clientId, String[] playerlist) implements EventWithoutSnowflake {} public record PlayerlistResponse(long clientId, String[] playerlist)
implements EventWithoutSnowflake {}
/** Response containing a list of games for a client. */ /** Response containing a list of games for a client. */
public record GamelistResponse(long clientId, String[] gamelist) implements EventWithoutSnowflake {} public record GamelistResponse(long clientId, String[] gamelist)
implements EventWithoutSnowflake {}
/** Response indicating a game match information for a client. */ /** Response indicating a game match information for a client. */
public record GameMatchResponse(long clientId, String playerToMove, String gameType, String opponent) public record GameMatchResponse(
long clientId, String playerToMove, String gameType, String opponent)
implements EventWithoutSnowflake {} implements EventWithoutSnowflake {}
/** Response indicating the result of a game. */ /** Response indicating the result of a game. */
public record GameResultResponse(long clientId, String condition) implements EventWithoutSnowflake {} public record GameResultResponse(long clientId, String condition)
implements EventWithoutSnowflake {}
/** Response indicating a game move occurred. */ /** Response indicating a game move occurred. */
public record GameMoveResponse(long clientId, String player, String details, String move) public record GameMoveResponse(long clientId, String player, String details, String move)
implements EventWithoutSnowflake {} implements EventWithoutSnowflake {}
/** Response indicating it is the player's turn. */ /** Response indicating it is the player's turn. */
public record YourTurnResponse(long clientId, String message) implements EventWithoutSnowflake {} public record YourTurnResponse(long clientId, String message)
implements EventWithoutSnowflake {}
/** Request to send login credentials for a client. */ /** Request to send login credentials for a client. */
public record SendLogin(long clientId, String username) implements EventWithoutSnowflake {} public record SendLogin(long clientId, String username) implements EventWithoutSnowflake {}
@@ -85,7 +92,8 @@ public class NetworkEvents extends EventsBase {
implements EventWithoutSnowflake {} implements EventWithoutSnowflake {}
/** Request to accept a challenge. */ /** Request to accept a challenge. */
public record SendAcceptChallenge(long clientId, int challengeId) implements EventWithoutSnowflake {} public record SendAcceptChallenge(long clientId, int challengeId)
implements EventWithoutSnowflake {}
/** Request to forfeit a game. */ /** Request to forfeit a game. */
public record SendForfeit(long clientId) implements EventWithoutSnowflake {} public record SendForfeit(long clientId) implements EventWithoutSnowflake {}
@@ -97,36 +105,37 @@ public class NetworkEvents extends EventsBase {
public record SendHelp(long clientId) implements EventWithoutSnowflake {} public record SendHelp(long clientId) implements EventWithoutSnowflake {}
/** Request to display help for a specific command. */ /** Request to display help for a specific command. */
public record SendHelpForCommand(long clientId, String command) implements EventWithoutSnowflake {} public record SendHelpForCommand(long clientId, String command)
implements EventWithoutSnowflake {}
/** Request to close a specific client connection. */ /** Request to close a specific client connection. */
public record CloseClient(long clientId) implements EventWithoutSnowflake {} public record CloseClient(long clientId) implements EventWithoutSnowflake {}
/** /**
* Event to start a new client connection. * Event to start a new client connection.
* <p> *
* Carries IP, port, and a unique event ID for correlation with responses. * <p>Carries IP, port, and a unique event ID for correlation with responses.
* </p>
* *
* @param ip Server IP address. * @param ip Server IP address.
* @param port Server port. * @param port Server port.
* @param eventSnowflake Unique event identifier for correlation. * @param eventSnowflake Unique event identifier for correlation.
*/ */
public record StartClient(String ip, int port, long eventSnowflake) implements EventWithSnowflake { public record StartClient(String ip, int port, long eventSnowflake)
implements EventWithSnowflake {
@Override @Override
public Map<String, Object> result() { public Map<String, Object> result() {
return Stream.of(this.getClass().getRecordComponents()) return Stream.of(this.getClass().getRecordComponents())
.collect(Collectors.toMap( .collect(
RecordComponent::getName, Collectors.toMap(
rc -> { RecordComponent::getName,
try { rc -> {
return rc.getAccessor().invoke(this); try {
} catch (Exception e) { return rc.getAccessor().invoke(this);
throw new RuntimeException(e); } catch (Exception e) {
} throw new RuntimeException(e);
} }
)); }));
} }
@Override @Override
@@ -141,20 +150,21 @@ public class NetworkEvents extends EventsBase {
* @param clientId The client ID assigned to the new connection. * @param clientId The client ID assigned to the new connection.
* @param eventSnowflake Event ID used for correlation. * @param eventSnowflake Event ID used for correlation.
*/ */
public record StartClientResponse(long clientId, long eventSnowflake) implements EventWithSnowflake { public record StartClientResponse(long clientId, long eventSnowflake)
implements EventWithSnowflake {
@Override @Override
public Map<String, Object> result() { public Map<String, Object> result() {
return Stream.of(this.getClass().getRecordComponents()) return Stream.of(this.getClass().getRecordComponents())
.collect(Collectors.toMap( .collect(
RecordComponent::getName, Collectors.toMap(
rc -> { RecordComponent::getName,
try { rc -> {
return rc.getAccessor().invoke(this); try {
} catch (Exception e) { return rc.getAccessor().invoke(this);
throw new RuntimeException(e); } catch (Exception e) {
} throw new RuntimeException(e);
} }
)); }));
} }
@Override @Override
@@ -192,7 +202,8 @@ public class NetworkEvents extends EventsBase {
* @param ip The new server IP. * @param ip The new server IP.
* @param port The new server port. * @param port The new server port.
*/ */
public record ChangeClientHost(long clientId, String ip, int port) implements EventWithoutSnowflake {} public record ChangeClientHost(long clientId, String ip, int port)
implements EventWithoutSnowflake {}
/** WIP (Not working) Response indicating that the client could not connect. */ /** WIP (Not working) Response indicating that the client could not connect. */
public record CouldNotConnect(long clientId) implements EventWithoutSnowflake {} public record CouldNotConnect(long clientId) implements EventWithoutSnowflake {}

View File

@@ -1,4 +1,4 @@
package org.toop; package org.toop.framework;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.*;
@@ -9,7 +9,6 @@ import org.apache.logging.log4j.core.config.LoggerConfig;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.toop.framework.Logging;
public class LoggingTest { public class LoggingTest {
@@ -106,6 +105,6 @@ public class LoggingTest {
LoggerConfig loggerConfig = LoggerConfig loggerConfig =
ctx.getConfiguration().getLoggers().get("org.toop.DoesNotExist"); ctx.getConfiguration().getLoggers().get("org.toop.DoesNotExist");
assertNull(loggerConfig); // class doesn't exist, so no logger added assertNull(loggerConfig);
} }
} }

View File

@@ -0,0 +1,78 @@
package org.toop.framework;
import static org.junit.jupiter.api.Assertions.*;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.Test;
class SnowflakeGeneratorTest {
@Test
void testMachineIdWithinBounds() {
SnowflakeGenerator generator = new SnowflakeGenerator();
long machineIdField = getMachineId(generator);
assertTrue(
machineIdField >= 0 && machineIdField <= 1023,
"Machine ID should be within 0-1023");
}
@Test
void testNextIdReturnsUniqueValues() {
SnowflakeGenerator generator = new SnowflakeGenerator();
Set<Long> ids = new HashSet<>();
for (int i = 0; i < 1000; i++) {
long id = generator.nextId();
assertFalse(ids.contains(id), "Duplicate ID generated");
ids.add(id);
}
}
@Test
void testSequenceRollover() throws Exception {
SnowflakeGenerator generator =
new SnowflakeGenerator() {
private long fakeTime = System.currentTimeMillis();
protected long timestamp() {
return fakeTime;
}
void incrementTime() {
fakeTime++;
}
};
long first = generator.nextId();
long second = generator.nextId();
assertNotEquals(
first, second, "IDs generated within same millisecond should differ by sequence");
// Force sequence overflow
for (int i = 0; i < (1 << 12); i++) generator.nextId();
long afterOverflow = generator.nextId();
assertTrue(afterOverflow > second, "ID after sequence rollover should be greater");
}
@Test
void testNextIdMonotonic() {
SnowflakeGenerator generator = new SnowflakeGenerator();
long prev = generator.nextId();
for (int i = 0; i < 100; i++) {
long next = generator.nextId();
assertTrue(next > prev, "IDs must be increasing");
prev = next;
}
}
// Helper: reflectively get machineId
private long getMachineId(SnowflakeGenerator generator) {
try {
var field = SnowflakeGenerator.class.getDeclaredField("machineId");
field.setAccessible(true);
return (long) field.get(generator);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,253 @@
// package org.toop.framework.eventbus;
//
// import org.junit.jupiter.api.Tag;
// import org.junit.jupiter.api.Test;
// import org.toop.framework.eventbus.events.EventWithSnowflake;
//
// import java.math.BigInteger;
// import java.util.concurrent.*;
// import java.util.concurrent.atomic.LongAdder;
//
// import static org.junit.jupiter.api.Assertions.assertEquals;
//
// class EventFlowStressTest {
//
// /** Top-level record to ensure runtime type matches subscription */
// public record HeavyEvent(String payload, long eventSnowflake) implements EventWithSnowflake {
// @Override
// public java.util.Map<String, Object> result() {
// return java.util.Map.of("payload", payload, "eventId", eventSnowflake);
// }
//
// @Override
// public long eventSnowflake() {
// return this.eventSnowflake;
// }
// }
//
// public record HeavyEventSuccess(String payload, long eventSnowflake) implements
// EventWithSnowflake {
// @Override
// public java.util.Map<String, Object> result() {
// return java.util.Map.of("payload", payload, "eventId", eventSnowflake);
// }
//
// @Override
// public long eventSnowflake() {
// return eventSnowflake;
// }
// }
//
// private static final int THREADS = 32;
// private static final long EVENTS_PER_THREAD = 10_000_000;
//
// @Tag("stress")
// @Test
// void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException {
// LongAdder counter = new LongAdder();
// ExecutorService executor = Executors.newFixedThreadPool(THREADS);
//
// BigInteger totalEvents = BigInteger.valueOf(THREADS)
// .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
//
// long startTime = System.currentTimeMillis();
//
// // Monitor thread for EPS and memory
// Thread monitor = new Thread(() -> {
// long lastCount = 0;
// long lastTime = System.currentTimeMillis();
// Runtime runtime = Runtime.getRuntime();
//
// while (counter.sum() < totalEvents.longValue()) {
// try { Thread.sleep(200); } catch (InterruptedException ignored) {}
//
// long now = System.currentTimeMillis();
// long completed = counter.sum();
// long eventsThisPeriod = completed - lastCount;
// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
//
// long usedMemory = runtime.totalMemory() - runtime.freeMemory();
// double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
//
// System.out.printf(
// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
// completed,
// totalEvents.longValue(),
// completed * 100.0 / totalEvents.doubleValue(),
// eps,
// usedMemory / 1024.0 / 1024.0,
// usedPercent
// );
//
// lastCount = completed;
// lastTime = now;
// }
// });
// monitor.setDaemon(true);
// monitor.start();
//
// var listener = new EventFlow().listen(HeavyEvent.class, _ -> counter.increment());
//
// // Submit events asynchronously
// for (int t = 0; t < THREADS; t++) {
// executor.submit(() -> {
// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
// var _ = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
// .asyncPostEvent();
// }
// });
// }
//
// executor.shutdown();
// executor.awaitTermination(10, TimeUnit.MINUTES);
//
// listener.getResult();
//
// long endTime = System.currentTimeMillis();
// double durationSeconds = (endTime - startTime) / 1000.0;
//
// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + "
// seconds");
// double averageEps = totalEvents.doubleValue() / durationSeconds;
// System.out.printf("Average EPS: %.0f%n", averageEps);
//
// assertEquals(totalEvents.longValue(), counter.sum());
// }
//
// @Tag("stress")
// @Test
// void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException {
// LongAdder counter = new LongAdder();
// ExecutorService executor = Executors.newFixedThreadPool(THREADS);
//
// BigInteger totalEvents = BigInteger.valueOf(THREADS)
// .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
//
// long startTime = System.currentTimeMillis();
//
// // Monitor thread for EPS and memory
// Thread monitor = new Thread(() -> {
// long lastCount = 0;
// long lastTime = System.currentTimeMillis();
// Runtime runtime = Runtime.getRuntime();
//
// while (counter.sum() < totalEvents.longValue()) {
// try { Thread.sleep(200); } catch (InterruptedException ignored) {}
//
// long now = System.currentTimeMillis();
// long completed = counter.sum();
// long eventsThisPeriod = completed - lastCount;
// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
//
// long usedMemory = runtime.totalMemory() - runtime.freeMemory();
// double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
//
// System.out.printf(
// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
// completed,
// totalEvents.longValue(),
// completed * 100.0 / totalEvents.doubleValue(),
// eps,
// usedMemory / 1024.0 / 1024.0,
// usedPercent
// );
//
// lastCount = completed;
// lastTime = now;
// }
// });
// monitor.setDaemon(true);
// monitor.start();
//
// // Submit events asynchronously
// for (int t = 0; t < THREADS; t++) {
// executor.submit(() -> {
// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
// var a = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
// .onResponse(HeavyEventSuccess.class, _ -> counter.increment())
// .postEvent();
//
// new EventFlow().addPostEvent(HeavyEventSuccess.class, "payload-" + i,
// a.getEventSnowflake())
// .postEvent();
// }
// });
// }
//
// executor.shutdown();
// executor.awaitTermination(10, TimeUnit.MINUTES);
//
// long endTime = System.currentTimeMillis();
// double durationSeconds = (endTime - startTime) / 1000.0;
//
// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + "
// seconds");
// double averageEps = totalEvents.doubleValue() / durationSeconds;
// System.out.printf("Average EPS: %.0f%n", averageEps);
//
// assertEquals(totalEvents.longValue(), counter.sum());
// }
//
//
// @Tag("stress")
// @Test
// void efficientExtremeConcurrencyTest() throws InterruptedException {
// final int THREADS = Runtime.getRuntime().availableProcessors();
// final int EVENTS_PER_THREAD = 5000;
//
// ExecutorService executor = Executors.newFixedThreadPool(THREADS);
// ConcurrentLinkedQueue<HeavyEvent> processedEvents = new ConcurrentLinkedQueue<>();
//
// long start = System.nanoTime();
//
// for (int t = 0; t < THREADS; t++) {
// executor.submit(() -> {
// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
// new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
// .onResponse(HeavyEvent.class, processedEvents::add)
// .postEvent();
// }
// });
// }
//
// executor.shutdown();
// executor.awaitTermination(10, TimeUnit.MINUTES);
//
// long end = System.nanoTime();
// double durationSeconds = (end - start) / 1_000_000_000.0;
//
// BigInteger totalEvents = BigInteger.valueOf((long)
// THREADS).multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
// double eps = totalEvents.doubleValue() / durationSeconds;
//
// System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
// System.out.printf("Throughput: %.0f events/sec%n", eps);
//
// Runtime rt = Runtime.getRuntime();
// System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0
// / 1024.0);
//
// assertEquals(totalEvents.intValue(), processedEvents.size());
// }
//
// @Tag("stress")
// @Test
// void constructorCacheVsReflection() throws Throwable {
// int iterations = 1_000_000;
// long startReflect = System.nanoTime();
// for (int i = 0; i < iterations; i++) {
// HeavyEvent.class.getDeclaredConstructors()[0].newInstance("payload", "uuid-" + i);
// }
// long endReflect = System.nanoTime();
//
// long startHandle = System.nanoTime();
// for (int i = 0; i < iterations; i++) {
// EventFlow a = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i);
// }
// long endHandle = System.nanoTime();
//
// System.out.println("Reflection: " + (endReflect - startReflect) / 1_000_000 + " ms");
// System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + "
// ms");
// }
// }

View File

@@ -1,14 +1,13 @@
package org.toop.framework.eventbus; package org.toop.framework.eventbus;
import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*;
import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.events.EventWithSnowflake;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*; import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.events.EventWithSnowflake;
class EventFlowTest { class EventFlowTest {
@@ -20,12 +19,13 @@ class EventFlowTest {
long randomPart = id & ((1L << 22) - 1); long randomPart = id & ((1L << 22) - 1);
assertTrue(timestampPart > 0, "Timestamp part should be non-zero"); assertTrue(timestampPart > 0, "Timestamp part should be non-zero");
assertTrue(randomPart >= 0 && randomPart < (1L << 22), "Random part should be within 22 bits"); assertTrue(
randomPart >= 0 && randomPart < (1L << 22), "Random part should be within 22 bits");
} }
@Test @Test
void testSnowflakeMonotonicity() throws InterruptedException { void testSnowflakeMonotonicity() throws InterruptedException {
SnowflakeGenerator sf = new SnowflakeGenerator(1); SnowflakeGenerator sf = new SnowflakeGenerator();
long id1 = sf.nextId(); long id1 = sf.nextId();
Thread.sleep(1); // ensure timestamp increases Thread.sleep(1); // ensure timestamp increases
long id2 = sf.nextId(); long id2 = sf.nextId();
@@ -35,7 +35,7 @@ class EventFlowTest {
@Test @Test
void testSnowflakeUniqueness() { void testSnowflakeUniqueness() {
SnowflakeGenerator sf = new SnowflakeGenerator(1); SnowflakeGenerator sf = new SnowflakeGenerator();
Set<Long> ids = new HashSet<>(); Set<Long> ids = new HashSet<>();
for (int i = 0; i < 100_000; i++) { for (int i = 0; i < 100_000; i++) {
long id = sf.nextId(); long id = sf.nextId();
@@ -46,9 +46,20 @@ class EventFlowTest {
// --- Dummy Event classes for testing --- // --- Dummy Event classes for testing ---
static class DummySnowflakeEvent implements EventWithSnowflake { static class DummySnowflakeEvent implements EventWithSnowflake {
private final long snowflake; private final long snowflake;
DummySnowflakeEvent(long snowflake) { this.snowflake = snowflake; }
@Override public long eventSnowflake() { return snowflake; } DummySnowflakeEvent(long snowflake) {
@Override public java.util.Map<String, Object> result() { return java.util.Collections.emptyMap(); } this.snowflake = snowflake;
}
@Override
public long eventSnowflake() {
return snowflake;
}
@Override
public java.util.Map<String, Object> result() {
return java.util.Collections.emptyMap();
}
} }
@Test @Test

View File

@@ -1,88 +0,0 @@
//package org.toop.framework.eventbus;
//
//import org.junit.jupiter.api.Test;
//import org.toop.framework.eventbus.events.EventWithUuid;
//
//import java.util.concurrent.atomic.AtomicInteger;
//
//import static org.junit.jupiter.api.Assertions.assertTrue;
//
//class EventPublisherPerformanceTest {
//
// public record PerfEvent(String name, String eventId) implements EventWithUuid {
// @Override
// public java.util.Map<String, Object> result() {
// return java.util.Map.of("name", name, "eventId", eventId);
// }
// }
//
// @Test
// void testEventCreationSpeed() {
// int iterations = 10_000;
// long start = System.nanoTime();
//
// for (int i = 0; i < iterations; i++) {
// new EventPublisher<>(PerfEvent.class, "event-" + i);
// }
//
// long end = System.nanoTime();
// long durationMs = (end - start) / 1_000_000;
//
// System.out.println("Created " + iterations + " events in " + durationMs + " ms");
// assertTrue(durationMs < 500, "Event creation too slow");
// }
//
// @Test
// void testEventPostSpeed() {
// int iterations = 100_000;
// AtomicInteger counter = new AtomicInteger(0);
//
// GlobalEventBus.subscribe(PerfEvent.class, e -> counter.incrementAndGet());
//
// long start = System.nanoTime();
//
// for (int i = 0; i < iterations; i++) {
// new EventPublisher<>(PerfEvent.class, "event-" + i).postEvent();
// }
//
// long end = System.nanoTime();
// long durationMs = (end - start) / 1_000_000;
//
// System.out.println("Posted " + iterations + " events in " + durationMs + " ms");
// assertTrue(counter.get() == iterations, "Not all events were received");
// assertTrue(durationMs < 1000, "Posting events too slow");
// }
//
// @Test
// void testConcurrentEventPostSpeed() throws InterruptedException {
// int threads = 20;
// int eventsPerThread = 5_000;
// AtomicInteger counter = new AtomicInteger(0);
//
// GlobalEventBus.subscribe(PerfEvent.class, e -> counter.incrementAndGet());
//
// Thread[] workers = new Thread[threads];
//
// long start = System.nanoTime();
//
// for (int t = 0; t < threads; t++) {
// workers[t] = new Thread(() -> {
// for (int i = 0; i < eventsPerThread; i++) {
// new EventPublisher<>(PerfEvent.class, "event-" + i).postEvent();
// }
// });
// workers[t].start();
// }
//
// for (Thread worker : workers) {
// worker.join();
// }
//
// long end = System.nanoTime();
// long durationMs = (end - start) / 1_000_000;
//
// System.out.println("Posted " + (threads * eventsPerThread) + " events concurrently in " + durationMs + " ms");
// assertTrue(counter.get() == threads * eventsPerThread, "Some events were lost");
// assertTrue(durationMs < 5000, "Concurrent posting too slow");
// }
//}

View File

@@ -1,246 +0,0 @@
package org.toop.framework.eventbus;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.toop.framework.eventbus.events.EventWithSnowflake;
import java.math.BigInteger;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
import static org.junit.jupiter.api.Assertions.assertEquals;
class EventPublisherStressTest {
/** Top-level record to ensure runtime type matches subscription */
public record HeavyEvent(String payload, long eventSnowflake) implements EventWithSnowflake {
@Override
public java.util.Map<String, Object> result() {
return java.util.Map.of("payload", payload, "eventId", eventSnowflake);
}
@Override
public long eventSnowflake() {
return this.eventSnowflake;
}
}
public record HeavyEventSuccess(String payload, long eventSnowflake) implements EventWithSnowflake {
@Override
public java.util.Map<String, Object> result() {
return java.util.Map.of("payload", payload, "eventId", eventSnowflake);
}
@Override
public long eventSnowflake() {
return eventSnowflake;
}
}
private static final int THREADS = 32;
private static final long EVENTS_PER_THREAD = 10_000_000;
@Tag("stress")
@Test
void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException {
LongAdder counter = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
BigInteger totalEvents = BigInteger.valueOf(THREADS)
.multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
long startTime = System.currentTimeMillis();
// Monitor thread for EPS and memory
Thread monitor = new Thread(() -> {
long lastCount = 0;
long lastTime = System.currentTimeMillis();
Runtime runtime = Runtime.getRuntime();
while (counter.sum() < totalEvents.longValue()) {
try { Thread.sleep(200); } catch (InterruptedException ignored) {}
long now = System.currentTimeMillis();
long completed = counter.sum();
long eventsThisPeriod = completed - lastCount;
double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
System.out.printf(
"Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
completed,
totalEvents.longValue(),
completed * 100.0 / totalEvents.doubleValue(),
eps,
usedMemory / 1024.0 / 1024.0,
usedPercent
);
lastCount = completed;
lastTime = now;
}
});
monitor.setDaemon(true);
monitor.start();
var listener = new EventFlow().listen(HeavyEvent.class, _ -> counter.increment());
// Submit events asynchronously
for (int t = 0; t < THREADS; t++) {
executor.submit(() -> {
for (int i = 0; i < EVENTS_PER_THREAD; i++) {
var _ = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
.asyncPostEvent();
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
listener.getResult();
long endTime = System.currentTimeMillis();
double durationSeconds = (endTime - startTime) / 1000.0;
System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
double averageEps = totalEvents.doubleValue() / durationSeconds;
System.out.printf("Average EPS: %.0f%n", averageEps);
assertEquals(totalEvents.longValue(), counter.sum());
}
@Tag("stress")
@Test
void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException {
LongAdder counter = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
BigInteger totalEvents = BigInteger.valueOf(THREADS)
.multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
long startTime = System.currentTimeMillis();
// Monitor thread for EPS and memory
Thread monitor = new Thread(() -> {
long lastCount = 0;
long lastTime = System.currentTimeMillis();
Runtime runtime = Runtime.getRuntime();
while (counter.sum() < totalEvents.longValue()) {
try { Thread.sleep(200); } catch (InterruptedException ignored) {}
long now = System.currentTimeMillis();
long completed = counter.sum();
long eventsThisPeriod = completed - lastCount;
double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
System.out.printf(
"Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
completed,
totalEvents.longValue(),
completed * 100.0 / totalEvents.doubleValue(),
eps,
usedMemory / 1024.0 / 1024.0,
usedPercent
);
lastCount = completed;
lastTime = now;
}
});
monitor.setDaemon(true);
monitor.start();
// Submit events asynchronously
for (int t = 0; t < THREADS; t++) {
executor.submit(() -> {
for (int i = 0; i < EVENTS_PER_THREAD; i++) {
var a = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
.onResponse(HeavyEventSuccess.class, _ -> counter.increment())
.postEvent();
new EventFlow().addPostEvent(HeavyEventSuccess.class, "payload-" + i, a.getEventSnowflake())
.postEvent();
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
long endTime = System.currentTimeMillis();
double durationSeconds = (endTime - startTime) / 1000.0;
System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
double averageEps = totalEvents.doubleValue() / durationSeconds;
System.out.printf("Average EPS: %.0f%n", averageEps);
assertEquals(totalEvents.longValue(), counter.sum());
}
@Tag("stress")
@Test
void efficientExtremeConcurrencyTest() throws InterruptedException {
final int THREADS = Runtime.getRuntime().availableProcessors();
final int EVENTS_PER_THREAD = 5000;
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
ConcurrentLinkedQueue<HeavyEvent> processedEvents = new ConcurrentLinkedQueue<>();
long start = System.nanoTime();
for (int t = 0; t < THREADS; t++) {
executor.submit(() -> {
for (int i = 0; i < EVENTS_PER_THREAD; i++) {
new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
.onResponse(HeavyEvent.class, processedEvents::add)
.postEvent();
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
long end = System.nanoTime();
double durationSeconds = (end - start) / 1_000_000_000.0;
BigInteger totalEvents = BigInteger.valueOf((long) THREADS).multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
double eps = totalEvents.doubleValue() / durationSeconds;
System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
System.out.printf("Throughput: %.0f events/sec%n", eps);
Runtime rt = Runtime.getRuntime();
System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0 / 1024.0);
assertEquals(totalEvents.intValue(), processedEvents.size());
}
@Tag("stress")
@Test
void constructorCacheVsReflection() throws Throwable {
int iterations = 1_000_000;
long startReflect = System.nanoTime();
for (int i = 0; i < iterations; i++) {
HeavyEvent.class.getDeclaredConstructors()[0].newInstance("payload", "uuid-" + i);
}
long endReflect = System.nanoTime();
long startHandle = System.nanoTime();
for (int i = 0; i < iterations; i++) {
EventFlow a = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i);
}
long endHandle = System.nanoTime();
System.out.println("Reflection: " + (endReflect - startReflect) / 1_000_000 + " ms");
System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + " ms");
}
}

View File

@@ -1,110 +1,159 @@
//package org.toop.framework.eventbus; package org.toop.framework.eventbus;
//
//import net.engio.mbassy.bus.publication.SyncAsyncPostCommand;
//import org.junit.jupiter.api.AfterEach;
//import org.junit.jupiter.api.Test;
//import org.toop.framework.eventbus.events.IEvent;
//
//import java.util.concurrent.atomic.AtomicBoolean;
//import java.util.concurrent.atomic.AtomicReference;
//
//import static org.junit.jupiter.api.Assertions.*;
//
//class GlobalEventBusTest {
//
// // A simple test event
// static class TestEvent implements IEvent {
// private final String message;
//
// TestEvent(String message) {
// this.message = message;
// }
//
// String getMessage() {
// return message;
// }
// }
//
// @AfterEach
// void tearDown() {
// // Reset to avoid leaking subscribers between tests
// GlobalEventBus.reset();
// }
//
// @Test
// void testSubscribeWithType() {
// AtomicReference<String> result = new AtomicReference<>();
//
// GlobalEventBus.subscribe(TestEvent.class, e -> result.set(e.getMessage()));
//
// GlobalEventBus.post(new TestEvent("hello"));
//
// assertEquals("hello", result.get());
// }
//
// @Test
// void testSubscribeWithoutType() {
// AtomicReference<String> result = new AtomicReference<>();
//
// GlobalEventBus.subscribe((TestEvent e) -> result.set(e.getMessage()));
//
// GlobalEventBus.post(new TestEvent("world"));
//
// assertEquals("world", result.get());
// }
//
// @Test
// void testUnsubscribeStopsReceivingEvents() {
// AtomicBoolean called = new AtomicBoolean(false);
//
// Object listener = GlobalEventBus.subscribe(TestEvent.class, e -> called.set(true));
//
// // First event should trigger
// GlobalEventBus.post(new TestEvent("first"));
// assertTrue(called.get());
//
// // Reset flag
// called.set(false);
//
// // Unsubscribe and post again
// GlobalEventBus.unsubscribe(listener);
// GlobalEventBus.post(new TestEvent("second"));
//
// assertFalse(called.get(), "Listener should not be called after unsubscribe");
// }
//
// @Test
// void testResetClearsListeners() {
// AtomicBoolean called = new AtomicBoolean(false);
//
// GlobalEventBus.subscribe(TestEvent.class, e -> called.set(true));
//
// GlobalEventBus.reset(); // should wipe subscriptions
//
// GlobalEventBus.post(new TestEvent("ignored"));
//
// assertFalse(called.get(), "Listener should not survive reset()");
// }
// @Test import static org.junit.jupiter.api.Assertions.*;
// void testSetReplacesBus() {
// MBassadorMock<IEvent> mockBus = new MBassadorMock<>(); import java.util.concurrent.*;
// GlobalEventBus.set(mockBus); import java.util.concurrent.atomic.AtomicBoolean;
// import java.util.concurrent.atomic.AtomicReference;
// TestEvent event = new TestEvent("test"); import java.util.function.Consumer;
// GlobalEventBus.post(event); import org.junit.jupiter.api.*;
// import org.toop.framework.eventbus.events.EventType;
// assertEquals(event, mockBus.lastPosted, "Custom bus should receive the event"); import org.toop.framework.eventbus.events.EventWithSnowflake;
// }
// class GlobalEventBusTest {
// // Minimal fake MBassador for verifying set()
// static class MBassadorMock<T extends IEvent> extends net.engio.mbassy.bus.MBassador<T> { // ------------------------------------------------------------------------
// T lastPosted; // Test Events
// // ------------------------------------------------------------------------
// @Override private record TestEvent(String message) implements EventType {}
// public SyncAsyncPostCommand<T> post(T message) {
// this.lastPosted = message; private record TestSnowflakeEvent(long eventSnowflake, String payload)
// return super.post(message); implements EventWithSnowflake {
// } @Override
// } public java.util.Map<String, Object> result() {
//} return java.util.Map.of("payload", payload);
}
}
static class SampleEvent implements EventType {
private final String message;
SampleEvent(String message) {
this.message = message;
}
public String message() {
return message;
}
}
@AfterEach
void cleanup() {
GlobalEventBus.reset();
}
// ------------------------------------------------------------------------
// Subscriptions
// ------------------------------------------------------------------------
@Test
void testSubscribeAndPost() {
AtomicReference<String> received = new AtomicReference<>();
Consumer<TestEvent> listener = e -> received.set(e.message());
GlobalEventBus.subscribe(TestEvent.class, listener);
GlobalEventBus.post(new TestEvent("hello"));
assertEquals("hello", received.get());
}
@Test
void testUnsubscribe() {
GlobalEventBus.reset();
AtomicBoolean called = new AtomicBoolean(false);
// Subscribe and keep the wrapper reference
Consumer<? super EventType> subscription =
GlobalEventBus.subscribe(SampleEvent.class, e -> called.set(true));
// Post once -> should trigger
GlobalEventBus.post(new SampleEvent("test1"));
assertTrue(called.get(), "Listener should be triggered before unsubscribe");
// Reset flag
called.set(false);
// Unsubscribe using the wrapper reference
GlobalEventBus.unsubscribe(subscription);
// Post again -> should NOT trigger
GlobalEventBus.post(new SampleEvent("test2"));
assertFalse(called.get(), "Listener should not be triggered after unsubscribe");
}
@Test
void testSubscribeGeneric() {
AtomicReference<EventType> received = new AtomicReference<>();
Consumer<Object> listener = e -> received.set((EventType) e);
GlobalEventBus.subscribe(listener);
TestEvent event = new TestEvent("generic");
GlobalEventBus.post(event);
assertEquals(event, received.get());
}
@Test
void testSubscribeById() {
AtomicReference<String> received = new AtomicReference<>();
long id = 42L;
GlobalEventBus.subscribeById(TestSnowflakeEvent.class, id, e -> received.set(e.payload()));
GlobalEventBus.post(new TestSnowflakeEvent(id, "snowflake"));
assertEquals("snowflake", received.get());
}
@Test
void testUnsubscribeById() {
AtomicBoolean triggered = new AtomicBoolean(false);
long id = 99L;
GlobalEventBus.subscribeById(TestSnowflakeEvent.class, id, e -> triggered.set(true));
GlobalEventBus.unsubscribeById(TestSnowflakeEvent.class, id);
GlobalEventBus.post(new TestSnowflakeEvent(id, "ignored"));
assertFalse(triggered.get(), "Listener should not be triggered after unsubscribeById");
}
// ------------------------------------------------------------------------
// Async posting
// ------------------------------------------------------------------------
@Test
void testPostAsync() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
GlobalEventBus.subscribe(
TestEvent.class,
e -> {
if ("async".equals(e.message())) {
latch.countDown();
}
});
GlobalEventBus.postAsync(new TestEvent("async"));
assertTrue(
latch.await(1, TimeUnit.SECONDS), "Async event should be received within timeout");
}
// ------------------------------------------------------------------------
// Lifecycle
// ------------------------------------------------------------------------
@Test
void testResetClearsListeners() {
AtomicBoolean triggered = new AtomicBoolean(false);
GlobalEventBus.subscribe(TestEvent.class, e -> triggered.set(true));
GlobalEventBus.reset();
GlobalEventBus.post(new TestEvent("ignored"));
assertFalse(triggered.get(), "Listener should not be triggered after reset");
}
@Test
void testShutdown() {
// Should not throw
assertDoesNotThrow(GlobalEventBus::shutdown);
}
}

View File

@@ -0,0 +1,117 @@
package org.toop.framework.networking;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.*;
import org.mockito.*;
import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.networking.events.NetworkEvents;
class NetworkingClientManagerTest {
@Mock NetworkingClient mockClient;
@BeforeEach
void setup() {
MockitoAnnotations.openMocks(this);
}
@Test
void testStartClientRequest_withMockedClient() throws Exception {
NetworkingClientManager manager = new NetworkingClientManager();
NetworkingClient mockClient = mock(NetworkingClient.class);
long clientId = new SnowflakeGenerator().nextId();
// Directly put mock into the map
manager.networkClients.put(clientId, mockClient);
// Verify it exists
assertEquals(mockClient, manager.networkClients.get(clientId));
}
@Test
void testHandleStartClient_postsResponse() throws Exception {
NetworkingClientManager manager = new NetworkingClientManager();
long eventId = 12345L;
NetworkEvents.StartClient event = new NetworkEvents.StartClient("127.0.0.1", 8080, eventId);
CompletableFuture<NetworkEvents.StartClientResponse> future = new CompletableFuture<>();
// Listen for response
new EventFlow().listen(NetworkEvents.StartClientResponse.class, future::complete);
manager.handleStartClient(event);
NetworkEvents.StartClientResponse response = future.get(); // blocks until completed
assertEquals(eventId, response.eventSnowflake());
}
@Test
void testHandleSendCommand_callsWriteAndFlush() throws Exception {
NetworkingClientManager manager = spy(new NetworkingClientManager());
long clientId = 1L;
manager.networkClients.put(clientId, mockClient);
NetworkEvents.SendCommand commandEvent = new NetworkEvents.SendCommand(clientId, "HELLO");
manager.handleCommand(commandEvent);
verify(mockClient).writeAndFlushnl("HELLO");
}
@Test
void testHandleSendLogin_callsCorrectCommand() throws Exception {
NetworkingClientManager manager = spy(new NetworkingClientManager());
long clientId = 1L;
manager.networkClients.put(clientId, mockClient);
manager.handleSendLogin(new NetworkEvents.SendLogin(clientId, "user1"));
verify(mockClient).writeAndFlushnl("LOGIN user1");
}
@Test
void testHandleCloseClient_removesClient() throws Exception {
NetworkingClientManager manager = spy(new NetworkingClientManager());
long clientId = 1L;
manager.networkClients.put(clientId, mockClient);
manager.handleCloseClient(new NetworkEvents.CloseClient(clientId));
verify(mockClient).closeConnection();
assertFalse(manager.networkClients.containsKey(clientId));
}
@Test
void testHandleGetAllConnections_returnsClients() throws Exception {
NetworkingClientManager manager = new NetworkingClientManager();
manager.networkClients.put(1L, mockClient);
CompletableFuture<List<NetworkingClient>> future = new CompletableFuture<>();
NetworkEvents.RequestsAllClients request = new NetworkEvents.RequestsAllClients(future);
manager.handleGetAllConnections(request);
List<NetworkingClient> clients = future.get();
assertEquals(1, clients.size());
assertSame(mockClient, clients.getFirst());
}
@Test
void testHandleShutdownAll_clearsClients() throws Exception {
NetworkingClientManager manager = new NetworkingClientManager();
manager.networkClients.put(1L, mockClient);
manager.handleShutdownAll(new NetworkEvents.ForceCloseAllClients());
verify(mockClient).closeConnection();
assertTrue(manager.networkClients.isEmpty());
}
}

View File

@@ -0,0 +1,162 @@
package org.toop.framework.networking.events;
import static org.junit.jupiter.api.Assertions.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
class NetworkEventsTest {
@Test
void testRequestsAllClients() {
CompletableFuture<List<String>> future = new CompletableFuture<>();
NetworkEvents.RequestsAllClients event =
new NetworkEvents.RequestsAllClients((CompletableFuture) future);
assertNotNull(event.future());
assertEquals(future, event.future());
}
@Test
void testForceCloseAllClients() {
NetworkEvents.ForceCloseAllClients event = new NetworkEvents.ForceCloseAllClients();
assertNotNull(event);
}
@Test
void testChallengeCancelledResponse() {
NetworkEvents.ChallengeCancelledResponse event =
new NetworkEvents.ChallengeCancelledResponse(42L, "ch123");
assertEquals(42L, event.clientId());
assertEquals("ch123", event.challengeId());
}
@Test
void testChallengeResponse() {
NetworkEvents.ChallengeResponse event =
new NetworkEvents.ChallengeResponse(1L, "Alice", "Chess", "ch001");
assertEquals("Alice", event.challengerName());
assertEquals("Chess", event.gameType());
assertEquals("ch001", event.challengeId());
}
@Test
void testPlayerlistResponse() {
String[] players = {"p1", "p2"};
NetworkEvents.PlayerlistResponse event = new NetworkEvents.PlayerlistResponse(5L, players);
assertArrayEquals(players, event.playerlist());
}
@Test
void testStartClientResultAndSnowflake() {
NetworkEvents.StartClient event = new NetworkEvents.StartClient("127.0.0.1", 9000, 12345L);
assertEquals("127.0.0.1", event.ip());
assertEquals(9000, event.port());
assertEquals(12345L, event.eventSnowflake());
Map<String, Object> result = event.result();
assertEquals("127.0.0.1", result.get("ip"));
assertEquals(9000, result.get("port"));
assertEquals(12345L, result.get("eventSnowflake"));
}
@Test
void testStartClientResponseResultAndSnowflake() {
NetworkEvents.StartClientResponse response =
new NetworkEvents.StartClientResponse(99L, 54321L);
assertEquals(99L, response.clientId());
assertEquals(54321L, response.eventSnowflake());
Map<String, Object> result = response.result();
assertEquals(99L, result.get("clientId"));
assertEquals(54321L, result.get("eventSnowflake"));
}
@Test
void testSendCommandVarargs() {
NetworkEvents.SendCommand event = new NetworkEvents.SendCommand(7L, "LOGIN", "Alice");
assertEquals(7L, event.clientId());
assertArrayEquals(new String[] {"LOGIN", "Alice"}, event.args());
}
@Test
void testReceivedMessage() {
NetworkEvents.ReceivedMessage msg = new NetworkEvents.ReceivedMessage(11L, "Hello");
assertEquals(11L, msg.clientId());
assertEquals("Hello", msg.message());
}
@Test
void testClosedConnection() {
NetworkEvents.ClosedConnection event = new NetworkEvents.ClosedConnection(22L);
assertEquals(22L, event.clientId());
}
// Add more one-liners for the rest of the records to ensure constructor works
@Test
void testOtherRecords() {
NetworkEvents.SendLogin login = new NetworkEvents.SendLogin(1L, "Bob");
assertEquals(1L, login.clientId());
assertEquals("Bob", login.username());
NetworkEvents.SendLogout logout = new NetworkEvents.SendLogout(2L);
assertEquals(2L, logout.clientId());
NetworkEvents.SendGetPlayerlist getPlayerlist = new NetworkEvents.SendGetPlayerlist(3L);
assertEquals(3L, getPlayerlist.clientId());
NetworkEvents.SendGetGamelist getGamelist = new NetworkEvents.SendGetGamelist(4L);
assertEquals(4L, getGamelist.clientId());
NetworkEvents.SendSubscribe subscribe = new NetworkEvents.SendSubscribe(5L, "Chess");
assertEquals(5L, subscribe.clientId());
assertEquals("Chess", subscribe.gameType());
NetworkEvents.SendMove move = new NetworkEvents.SendMove(6L, (short) 1);
assertEquals(6L, move.clientId());
assertEquals((short) 1, move.moveNumber());
NetworkEvents.SendChallenge challenge = new NetworkEvents.SendChallenge(7L, "Eve", "Go");
assertEquals(7L, challenge.clientId());
assertEquals("Eve", challenge.usernameToChallenge());
assertEquals("Go", challenge.gameType());
NetworkEvents.SendAcceptChallenge accept = new NetworkEvents.SendAcceptChallenge(8L, 100);
assertEquals(8L, accept.clientId());
assertEquals(100, accept.challengeId());
NetworkEvents.SendForfeit forfeit = new NetworkEvents.SendForfeit(9L);
assertEquals(9L, forfeit.clientId());
NetworkEvents.SendMessage message = new NetworkEvents.SendMessage(10L, "Hi!");
assertEquals(10L, message.clientId());
assertEquals("Hi!", message.message());
NetworkEvents.SendHelp help = new NetworkEvents.SendHelp(11L);
assertEquals(11L, help.clientId());
NetworkEvents.SendHelpForCommand helpForCommand =
new NetworkEvents.SendHelpForCommand(12L, "MOVE");
assertEquals(12L, helpForCommand.clientId());
assertEquals("MOVE", helpForCommand.command());
NetworkEvents.CloseClient close = new NetworkEvents.CloseClient(13L);
assertEquals(13L, close.clientId());
NetworkEvents.ServerResponse serverResponse = new NetworkEvents.ServerResponse(14L);
assertEquals(14L, serverResponse.clientId());
NetworkEvents.Reconnect reconnect = new NetworkEvents.Reconnect(15L);
assertEquals(15L, reconnect.clientId());
NetworkEvents.ChangeClientHost change =
new NetworkEvents.ChangeClientHost(16L, "localhost", 1234);
assertEquals(16L, change.clientId());
assertEquals("localhost", change.ip());
assertEquals(1234, change.port());
NetworkEvents.CouldNotConnect couldNotConnect = new NetworkEvents.CouldNotConnect(17L);
assertEquals(17L, couldNotConnect.clientId());
}
}

View File

@@ -13,6 +13,12 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.46.1</version>
</dependency>
<dependency> <dependency>
<groupId>org.junit</groupId> <groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId> <artifactId>junit-bom</artifactId>
@@ -110,6 +116,41 @@
<!-- <fork>true</fork>--> <!-- <fork>true</fork>-->
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.46.1</version>
<configuration>
<!-- optional: limit format enforcement to just the files changed by this feature branch -->
<ratchetFrom>origin/main</ratchetFrom>
<formats>
<!-- you can define as many formats as you want, each is independent -->
<format>
<!-- define the files to apply to -->
<includes>
<include>.gitattributes</include>
<include>.gitignore</include>
</includes>
<!-- define the steps to apply to those files -->
<trimTrailingWhitespace/>
<endWithNewline/>
<indent>
<tabs>true</tabs>
<spacesPerTab>4</spacesPerTab>
</indent>
</format>
</formats>
<!-- define a language-specific format -->
<java>
<googleJavaFormat>
<version>1.28.0</version>
<style>AOSP</style> <!-- GOOGLE (2 indents), AOSP (4 indents) -->
<reflowLongStrings>true</reflowLongStrings>
<formatJavadoc>true</formatJavadoc>
</googleJavaFormat>
</java>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>

57
pom.xml
View File

@@ -83,13 +83,6 @@
<version>2.0.17</version> <version>2.0.17</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/com.diffplug.spotless/spotless-maven-plugin -->
<dependency>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.46.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.errorprone/error_prone_annotations --> <!-- https://mvnrepository.com/artifact/com.google.errorprone/error_prone_annotations -->
<dependency> <dependency>
<groupId>com.google.errorprone</groupId> <groupId>com.google.errorprone</groupId>
@@ -100,6 +93,21 @@
</dependencyManagement> </dependencyManagement>
<build> <build>
<plugins>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.46.1</version>
<configuration>
<java>
<googleJavaFormat>
<version>1.28.0</version>
<style>AOSP</style>
</googleJavaFormat>
</java>
</configuration>
</plugin>
</plugins>
<pluginManagement> <pluginManagement>
<plugins> <plugins>
<plugin> <plugin>
@@ -162,41 +170,6 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.46.1</version>
<configuration>
<!-- optional: limit format enforcement to just the files changed by this feature branch -->
<ratchetFrom>origin/main</ratchetFrom>
<formats>
<!-- you can define as many formats as you want, each is independent -->
<format>
<!-- define the files to apply to -->
<includes>
<include>.gitattributes</include>
<include>.gitignore</include>
</includes>
<!-- define the steps to apply to those files -->
<trimTrailingWhitespace/>
<endWithNewline/>
<indent>
<tabs>true</tabs>
<spacesPerTab>4</spacesPerTab>
</indent>
</format>
</formats>
<!-- define a language-specific format -->
<java>
<googleJavaFormat>
<version>1.28.0</version>
<style>AOSP</style> <!-- GOOGLE (2 indents), AOSP (4 indents) -->
<reflowLongStrings>true</reflowLongStrings>
<formatJavadoc>true</formatJavadoc>
</googleJavaFormat>
</java>
</configuration>
</plugin>
</plugins> </plugins>
</pluginManagement> </pluginManagement>
</build> </build>