Removed google guave dependency. Added a new GlobalEventBus. Refined the EventPublisher. Moving events to own file.

This commit is contained in:
lieght
2025-09-23 00:40:15 +02:00
parent 63b08f3010
commit b5ee0a6725
27 changed files with 1115 additions and 927 deletions

17
pom.xml
View File

@@ -38,12 +38,6 @@
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.4.8-jre</version>
</dependency>
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
@@ -98,6 +92,17 @@
<artifactId>log4j-core</artifactId>
<version>2.25.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.diffplug.spotless/spotless-maven-plugin -->
<dependency>

View File

@@ -1,11 +1,10 @@
package org.toop;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import com.google.common.base.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.backend.ServerManager;
@@ -16,7 +15,6 @@ import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.NetworkEvents;
import org.toop.frontend.UI.LocalServerSelector;
import org.toop.frontend.networking.NetworkingClientManager;
import org.toop.frontend.networking.NetworkingGameClientHandler;
public class Main {
private static final Logger logger = LogManager.getLogger(Main.class);
@@ -38,49 +36,20 @@ public class Main {
new Events.ServerEvents.StartServerRequest(5001, "tictactoe", serverIdFuture));
var serverId = serverIdFuture.get();
new MainTest();
// CompletableFuture<String> conIdFuture = new CompletableFuture<>();
// GlobalEventBus.post(
// new NetworkEvents.StartClientRequest(NetworkingGameClientHandler::new,
// "127.0.0.1", 5001, conIdFuture));
// var conId = conIdFuture.get();
// GlobalEventBus.post(new NetworkEvents.SendCommand(conId, "move", "5"));
// GlobalEventBus.post(new NetworkEvents.ForceCloseAllClients());
// GlobalEventBus.post(new NetworkEvents.StartClient(
// NetworkingGameClientHandler::new, "127.0.0.1", 5001, serverId
// ));
// JFrame frame = new JFrame("Server Settings");
// frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
// frame.setSize(800, 600);
// frame.setLocationRelativeTo(null);
// frame.setVisible(true);
var a = new MainTest();
javax.swing.SwingUtilities.invokeLater(LocalServerSelector::new);
// new Thread(() -> {
// LocalServerSelector window = new LocalServerSelector();
// }).start();
}
private static void initSystems() {
new ServerManager();
new NetworkingClientManager();
}
private static void registerEvents() {
GlobalEventBus.subscribeAndRegister(
Events.WindowEvents.OnQuitRequested.class,
event -> {
quit();
});
GlobalEventBus.subscribeAndRegister(Events.WindowEvents.OnMouseMove.class, event -> {});
}
public static void initSystems() {
new ServerManager();
new NetworkingClientManager();
new EventPublisher<>(Events.WindowEvents.OnQuitRequested.class, _ -> quit());
new EventPublisher<>(Events.WindowEvents.OnMouseMove.class, _ -> {});
}
private static void quit() {

View File

@@ -1,39 +1,24 @@
package org.toop;
import com.google.common.base.Supplier;
import org.toop.eventbus.EventPublisher;
import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.Events;
import org.toop.eventbus.events.NetworkEvents;
import org.toop.frontend.networking.NetworkingGameClientHandler;
import java.util.function.Supplier;
public class MainTest {
MainTest() {
var ep = new EventPublisher<>(
Events.ServerEvents.StartServer.class,
5001,
"tictactoe"
).onEvent(
this::handleServerStarted
).unregisterAfterSuccess().postEvent();
// var ep = new EventPublisher<>(
// NetworkEvents.SendCommand.class,
// (Supplier<NetworkingGameClientHandler>) NetworkingGameClientHandler::new,
// "127.0.0.1",
// 5001
// ).onEventById(this::handleStartClientRequest).unregisterAfterSuccess().postEvent();
var a = new EventPublisher<>(
NetworkEvents.StartClient.class,
(Supplier<NetworkingGameClientHandler>) NetworkingGameClientHandler::new,
"127.0.0.1",
5001
).onEventById(NetworkEvents.StartClientSuccess.class, this::handleStartClientSuccess)
.unsubscribeAfterSuccess().asyncPostEvent();
}
private void handleStartClientRequest(NetworkEvents.StartClientSuccess event) {
GlobalEventBus.post(new NetworkEvents.CloseClient((String) event.connectionId()));
private void handleStartClientSuccess(NetworkEvents.StartClientSuccess event) {
GlobalEventBus.post(new NetworkEvents.CloseClient(event.clientId()));
}
private void handleServerStarted(Events.ServerEvents.ServerStarted event) {
System.out.println("Server started");
}
}

View File

@@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.backend.tictactoe.TicTacToeServer;
import org.toop.eventbus.EventPublisher;
import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus;
@@ -22,19 +23,12 @@ public class ServerManager {
/** Starts a server manager, to manage, servers. */
public ServerManager() {
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.StartServerRequest.class, this::handleStartServerRequest);
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.StartServer.class, this::handleStartServer);
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.ForceCloseAllServers.class, _ -> shutdownAll());
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.CreateTicTacToeGameRequest.class,
this::handleStartTicTacToeGameOnAServer);
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.RunTicTacToeGame.class, this::handleRunTicTacToeGameOnAServer);
GlobalEventBus.subscribeAndRegister(
Events.ServerEvents.EndTicTacToeGame.class, this::handleEndTicTacToeGameOnAServer);
new EventPublisher<>(Events.ServerEvents.StartServerRequest.class, this::handleStartServerRequest);
new EventPublisher<>(Events.ServerEvents.StartServer.class, this::handleStartServer);
new EventPublisher<>(Events.ServerEvents.ForceCloseAllServers.class, _ -> shutdownAll());
new EventPublisher<>(Events.ServerEvents.CreateTicTacToeGameRequest.class, this::handleStartTicTacToeGameOnAServer);
new EventPublisher<>(Events.ServerEvents.RunTicTacToeGame.class, this::handleRunTicTacToeGameOnAServer);
new EventPublisher<>(Events.ServerEvents.EndTicTacToeGame.class, this::handleEndTicTacToeGameOnAServer);
}
private String startServer(int port, String gameType) {
@@ -67,9 +61,7 @@ public class ServerManager {
}
private void handleStartServer(Events.ServerEvents.StartServer event) {
GlobalEventBus.post(
new Events.ServerEvents.ServerStarted(
this.startServer(event.port(), event.gameType()), event.port()));
new EventPublisher<>(Events.ServerEvents.ServerStarted.class, this.startServer(event.port(), event.gameType()), event.port());
}
private void handleStartTicTacToeGameOnAServer(

View File

@@ -1,50 +1,50 @@
package org.toop.core;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.frontend.platform.core.glfw.GlfwWindow;
public abstract class Window {
public enum API {
NONE,
GLFW,
}
public record Size(int width, int height) {}
protected static final Logger logger = LogManager.getLogger(Window.class);
private static API api = API.NONE;
private static Window instance = null;
public static Window setup(API api, String title, Size size) {
if (instance != null) {
logger.warn("Window is already setup.");
return instance;
}
switch (api) {
case GLFW:
instance = new GlfwWindow(title, size);
break;
default:
logger.fatal("No valid window api chosen");
return null;
}
Window.api = api;
return instance;
}
public static API getApi() {
return api;
}
public void cleanup() {
instance = null;
logger.info("Window cleanup.");
}
public abstract void update();
}
//package org.toop.core;
//
//import org.apache.logging.log4j.LogManager;
//import org.apache.logging.log4j.Logger;
//import org.toop.frontend.platform.core.glfw.GlfwWindow;
//
//public abstract class Window {
// public enum API {
// NONE,
// GLFW,
// }
//
// public record Size(int width, int height) {}
//
// protected static final Logger logger = LogManager.getLogger(Window.class);
//
// private static API api = API.NONE;
// private static Window instance = null;
//
// public static Window setup(API api, String title, Size size) {
// if (instance != null) {
// logger.warn("Window is already setup.");
// return instance;
// }
//
// switch (api) {
// case GLFW:
// instance = new GlfwWindow(title, size);
// break;
//
// default:
// logger.fatal("No valid window api chosen");
// return null;
// }
//
// Window.api = api;
// return instance;
// }
//
// public static API getApi() {
// return api;
// }
//
// public void cleanup() {
// instance = null;
// logger.info("Window cleanup.");
// }
//
// public abstract void update();
//}

View File

@@ -1,7 +1,7 @@
package org.toop.eventbus;
import com.google.common.eventbus.EventBus;
import org.toop.eventbus.events.EventWithUuid;
import org.toop.eventbus.events.IEvent;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
@@ -12,51 +12,84 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* EventPublisher is a helper class for creating, posting, and optionally subscribing to events
* in a type-safe and chainable manner. It automatically injects a unique UUID into the event
* and supports filtering subscribers by this UUID.
* EventPublisher is a utility class for creating, posting, and optionally subscribing to events
* in a type-safe and chainable manner. It is designed to work with the {@link GlobalEventBus}.
*
* <p>Usage pattern (with chainable API):
* <p>This class supports automatic UUID assignment for {@link EventWithUuid} events,
* and allows filtering subscribers so they only respond to events with a specific UUID.
* All subscription methods are chainable, and you can configure automatic unsubscription
* after an event has been successfully handled.</p>
*
* <p><strong>Usage patterns:</strong></p>
*
* <p><strong>1. Publish an event with optional subscription by UUID:</strong></p>
* <pre>{@code
* new EventPublisher<>(StartClient.class, handlerFactory, "127.0.0.1", 5001)
* .onEventById(ClientReady.class, clientReadyEvent -> logger.info(clientReadyEvent))
* .unregisterAfterSuccess()
* .unsubscribeAfterSuccess()
* .postEvent();
* }</pre>
*
* @param <T> the type of event to publish, must extend EventWithUuid
* <p><strong>2. Subscribe to a specific event type without UUID filtering:</strong></p>
* <pre>{@code
* new EventPublisher<>(MyEvent.class)
* .onEvent(MyEvent.class, e -> logger.info("Received: " + e))
* .postEvent();
* }</pre>
*
* <p><strong>3. Subscribe with runtime type inference:</strong></p>
* <pre>{@code
* new EventPublisher<>((MyEvent e) -> logger.info("Received: " + e))
* .postEvent();
* }</pre>
*
* <p><strong>Notes:</strong></p>
* <ul>
* <li>For events extending {@link EventWithUuid}, a UUID is automatically generated
* and passed to the event constructor if none is provided.</li>
* <li>Listeners registered via {@code onEventById} will only be triggered
* if the event's UUID matches this publisher's UUID.</li>
* <li>Listeners can be unsubscribed automatically after the first successful trigger
* using {@link #unsubscribeAfterSuccess()}.</li>
* <li>All subscription and posting methods are chainable for fluent API usage.</li>
* </ul>
*
* @param <T> the type of event to publish; must implement {@link IEvent}
*/
public class EventPublisher<T> {
public class EventPublisher<T extends IEvent> {
/** Lookup object used for dynamically invoking constructors via MethodHandles. */
private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup();
/** Cache of constructor handles for event classes to avoid repeated reflection lookups. */
private static final Map<Class<?>, MethodHandle> CONSTRUCTOR_CACHE = new ConcurrentHashMap<>();
/** The UUID automatically assigned to this event */
private final String eventId;
/** Automatically assigned UUID for {@link EventWithUuid} events. */
private String eventId = null;
/** The event instance created by this publisher */
private final T event;
/** The event instance created by this publisher. */
private T event = null;
/** The listener object returned by the global event bus subscription */
/** The listener returned by GlobalEventBus subscription. Used for unsubscription. */
private Object listener;
/** Flag indicating whether to unregister the listener after it is successfully triggered */
private boolean unregisterAfterSuccess = false;
/** Flag indicating whether to automatically unsubscribe the listener after success. */
private boolean unsubscribeAfterSuccess = false;
/** Results that came back from the subscribed event */
/** Holds the results returned from the subscribed event, if any. */
private Map<String, Object> result = null;
/**
* Constructs a new EventPublisher by instantiating the given event class.
* A unique UUID is automatically generated and passed as the last constructor argument.
* For {@link EventWithUuid} events, a UUID is automatically generated and passed as
* the last constructor argument if not explicitly provided.
*
* @param postEventClass the class of the event to instantiate
* @param args constructor arguments for the event, excluding the UUID
* @param args constructor arguments for the event (UUID may be excluded)
* @throws RuntimeException if instantiation fails
*/
public EventPublisher(Class<T> postEventClass, Object... args) {
this.eventId = UUID.randomUUID().toString();
try {
boolean isUuidEvent = EventWithUuid.class.isAssignableFrom(postEventClass);
@@ -71,43 +104,65 @@ public class EventPublisher<T> {
});
Object[] finalArgs;
if (isUuidEvent) {
// append UUID to args
int expectedParamCount = ctorHandle.type().parameterCount();
if (isUuidEvent && args.length < expectedParamCount) {
this.eventId = UUID.randomUUID().toString();
finalArgs = new Object[args.length + 1];
System.arraycopy(args, 0, finalArgs, 0, args.length);
finalArgs[args.length] = this.eventId;
} else if (isUuidEvent) {
this.eventId = (String) args[args.length - 1];
finalArgs = args;
} else {
// just forward args
finalArgs = args;
}
@SuppressWarnings("unchecked")
T instance = (T) ctorHandle.invokeWithArguments(finalArgs);
this.event = instance;
} catch (Throwable e) {
throw new RuntimeException("Failed to instantiate event", e);
}
}
/**
* Subscribes a listener for a specific event type, but only triggers the listener
* if the incoming event's UUID matches this EventPublisher's UUID.
* Creates a new EventPublisher and immediately subscribes a listener for the event class.
*
* @param eventClass the class of the event to subscribe to
* @param action the action to execute when a matching event is received
* @param <TT> the type of the event to subscribe to; must extend EventWithUuid
* @return this EventPublisher instance, for chainable calls
* @param action the action to execute when an event of the given class is received
*/
public EventPublisher(Class<T> eventClass, Consumer<T> action) {
this.onEvent(eventClass, action);
}
/**
* Creates a new EventPublisher and immediately subscribes a listener using runtime type inference.
* The event type is inferred at runtime. Wrong type casts are ignored silently.
*
* @param action the action to execute when a matching event is received
*/
public EventPublisher(Consumer<T> action) {
this.onEvent(action);
}
/**
* Subscribes a listener for a specific {@link EventWithUuid} event type.
* The listener is only triggered if the event UUID matches this publisher's UUID.
*
* @param eventClass the class of the event to subscribe to
* @param action the action to execute on a matching event
* @param <TT> type of event; must extend EventWithUuid
* @return this EventPublisher for chainable calls
*/
public <TT extends EventWithUuid> EventPublisher<T> onEventById(
Class<TT> eventClass, Consumer<TT> action) {
this.listener = GlobalEventBus.subscribeAndRegister(eventClass, event -> {
this.listener = GlobalEventBus.subscribe(eventClass, event -> {
if (event.eventId().equals(this.eventId)) {
action.accept(event);
if (unregisterAfterSuccess && listener != null) {
GlobalEventBus.unregister(listener);
if (unsubscribeAfterSuccess && listener != null) {
GlobalEventBus.unsubscribe(listener);
}
this.result = event.result();
@@ -118,33 +173,29 @@ public class EventPublisher<T> {
}
/**
* Subscribes a listener for a specific event type, but only triggers the listener
* if the incoming event's UUID matches this EventPublisher's UUID.
* Subscribes a listener for {@link EventWithUuid} events without specifying class explicitly.
* Only triggers for events whose UUID matches this publisher's UUID.
*
* @param action the action (function) to execute when a matching event is received
* @param <TT> the type of the event to subscribe to; must extend EventWithUuid
* @return this EventPublisher instance, for chainable calls
* @param action the action to execute on a matching event
* @param <TT> type of event; must extend EventWithUuid
* @return this EventPublisher for chainable calls
*/
@SuppressWarnings("unchecked")
public <TT extends EventWithUuid> EventPublisher<T> onEventById(
Consumer<TT> action) {
public <TT extends EventWithUuid> EventPublisher<T> onEventById(Consumer<TT> action) {
this.listener = GlobalEventBus.subscribeAndRegister(event -> {
// Only process events that are EventWithUuid
this.listener = GlobalEventBus.subscribe(event -> {
if (event instanceof EventWithUuid uuidEvent) {
if (uuidEvent.eventId().equals(this.eventId)) {
try {
TT typedEvent = (TT) uuidEvent; // unchecked cast
TT typedEvent = (TT) uuidEvent;
action.accept(typedEvent);
if (unregisterAfterSuccess && listener != null) {
GlobalEventBus.unregister(listener);
if (unsubscribeAfterSuccess && listener != null) {
GlobalEventBus.unsubscribe(listener);
}
this.result = typedEvent.result();
} catch (ClassCastException ignored) {
// TODO: Not the right type, ignore silently
}
} catch (ClassCastException ignored) {}
}
}
});
@@ -153,76 +204,50 @@ public class EventPublisher<T> {
}
/**
* Subscribes a listener for a specific event type. The listener will be invoked
* whenever an event of the given class is posted to the global event bus.
*
* <p>This overload provides type safety by requiring the event class explicitly
* and casting the incoming event before passing it to the provided action.</p>
*
* <pre>{@code
* new EventPublisher<>(MyEvent.class)
* .onEvent(MyEvent.class, e -> logger.info("Received: " + e))
* .postEvent();
* }</pre>
* Subscribes a listener for a specific event type without UUID filtering.
*
* @param eventClass the class of the event to subscribe to
* @param action the action to execute when an event of the given class is received
* @param <TT> the type of the event to subscribe to
* @return this EventPublisher instance, for chainable calls
* @param action the action to execute on the event
* @param <TT> type of event; must implement IEvent
* @return this EventPublisher for chainable calls
*/
public <TT> EventPublisher<T> onEvent(Class<TT> eventClass, Consumer<TT> action) {
this.listener = GlobalEventBus.subscribeAndRegister(eventClass, event -> {
public <TT extends IEvent> EventPublisher<T> onEvent(Class<TT> eventClass, Consumer<TT> action) {
this.listener = GlobalEventBus.subscribe(eventClass, event -> {
action.accept(eventClass.cast(event));
if (unregisterAfterSuccess && listener != null) {
GlobalEventBus.unregister(listener);
if (unsubscribeAfterSuccess && listener != null) {
GlobalEventBus.unsubscribe(listener);
}
});
return this;
}
/**
* Subscribes a listener for events without requiring the event class explicitly.
* The listener will attempt to cast each posted event to the expected type.
* If the cast fails, the event is ignored silently.
*
* <p>This overload provides more concise syntax, but relies on an unchecked cast
* at runtime. Use {@link #onEvent(Class, Consumer)} if you prefer explicit
* type safety.</p>
*
* <pre>{@code
* new EventPublisher<>(MyEvent.class)
* .onEvent((MyEvent e) -> logger.info("Received: " + e))
* .postEvent();
* }</pre>
* Subscribes a listener using runtime type inference. Wrong type casts are ignored silently.
*
* @param action the action to execute when a matching event is received
* @param <TT> the type of the event to subscribe to
* @return this EventPublisher instance, for chainable calls
* @param <TT> type of event (inferred at runtime)
* @return this EventPublisher for chainable calls
*/
@SuppressWarnings("unchecked")
public <TT> EventPublisher<T> onEvent(Consumer<TT> action) {
this.listener = GlobalEventBus.subscribeAndRegister(event -> {
this.listener = GlobalEventBus.subscribe(event -> {
try {
// unchecked cast if wrong type, ClassCastException is caught
TT typedEvent = (TT) event;
action.accept(typedEvent);
if (unregisterAfterSuccess && listener != null) {
GlobalEventBus.unregister(listener);
if (unsubscribeAfterSuccess && listener != null) {
GlobalEventBus.unsubscribe(listener);
}
} catch (ClassCastException ignored) {
// Ignore events of unrelated types
}
} catch (ClassCastException ignored) {}
});
return this;
}
/**
* Posts the event to the global event bus. This should generally be the
* final call in the chain.
* Posts the event synchronously to {@link GlobalEventBus}.
*
* @return this EventPublisher instance, for potential chaining
* @return this EventPublisher for chainable calls
*/
public EventPublisher<T> postEvent() {
GlobalEventBus.post(event);
@@ -230,30 +255,45 @@ public class EventPublisher<T> {
}
/**
* Configures the publisher so that any listener registered with
* {@link #onEventById(Class, Consumer)} is automatically unregistered
* after it is successfully triggered.
* Posts the event asynchronously to {@link GlobalEventBus}.
*
* @return this EventPublisher instance, for chainable calls
* @return this EventPublisher for chainable calls
*/
public EventPublisher<T> unregisterAfterSuccess() {
this.unregisterAfterSuccess = true;
public EventPublisher<T> asyncPostEvent() {
GlobalEventBus.postAsync(event);
return this;
}
public EventPublisher<T> unregisterNow() {
if (unregisterAfterSuccess && listener != null) {
GlobalEventBus.unregister(listener);
/**
* Configures automatic unsubscription for listeners registered via onEventById
* after a successful trigger.
*
* @return this EventPublisher for chainable calls
*/
public EventPublisher<T> unsubscribeAfterSuccess() {
this.unsubscribeAfterSuccess = true;
return this;
}
/**
* Immediately unsubscribes the listener, if set.
*
* @return this EventPublisher for chainable calls
*/
public EventPublisher<T> unsubscribeNow() {
if (unsubscribeAfterSuccess && listener != null) {
GlobalEventBus.unsubscribe(listener);
}
return this;
}
/**
* Returns the results provided by the triggered event, if any.
*
* @return map of results, or null if none
*/
public Map<String, Object> getResult() {
if (this.result != null) {
return this.result;
}
return null;
// TODO: Why check for null if return is null anyway?
return this.result;
}
/**
@@ -266,9 +306,9 @@ public class EventPublisher<T> {
}
/**
* Returns the UUID automatically assigned to this event.
* Returns the automatically assigned UUID for {@link EventWithUuid} events.
*
* @return the UUID of the event
* @return the UUID string, or null for non-UUID events
*/
public String getEventId() {
return eventId;

View File

@@ -1,114 +1,203 @@
package org.toop.eventbus;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import org.toop.eventbus.events.EventWithUuid;
import org.toop.eventbus.events.IEvent;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Consumer;
/** A singleton Event Bus to be used for creating, triggering and activating events. */
public class GlobalEventBus {
/**
* GlobalEventBus is a high-throughput, thread-safe event bus for publishing and subscribing
* to events within the application.
*
* <p>It supports:</p>
* <ul>
* <li>Type-specific subscriptions via {@link #subscribe(Class, Consumer)}</li>
* <li>UUID-specific subscriptions via {@link #subscribeById(Class, String, Consumer)}</li>
* <li>Asynchronous posting of events with automatic queueing and fallback</li>
* </ul>
*
* <p><b>Performance note:</b> Directly using {@link GlobalEventBus} is possible,
* but for safer type handling, automatic UUID management, and easier unsubscription,
* it is recommended to use {@link EventPublisher} whenever possible.</p>
*
* <p>The bus maintains a fixed pool of worker threads that continuously process queued events.</p>
*/
public final class GlobalEventBus {
/** Singleton event bus. */
private static EventBus eventBus = new EventBus("global-bus");
/** Number of worker threads, set to the number of available CPU cores. */
private static final int WORKERS = Runtime.getRuntime().availableProcessors();
/** Queue for asynchronous event processing. */
private static final BlockingQueue<IEvent> EVENT_QUEUE = new LinkedBlockingQueue<>(WORKERS * 1024);
/** Map of event class to type-specific listeners. */
private static final Map<Class<?>, CopyOnWriteArrayList<Consumer<? super IEvent>>> LISTENERS = new ConcurrentHashMap<>();
/** Map of event class to UUID-specific listeners. */
private static final Map<Class<?>, ConcurrentHashMap<String, Consumer<? extends EventWithUuid>>> UUID_LISTENERS = new ConcurrentHashMap<>();
/** Thread pool for worker threads processing queued events. */
private static final ExecutorService WORKER_POOL = Executors.newFixedThreadPool(WORKERS, r -> {
Thread t = new Thread(r, "EventBus-Worker-" + r.hashCode());
t.setDaemon(true);
return t;
});
// Initialize worker threads
static {
for (int i = 0; i < WORKERS; i++) {
WORKER_POOL.submit(GlobalEventBus::workerLoop);
}
}
/** Private constructor to prevent instantiation. */
private GlobalEventBus() {}
/**
* Wraps a Consumer into a Guava @Subscribe-compatible listener.
*
* @return Singleton Event Bus
*/
public static EventBus get() {
return eventBus;
/** Continuously processes events from the queue and dispatches them to listeners. */
private static void workerLoop() {
try {
while (true) {
IEvent event = EVENT_QUEUE.take();
dispatchEvent(event);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* ONLY USE FOR TESTING
* Subscribes a type-specific listener for all events of a given class.
*
* @param newBus
* @param eventClass the class of events to subscribe to
* @param listener the action to execute when the event is posted
* @param <T> the event type
* @return the provided listener for possible unsubscription
*/
public static void set(EventBus newBus) {
eventBus = newBus;
}
/** Reset back to the default global EventBus. */
public static void reset() {
eventBus = new EventBus("global-bus");
public static <T extends IEvent> Consumer<T> subscribe(Class<T> eventClass, Consumer<T> listener) {
CopyOnWriteArrayList<Consumer<? super IEvent>> list =
LISTENERS.computeIfAbsent(eventClass, k -> new CopyOnWriteArrayList<>());
list.add(event -> listener.accept(eventClass.cast(event)));
return listener;
}
/**
* Wraps a Consumer into a Guava @Subscribe-compatible listener. TODO
* Subscribes a generic listener for all events (no type filtering).
*
* @param type The event to be used. (e.g. Events.ServerCommand.class)
* @param action The function, or lambda to run when fired.
* @return Object to be used for registering an event.
* @param listener the action to execute on any event
* @return the provided listener for possible unsubscription
*/
public static <T> Object subscribe(Class<T> type, Consumer<T> action) {
return new Object() {
@Subscribe
public void handle(Object event) {
if (type.isInstance(event)) {
action.accept(type.cast(event));
public static Consumer<Object> subscribe(Consumer<Object> listener) {
LISTENERS.computeIfAbsent(Object.class, _ -> new CopyOnWriteArrayList<>())
.add(listener);
return listener;
}
/**
* Subscribes a listener for a specific {@link EventWithUuid} identified by its UUID.
*
* @param eventClass the class of the UUID event
* @param eventId the UUID of the event to listen for
* @param listener the action to execute when the event with the matching UUID is posted
* @param <T> the event type extending EventWithUuid
*/
public static <T extends EventWithUuid> void subscribeById(Class<T> eventClass, String eventId, Consumer<T> listener) {
UUID_LISTENERS
.computeIfAbsent(eventClass, _ -> new ConcurrentHashMap<>())
.put(eventId, listener);
}
/**
* Unsubscribes a previously registered listener.
*
* @param listener the listener to remove
*/
public static void unsubscribe(Object listener) {
LISTENERS.values().forEach(list -> list.remove(listener));
}
/**
* Unsubscribes a UUID-specific listener.
*
* @param eventClass the class of the UUID event
* @param eventId the UUID of the listener to remove
* @param <T> the event type extending EventWithUuid
*/
public static <T extends EventWithUuid> void unsubscribeById(Class<T> eventClass, String eventId) {
Map<String, Consumer<? extends EventWithUuid>> map = UUID_LISTENERS.get(eventClass);
if (map != null) map.remove(eventId);
}
/**
* Posts an event synchronously to all subscribed listeners.
*
* @param event the event instance to post
* @param <T> the event type
*/
public static <T extends IEvent> void post(T event) {
dispatchEvent(event);
}
/**
* Posts an event asynchronously by adding it to the internal queue.
* If the queue is full, the event is dispatched synchronously.
*
* @param event the event instance to post
* @param <T> the event type
*/
public static <T extends IEvent> void postAsync(T event) {
if (!EVENT_QUEUE.offer(event)) {
dispatchEvent(event);
}
}
/** Dispatches an event to all type-specific, generic, and UUID-specific listeners. */
@SuppressWarnings("unchecked")
private static void dispatchEvent(IEvent event) {
Class<?> clazz = event.getClass();
CopyOnWriteArrayList<Consumer<? super IEvent>> classListeners = LISTENERS.get(clazz);
if (classListeners != null) {
for (Consumer<? super IEvent> listener : classListeners) {
try { listener.accept(event); } catch (Throwable ignored) {}
}
}
CopyOnWriteArrayList<Consumer<? super IEvent>> genericListeners = LISTENERS.get(Object.class);
if (genericListeners != null) {
for (Consumer<? super IEvent> listener : genericListeners) {
try { listener.accept(event); } catch (Throwable ignored) {}
}
}
if (event instanceof EventWithUuid uuidEvent) {
Map<String, Consumer<? extends EventWithUuid>> map = UUID_LISTENERS.get(clazz);
if (map != null) {
Consumer<EventWithUuid> listener = (Consumer<EventWithUuid>) map.remove(uuidEvent.eventId());
if (listener != null) {
try { listener.accept(uuidEvent); } catch (Throwable ignored) {}
}
}
};
}
@SuppressWarnings("unchecked")
public static <T> Object subscribe(Consumer<T> action) {
return new Object() {
@Subscribe
public void handle(Object event) {
try {
action.accept((T) event); // unchecked cast
} catch (ClassCastException ignored) {}
}
};
}
}
/**
* Wraps a Consumer into a Guava @Subscribe-compatible listener and registers it.
*
* @param type The event to be used. (e.g. Events.ServerCommand.class)
* @param action The function, or lambda to run when fired.
* @return Object to be used for registering an event.
* Shuts down the bus immediately, clearing all listeners and queued events.
* Worker threads are stopped.
*/
public static <T> Object subscribeAndRegister(Class<T> type, Consumer<T> action) {
var listener = subscribe(type, action);
register(listener);
return listener;
}
public static <T> Object subscribeAndRegister(Consumer<T> action) {
var listener = subscribe(action);
register(listener);
return listener;
public static void shutdown() {
WORKER_POOL.shutdownNow();
LISTENERS.clear();
UUID_LISTENERS.clear();
EVENT_QUEUE.clear();
}
/**
* Wrapper for registering a listener.
*
* @param listener The listener to register.
* Clears all listeners and UUID-specific subscriptions without stopping worker threads.
*/
public static void register(Object listener) {
GlobalEventBus.get().register(listener);
}
/**
* Wrapper for unregistering a listener.
*
* @param listener The listener to unregister.
*/
public static void unregister(Object listener) {
GlobalEventBus.get().unregister(listener);
}
/**
* Wrapper for posting events.
*
* @param event The event to post.
*/
public static <T> void post(T event) {
GlobalEventBus.get().post(event);
public static void reset() {
LISTENERS.clear();
UUID_LISTENERS.clear();
}
}

View File

@@ -2,7 +2,7 @@ package org.toop.eventbus.events;
import java.util.Map;
public interface EventWithUuid {
public interface EventWithUuid extends IEvent {
Map<String, Object> result();
String eventId();
}

View File

@@ -1,11 +1,11 @@
package org.toop.eventbus.events;
import org.apache.logging.log4j.core.jmx.Server;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import org.toop.core.Window;
/** Events that are used in the GlobalEventBus class. */
public class Events implements IEvent {
@@ -81,7 +81,7 @@ public class Events implements IEvent {
public record RequestsAllServers(CompletableFuture<String> future) {}
/** Forces closing all active servers immediately. */
public record ForceCloseAllServers() {}
public record ForceCloseAllServers() implements IEvent {}
/**
* Requests starting a server with a specific port and game type.
@@ -89,7 +89,7 @@ public class Events implements IEvent {
* @param port The port to open the server.
* @param gameType Either "tictactoe" or ...
*/
public record StartServer(int port, String gameType) {}
public record StartServer(int port, String gameType) implements IEvent {}
/**
* BLOCKING Requests starting a server with a specific port and game type, and returns a
@@ -100,7 +100,7 @@ public class Events implements IEvent {
* @param future The uuid of the server.
*/
public record StartServerRequest(
int port, String gameType, CompletableFuture<String> future) {}
int port, String gameType, CompletableFuture<String> future) implements IEvent{}
/**
* Represents a server that has successfully started.
@@ -108,7 +108,7 @@ public class Events implements IEvent {
* @param uuid The unique identifier of the server.
* @param port The port the server is listening on.
*/
public record ServerStarted(String uuid, int port) {}
public record ServerStarted(String uuid, int port) implements IEvent {}
/**
* BLOCKING Requests creation of a TicTacToe game on a specific server.
@@ -122,7 +122,7 @@ public class Events implements IEvent {
String serverUuid,
String playerA,
String playerB,
CompletableFuture<String> future) {}
CompletableFuture<String> future) implements IEvent {}
/**
* Requests running a TicTacToe game on a specific server.
@@ -130,7 +130,7 @@ public class Events implements IEvent {
* @param serverUuid The unique identifier of the server.
* @param gameUuid The UUID of the game to run.
*/
public record RunTicTacToeGame(String serverUuid, String gameUuid) {}
public record RunTicTacToeGame(String serverUuid, String gameUuid) implements IEvent {}
/**
* Requests ending a TicTacToe game on a specific server.
@@ -138,7 +138,7 @@ public class Events implements IEvent {
* @param serverUuid The UUID of the server the game is running on.
* @param gameUuid The UUID of the game to end.
*/
public record EndTicTacToeGame(String serverUuid, String gameUuid) {}
public record EndTicTacToeGame(String serverUuid, String gameUuid) implements IEvent {}
// public record StartGameConnectionRequest(String ip, String port,
// CompletableFuture<String> future) {}
@@ -165,13 +165,13 @@ public class Events implements IEvent {
public static class WindowEvents {
/** Triggers when the window wants to quit. */
public record OnQuitRequested() {}
public record OnQuitRequested() implements IEvent {}
/** Triggers when the window is resized. */
public record OnResize(Window.Size size) {}
// public record OnResize(Window.Size size) {}
/** Triggers when the mouse is moved within the window. */
public record OnMouseMove(int x, int y) {}
public record OnMouseMove(int x, int y) implements IEvent {}
/** Triggers when the mouse is clicked within the window. */
public record OnMouseClick(int button) {}

View File

@@ -1,12 +1,12 @@
package org.toop.eventbus.events;
import com.google.common.base.Supplier;
import org.toop.backend.tictactoe.TicTacToeServer;
import org.toop.frontend.networking.NetworkingGameClientHandler;
import java.lang.reflect.RecordComponent;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -18,14 +18,14 @@ public class NetworkEvents extends Events {
*
* @param future List of all connections in string form.
*/
public record RequestsAllClients(CompletableFuture<String> future) {}
public record RequestsAllClients(CompletableFuture<String> future) implements IEvent {}
/** Forces closing all active connections immediately. */
public record ForceCloseAllClients() {}
public record ForceCloseAllClients() implements IEvent {}
public record CloseClientRequest(CompletableFuture<String> future) {}
public record CloseClient(String connectionId) {}
public record CloseClient(String connectionId) implements IEvent {}
/**
* Event to start a new client connection to a server.
@@ -102,15 +102,14 @@ public class NetworkEvents extends Events {
*/
public record StartClientRequest(
Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String ip, int port, CompletableFuture<String> future) {}
String ip, int port, CompletableFuture<String> future) implements IEvent {}
/**
* BLOCKING Triggers starting a server connection and returns a future.
*
* @param ip The IP address of the server to connect to.
* @param port The port of the server to connect to.
* @param clientId The ID of the client to be used in requests.
* @param eventId The eventID used in checking if event is for you.
*/
public record StartClientSuccess(Object connectionId, String ip, int port, String eventId)
public record StartClientSuccess(String clientId, String eventId)
implements EventWithUuid {
@Override
public Map<String, Object> result() {
@@ -139,7 +138,7 @@ public class NetworkEvents extends Events {
* @param connectionId The UUID of the connection to send the command on.
* @param args The command arguments.
*/
public record SendCommand(String connectionId, String... args) {}
public record SendCommand(String connectionId, String... args) implements IEvent {}
/**
* WIP Triggers when a command is sent to a server.
@@ -165,7 +164,7 @@ public class NetworkEvents extends Events {
* @param ConnectionUuid The UUID of the connection that received the message.
* @param message The message received.
*/
public record ReceivedMessage(String ConnectionUuid, String message) {}
public record ReceivedMessage(String ConnectionUuid, String message) implements IEvent {}
/**
* Triggers changing connection to a new address.

View File

@@ -0,0 +1,4 @@
package org.toop.eventbus.events;
public class ServerEvents {
}

View File

@@ -85,23 +85,23 @@ public class RemoteGameSelector {
throw new RuntimeException(ex);
} // TODO: Better error handling to not crash the system.
GlobalEventBus.subscribeAndRegister(
NetworkEvents.ReceivedMessage.class,
event -> {
if (event.message().equalsIgnoreCase("ok")) {
logger.info("received ok from server.");
} else if (event.message().toLowerCase().startsWith("gameid")) {
String gameId =
event.message()
.toLowerCase()
.replace("gameid ", "");
GlobalEventBus.post(
new NetworkEvents.SendCommand(
"start_game " + gameId));
} else {
logger.info("{}", event.message());
}
});
// GlobalEventBus.subscribeAndRegister(
// NetworkEvents.ReceivedMessage.class,
// event -> {
// if (event.message().equalsIgnoreCase("ok")) {
// logger.info("received ok from server.");
// } else if (event.message().toLowerCase().startsWith("gameid")) {
// String gameId =
// event.message()
// .toLowerCase()
// .replace("gameid ", "");
// GlobalEventBus.post(
// new NetworkEvents.SendCommand(
// "start_game " + gameId));
// } else {
// logger.info("{}", event.message());
// }
// });
GlobalEventBus.post(
new NetworkEvents.SendCommand(

View File

@@ -1,8 +1,11 @@
package org.toop.frontend.games;
import java.util.concurrent.*;
import jdk.jfr.Event;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.eventbus.EventPublisher;
import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.NetworkEvents;
@@ -11,6 +14,7 @@ import org.toop.frontend.networking.NetworkingGameClientHandler;
import org.toop.game.tictactoe.GameBase;
import org.toop.game.tictactoe.TicTacToe;
import org.toop.game.tictactoe.ai.MinMaxTicTacToe;
import java.util.function.Supplier;
/**
* A representation of a local tic-tac-toe game. Calls are made to a server for information about
@@ -66,9 +70,9 @@ public class LocalTicTacToe { // TODO: Implement runnable
* @param port The port of the server to connect to.
*/
private LocalTicTacToe(String ip, int port) {
this.receivedMessageListener =
GlobalEventBus.subscribe(this::receiveMessageAction);
GlobalEventBus.register(this.receivedMessageListener);
// this.receivedMessageListener =
// GlobalEventBus.subscribe(this::receiveMessageAction);
// GlobalEventBus.subscribe(this.receivedMessageListener);
this.connectionId = this.createConnection(ip, port);
this.createGame("X", "O");
this.isLocal = false;
@@ -100,8 +104,8 @@ public class LocalTicTacToe { // TODO: Implement runnable
private String createServer(int port) {
CompletableFuture<String> serverIdFuture = new CompletableFuture<>();
GlobalEventBus.post(
new Events.ServerEvents.StartServerRequest(port, "tictactoe", serverIdFuture));
new EventPublisher<>(Events.ServerEvents.StartServerRequest.class, port, "tictactoe", serverIdFuture)
.postEvent();
try {
return serverIdFuture.get();
} catch (Exception e) {
@@ -112,12 +116,9 @@ public class LocalTicTacToe { // TODO: Implement runnable
private String createConnection(String ip, int port) {
CompletableFuture<String> connectionIdFuture = new CompletableFuture<>();
GlobalEventBus.post(
new NetworkEvents.StartClientRequest(
NetworkingGameClientHandler::new,
ip,
port,
connectionIdFuture)); // TODO: what if server couldn't be started with port.
new EventPublisher<>(NetworkEvents.StartClientRequest.class,
(Supplier<NetworkingGameClientHandler>) NetworkingGameClientHandler::new,
ip, port, connectionIdFuture).postEvent(); // TODO: what if server couldn't be started with port.
try {
return connectionIdFuture.get();
} catch (InterruptedException | ExecutionException e) {
@@ -231,7 +232,7 @@ public class LocalTicTacToe { // TODO: Implement runnable
private void endTheGame() {
this.sendCommand("end_game", this.gameId);
this.endListeners();
// this.endListeners();
}
private void receiveMessageAction(NetworkEvents.ReceivedMessage receivedMessage) {
@@ -249,12 +250,12 @@ public class LocalTicTacToe { // TODO: Implement runnable
}
private void sendCommand(String... args) {
GlobalEventBus.post(new NetworkEvents.SendCommand(this.connectionId, args));
new EventPublisher<>(NetworkEvents.SendCommand.class, this.connectionId, args).postEvent();
}
private void endListeners() {
GlobalEventBus.unregister(this.receivedMessageListener);
}
// private void endListeners() {
// GlobalEventBus.unregister(this.receivedMessageListener);
// } TODO
public void setUIReference(UIGameBoard uiGameBoard) {
this.ui = uiGameBoard;

View File

@@ -1,50 +1,50 @@
package org.toop.frontend.graphics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.frontend.platform.graphics.opengl.OpenglRenderer;
public abstract class Renderer {
public enum API {
NONE,
OPENGL,
};
protected static final Logger logger = LogManager.getLogger(Renderer.class);
private static API api = API.NONE;
private static Renderer instance = null;
public static Renderer setup(API api) {
if (instance != null) {
logger.warn("Renderer is already setup.");
return instance;
}
switch (api) {
case OPENGL:
instance = new OpenglRenderer();
break;
default:
logger.fatal("No valid renderer api chosen");
return null;
}
Renderer.api = api;
return instance;
}
public static API getApi() {
return api;
}
public void cleanup() {
instance = null;
logger.info("Renderer cleanup.");
}
public abstract void clear();
public abstract void render();
}
//package org.toop.frontend.graphics;
//
//import org.apache.logging.log4j.LogManager;
//import org.apache.logging.log4j.Logger;
//import org.toop.frontend.platform.graphics.opengl.OpenglRenderer;
//
//public abstract class Renderer {
// public enum API {
// NONE,
// OPENGL,
// };
//
// protected static final Logger logger = LogManager.getLogger(Renderer.class);
//
// private static API api = API.NONE;
// private static Renderer instance = null;
//
// public static Renderer setup(API api) {
// if (instance != null) {
// logger.warn("Renderer is already setup.");
// return instance;
// }
//
// switch (api) {
// case OPENGL:
// instance = new OpenglRenderer();
// break;
//
// default:
// logger.fatal("No valid renderer api chosen");
// return null;
// }
//
// Renderer.api = api;
// return instance;
// }
//
// public static API getApi() {
// return api;
// }
//
// public void cleanup() {
// instance = null;
// logger.info("Renderer cleanup.");
// }
//
// public abstract void clear();
//
// public abstract void render();
//}

View File

@@ -1,27 +1,27 @@
package org.toop.frontend.graphics;
import org.toop.frontend.platform.graphics.opengl.OpenglShader;
public abstract class Shader {
public static Shader create(String vertexPath, String fragmentPath) {
Shader shader = null;
switch (Renderer.getApi()) {
case OPENGL:
shader = new OpenglShader(vertexPath, fragmentPath);
break;
case NONE:
default:
break;
}
return shader;
}
public abstract void cleanup();
public abstract void start();
public abstract void stop();
}
//package org.toop.frontend.graphics;
//
//import org.toop.frontend.platform.graphics.opengl.OpenglShader;
//
//public abstract class Shader {
// public static Shader create(String vertexPath, String fragmentPath) {
// Shader shader = null;
//
// switch (Renderer.getApi()) {
// case OPENGL:
// shader = new OpenglShader(vertexPath, fragmentPath);
// break;
//
// case NONE:
// default:
// break;
// }
//
// return shader;
// }
//
// public abstract void cleanup();
//
// public abstract void start();
//
// public abstract void stop();
//}

View File

@@ -1,68 +1,68 @@
package org.toop.frontend.graphics.node;
import java.util.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.eventbus.*;
import org.toop.eventbus.events.Events;
import org.toop.frontend.graphics.Shader;
public class NodeManager {
private static final Logger logger = LogManager.getLogger(NodeManager.class);
private static NodeManager instance = null;
public static NodeManager setup() {
if (instance != null) {
logger.warn("NodeManager is already setup.");
return instance;
}
instance = new NodeManager();
return instance;
}
private Shader shader;
private ArrayList<Node> nodes;
private Node active;
private NodeManager() {
shader =
Shader.create(
"src/main/resources/shaders/gui_vertex.glsl",
"src/main/resources/shaders/gui_fragment.glsl");
nodes = new ArrayList<Node>();
GlobalEventBus.subscribeAndRegister(
Events.WindowEvents.OnMouseMove.class,
event -> {
for (int i = 0; i < nodes.size(); i++) {
Node node = nodes.get(i);
if (node.check(event.x(), event.y())) {
active = node;
node.hover();
break;
}
}
});
GlobalEventBus.subscribeAndRegister(
Events.WindowEvents.OnMouseClick.class,
event -> {
if (active != null) {
active.click();
}
});
}
public void cleanup() {}
public void add(Node node) {
nodes.add(node);
}
public void render() {}
}
//package org.toop.frontend.graphics.node;
//
//import java.util.*;
//import org.apache.logging.log4j.LogManager;
//import org.apache.logging.log4j.Logger;
//import org.toop.eventbus.*;
//import org.toop.eventbus.events.Events;
//import org.toop.frontend.graphics.Shader;
//
//public class NodeManager {
// private static final Logger logger = LogManager.getLogger(NodeManager.class);
//
// private static NodeManager instance = null;
//
// public static NodeManager setup() {
// if (instance != null) {
// logger.warn("NodeManager is already setup.");
// return instance;
// }
//
// instance = new NodeManager();
// return instance;
// }
//
// private Shader shader;
// private ArrayList<Node> nodes;
// private Node active;
//
// private NodeManager() {
// shader =
// Shader.create(
// "src/main/resources/shaders/gui_vertex.glsl",
// "src/main/resources/shaders/gui_fragment.glsl");
//
// nodes = new ArrayList<Node>();
//
// GlobalEventBus.subscribeAndRegister(
// Events.WindowEvents.OnMouseMove.class,
// event -> {
// for (int i = 0; i < nodes.size(); i++) {
// Node node = nodes.get(i);
//
// if (node.check(event.x(), event.y())) {
// active = node;
// node.hover();
//
// break;
// }
// }
// });
//
// GlobalEventBus.subscribeAndRegister(
// Events.WindowEvents.OnMouseClick.class,
// event -> {
// if (active != null) {
// active.click();
// }
// });
// }
//
// public void cleanup() {}
//
// public void add(Node node) {
// nodes.add(node);
// }
//
// public void render() {}
//}

View File

@@ -1,6 +1,5 @@
package org.toop.frontend.networking;
import com.google.common.base.Supplier;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioIoHandler;
@@ -11,6 +10,8 @@ import io.netty.handler.codec.string.StringDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Supplier;
public class NetworkingClient {
private static final Logger logger = LogManager.getLogger(NetworkingClient.class);

View File

@@ -1,16 +1,12 @@
package org.toop.frontend.networking;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import com.google.common.base.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.eventbus.EventPublisher;
import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.NetworkEvents;
@@ -23,21 +19,17 @@ public class NetworkingClientManager {
/** Starts a connection manager, to manage, connections. */
public NetworkingClientManager() {
GlobalEventBus.subscribeAndRegister(this::handleStartClientRequest);
GlobalEventBus.subscribeAndRegister(this::handleStartClient);
GlobalEventBus.subscribeAndRegister(this::handleCommand);
GlobalEventBus.subscribeAndRegister(this::handleCloseClient);
// GlobalEventBus.subscribeAndRegister(
// Events.ServerEvents.Reconnect.class, this::handleReconnect);
// GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class,
// this::handleChangeConnection);
GlobalEventBus.subscribeAndRegister(this::shutdownAll);
GlobalEventBus.subscribeAndRegister(this::getAllConnections);
new EventPublisher<>(NetworkEvents.StartClientRequest.class, this::handleStartClientRequest);
new EventPublisher<>(NetworkEvents.StartClient.class, this::handleStartClient);
new EventPublisher<>(NetworkEvents.SendCommand.class, this::handleCommand);
new EventPublisher<>(NetworkEvents.CloseClient.class, this::handleCloseClient);
new EventPublisher<>(NetworkEvents.RequestsAllClients.class, this::getAllConnections);
new EventPublisher<>(NetworkEvents.ForceCloseAllClients.class, this::shutdownAll);
}
private String startConnectionRequest(Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String ip,
int port) {
private String startClientRequest(Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String ip,
int port) {
String connectionUuid = UUID.randomUUID().toString();
try {
NetworkingClient client = new NetworkingClient(
@@ -48,29 +40,24 @@ public class NetworkingClientManager {
} catch (Exception e) {
logger.error(e);
}
logger.info("Client {} started", connectionUuid);
return connectionUuid;
}
private void handleStartClientRequest(NetworkEvents.StartClientRequest request) {
request.future()
.complete(
this.startConnectionRequest(
this.startClientRequest(
request.handlerFactory(),
request.ip(),
request.port())); // TODO: Maybe post ConnectionEstablished event.
}
private void handleStartClient(NetworkEvents.StartClient event) {
GlobalEventBus.post(
new NetworkEvents.StartClientSuccess(
this.startConnectionRequest(
event.handlerFactory(),
event.ip(),
event.port()),
event.ip(),
event.port(),
event.eventId()
));
String uuid = this.startClientRequest(event.handlerFactory(), event.ip(), event.port());
new EventPublisher<>(NetworkEvents.StartClientSuccess.class,
uuid, event.eventId()
).asyncPostEvent();
}
private void handleCommand(

View File

@@ -1,11 +1,9 @@
package org.toop.frontend.networking;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.Main;
public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(NetworkingGameClientHandler.class);

View File

@@ -5,6 +5,7 @@ import java.net.InetAddress;
import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.eventbus.EventPublisher;
import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.NetworkEvents;
@@ -85,9 +86,7 @@ public final class ServerConnection extends TcpClient implements Runnable {
if (received != null) {
logger.info("Connection: {} received: '{}'", this.uuid, received);
// this.addReceivedMessageToQueue(received); // TODO: Will never go empty
GlobalEventBus.post(
new NetworkEvents.ReceivedMessage(
this.uuid, received)); // TODO: mb change
new EventPublisher<>(NetworkEvents.ReceivedMessage.class, this.uuid, received).postEvent();
} else {
break;
}

View File

@@ -1,109 +1,109 @@
package org.toop.frontend.platform.core.glfw;
import org.lwjgl.glfw.*;
import org.lwjgl.system.*;
import org.toop.core.*;
import org.toop.eventbus.*;
import org.toop.eventbus.events.Events;
public class GlfwWindow extends Window {
private long window;
public GlfwWindow(String title, Size size) {
if (!GLFW.glfwInit()) {
logger.fatal("Failed to initialize glfw");
return;
}
GLFW.glfwDefaultWindowHints();
GLFW.glfwWindowHint(GLFW.GLFW_VISIBLE, GLFW.GLFW_FALSE);
GLFW.glfwWindowHint(GLFW.GLFW_RESIZABLE, GLFW.GLFW_TRUE);
GLFWVidMode videoMode = GLFW.glfwGetVideoMode(GLFW.glfwGetPrimaryMonitor());
int width = size.width();
int height = size.height();
if (width <= 0 || height <= 0 || width > videoMode.width() || height > videoMode.height()) {
width = videoMode.width();
height = videoMode.height();
GLFW.glfwWindowHint(GLFW.GLFW_MAXIMIZED, GLFW.GLFW_TRUE);
}
long window = GLFW.glfwCreateWindow(width, height, title, MemoryUtil.NULL, MemoryUtil.NULL);
if (window == MemoryUtil.NULL) {
GLFW.glfwTerminate();
logger.fatal("Failed to create glfw window");
return;
}
int[] widthBuffer = new int[1];
int[] heightBuffer = new int[1];
GLFW.glfwGetWindowSize(window, widthBuffer, heightBuffer);
GLFW.glfwMakeContextCurrent(window);
GLFW.glfwSwapInterval(1);
GLFW.glfwSetWindowCloseCallback(
window,
(lwindow) -> {
GlobalEventBus.post(new Events.WindowEvents.OnQuitRequested());
});
GLFW.glfwSetFramebufferSizeCallback(
window,
(lwindow, lwidth, lheight) -> {
GlobalEventBus.post(
new Events.WindowEvents.OnResize(new Size(lwidth, lheight)));
});
GLFW.glfwSetCursorPosCallback(
window,
(lwindow, lx, ly) -> {
GlobalEventBus.post(new Events.WindowEvents.OnMouseMove((int) lx, (int) ly));
});
GLFW.glfwSetMouseButtonCallback(
window,
(lwindow, lbutton, laction, lmods) -> {
switch (laction) {
case GLFW.GLFW_PRESS:
GlobalEventBus.post(new Events.WindowEvents.OnMouseClick(lbutton));
break;
case GLFW.GLFW_RELEASE:
GlobalEventBus.post(new Events.WindowEvents.OnMouseRelease(lbutton));
break;
default:
break;
}
});
this.window = window;
GLFW.glfwShowWindow(window);
logger.info(
"Glfw window setup. Title: {}. Width: {}. Height: {}.",
title,
size.width(),
size.height());
}
@Override
public void cleanup() {
GLFW.glfwDestroyWindow(window);
GLFW.glfwTerminate();
super.cleanup();
}
@Override
public void update() {
GLFW.glfwSwapBuffers(window);
GLFW.glfwPollEvents();
}
}
//package org.toop.frontend.platform.core.glfw;
//
//import org.lwjgl.glfw.*;
//import org.lwjgl.system.*;
//import org.toop.core.*;
//import org.toop.eventbus.*;
//import org.toop.eventbus.events.Events;
//
//public class GlfwWindow extends Window {
// private long window;
//
// public GlfwWindow(String title, Size size) {
// if (!GLFW.glfwInit()) {
// logger.fatal("Failed to initialize glfw");
// return;
// }
//
// GLFW.glfwDefaultWindowHints();
// GLFW.glfwWindowHint(GLFW.GLFW_VISIBLE, GLFW.GLFW_FALSE);
// GLFW.glfwWindowHint(GLFW.GLFW_RESIZABLE, GLFW.GLFW_TRUE);
//
// GLFWVidMode videoMode = GLFW.glfwGetVideoMode(GLFW.glfwGetPrimaryMonitor());
//
// int width = size.width();
// int height = size.height();
//
// if (width <= 0 || height <= 0 || width > videoMode.width() || height > videoMode.height()) {
// width = videoMode.width();
// height = videoMode.height();
//
// GLFW.glfwWindowHint(GLFW.GLFW_MAXIMIZED, GLFW.GLFW_TRUE);
// }
//
// long window = GLFW.glfwCreateWindow(width, height, title, MemoryUtil.NULL, MemoryUtil.NULL);
//
// if (window == MemoryUtil.NULL) {
// GLFW.glfwTerminate();
//
// logger.fatal("Failed to create glfw window");
// return;
// }
//
// int[] widthBuffer = new int[1];
// int[] heightBuffer = new int[1];
// GLFW.glfwGetWindowSize(window, widthBuffer, heightBuffer);
//
// GLFW.glfwMakeContextCurrent(window);
// GLFW.glfwSwapInterval(1);
//
// GLFW.glfwSetWindowCloseCallback(
// window,
// (lwindow) -> {
// GlobalEventBus.post(new Events.WindowEvents.OnQuitRequested());
// });
//
// GLFW.glfwSetFramebufferSizeCallback(
// window,
// (lwindow, lwidth, lheight) -> {
// GlobalEventBus.post(
// new Events.WindowEvents.OnResize(new Size(lwidth, lheight)));
// });
//
// GLFW.glfwSetCursorPosCallback(
// window,
// (lwindow, lx, ly) -> {
// GlobalEventBus.post(new Events.WindowEvents.OnMouseMove((int) lx, (int) ly));
// });
//
// GLFW.glfwSetMouseButtonCallback(
// window,
// (lwindow, lbutton, laction, lmods) -> {
// switch (laction) {
// case GLFW.GLFW_PRESS:
// GlobalEventBus.post(new Events.WindowEvents.OnMouseClick(lbutton));
// break;
//
// case GLFW.GLFW_RELEASE:
// GlobalEventBus.post(new Events.WindowEvents.OnMouseRelease(lbutton));
// break;
//
// default:
// break;
// }
// });
//
// this.window = window;
// GLFW.glfwShowWindow(window);
//
// logger.info(
// "Glfw window setup. Title: {}. Width: {}. Height: {}.",
// title,
// size.width(),
// size.height());
// }
//
// @Override
// public void cleanup() {
// GLFW.glfwDestroyWindow(window);
// GLFW.glfwTerminate();
//
// super.cleanup();
// }
//
// @Override
// public void update() {
// GLFW.glfwSwapBuffers(window);
// GLFW.glfwPollEvents();
// }
//}

View File

@@ -1,79 +1,79 @@
package org.toop.frontend.platform.graphics.opengl;
import org.lwjgl.opengl.*;
import org.lwjgl.system.*;
import org.toop.eventbus.*;
import org.toop.eventbus.events.Events;
import org.toop.frontend.graphics.Renderer;
import org.toop.frontend.graphics.Shader;
public class OpenglRenderer extends Renderer {
private Shader shader;
private int vao;
public OpenglRenderer() {
GL.createCapabilities();
GL45.glClearColor(0.65f, 0.9f, 0.65f, 1f);
GlobalEventBus.subscribeAndRegister(
Events.WindowEvents.OnResize.class,
event -> {
GL45.glViewport(0, 0, event.size().width(), event.size().height());
});
logger.info("Opengl renderer setup.");
// Form here on, everything is temporary
float vertices[] = {
-0.5f, 0.5f, 1.0f, 0.0f, 0.0f,
-0.5f, -0.5f, 0.0f, 1.0f, 0.0f,
0.5f, -0.5f, 0.0f, 0.0f, 1.0f,
0.5f, 0.5f, 1.0f, 1.0f, 0.0f,
};
int indicies[] = {
0, 1, 2,
2, 3, 0,
};
vao = GL45.glCreateVertexArrays();
GL45.glBindVertexArray(vao);
int vbo = GL45.glCreateBuffers();
GL45.glBindBuffer(GL45.GL_ARRAY_BUFFER, vbo);
GL45.glBufferData(GL45.GL_ARRAY_BUFFER, vertices, GL45.GL_STATIC_DRAW);
GL45.glVertexAttribPointer(0, 2, GL45.GL_FLOAT, false, 5 * 4, 0);
GL45.glVertexAttribPointer(1, 3, GL45.GL_FLOAT, false, 5 * 4, 2 * 4);
GL45.glEnableVertexAttribArray(0);
GL45.glEnableVertexAttribArray(1);
int ib = GL45.glCreateBuffers();
GL45.glBindBuffer(GL45.GL_ELEMENT_ARRAY_BUFFER, ib);
GL45.glBufferData(GL45.GL_ELEMENT_ARRAY_BUFFER, indicies, GL45.GL_STATIC_DRAW);
shader =
Shader.create(
"src/main/resources/shaders/gui_vertex.glsl",
"src/main/resources/shaders/gui_fragment.glsl");
}
@Override
public void cleanup() {
super.cleanup();
}
@Override
public void clear() {
GL45.glClear(GL45.GL_COLOR_BUFFER_BIT);
}
@Override
public void render() {
// temporary
// shader.start();
GL45.glBindVertexArray(vao);
GL45.glDrawElements(GL45.GL_TRIANGLES, 6, GL45.GL_UNSIGNED_INT, MemoryUtil.NULL);
}
}
//package org.toop.frontend.platform.graphics.opengl;
//
//import org.lwjgl.opengl.*;
//import org.lwjgl.system.*;
//import org.toop.eventbus.*;
//import org.toop.eventbus.events.Events;
//import org.toop.frontend.graphics.Renderer;
//import org.toop.frontend.graphics.Shader;
//
//public class OpenglRenderer extends Renderer {
// private Shader shader;
// private int vao;
//
// public OpenglRenderer() {
// GL.createCapabilities();
// GL45.glClearColor(0.65f, 0.9f, 0.65f, 1f);
//
// GlobalEventBus.subscribeAndRegister(
// Events.WindowEvents.OnResize.class,
// event -> {
// GL45.glViewport(0, 0, event.size().width(), event.size().height());
// });
//
// logger.info("Opengl renderer setup.");
//
// // Form here on, everything is temporary
// float vertices[] = {
// -0.5f, 0.5f, 1.0f, 0.0f, 0.0f,
// -0.5f, -0.5f, 0.0f, 1.0f, 0.0f,
// 0.5f, -0.5f, 0.0f, 0.0f, 1.0f,
// 0.5f, 0.5f, 1.0f, 1.0f, 0.0f,
// };
//
// int indicies[] = {
// 0, 1, 2,
// 2, 3, 0,
// };
//
// vao = GL45.glCreateVertexArrays();
// GL45.glBindVertexArray(vao);
//
// int vbo = GL45.glCreateBuffers();
// GL45.glBindBuffer(GL45.GL_ARRAY_BUFFER, vbo);
// GL45.glBufferData(GL45.GL_ARRAY_BUFFER, vertices, GL45.GL_STATIC_DRAW);
//
// GL45.glVertexAttribPointer(0, 2, GL45.GL_FLOAT, false, 5 * 4, 0);
// GL45.glVertexAttribPointer(1, 3, GL45.GL_FLOAT, false, 5 * 4, 2 * 4);
//
// GL45.glEnableVertexAttribArray(0);
// GL45.glEnableVertexAttribArray(1);
//
// int ib = GL45.glCreateBuffers();
// GL45.glBindBuffer(GL45.GL_ELEMENT_ARRAY_BUFFER, ib);
// GL45.glBufferData(GL45.GL_ELEMENT_ARRAY_BUFFER, indicies, GL45.GL_STATIC_DRAW);
//
// shader =
// Shader.create(
// "src/main/resources/shaders/gui_vertex.glsl",
// "src/main/resources/shaders/gui_fragment.glsl");
// }
//
// @Override
// public void cleanup() {
// super.cleanup();
// }
//
// @Override
// public void clear() {
// GL45.glClear(GL45.GL_COLOR_BUFFER_BIT);
// }
//
// @Override
// public void render() {
// // temporary
// // shader.start();
// GL45.glBindVertexArray(vao);
// GL45.glDrawElements(GL45.GL_TRIANGLES, 6, GL45.GL_UNSIGNED_INT, MemoryUtil.NULL);
// }
//}

View File

@@ -1,57 +1,57 @@
package org.toop.frontend.platform.graphics.opengl;
import org.lwjgl.opengl.*;
import org.toop.core.*;
import org.toop.frontend.graphics.Shader;
public class OpenglShader extends Shader {
private int programID;
public OpenglShader(String vertexPath, String fragmentPath) {
FileSystem.File vertexSource = FileSystem.read(vertexPath);
FileSystem.File fragmentSource = FileSystem.read(fragmentPath);
if (vertexSource == null || fragmentPath == null) {
return;
}
programID = GL45.glCreateProgram();
int vertexShader = GL45.glCreateShader(GL45.GL_VERTEX_SHADER);
int fragmentShader = GL45.glCreateShader(GL45.GL_FRAGMENT_SHADER);
GL45.glShaderSource(vertexShader, vertexSource.buffer());
GL45.glShaderSource(fragmentShader, fragmentSource.buffer());
GL45.glCompileShader(vertexShader);
GL45.glCompileShader(fragmentShader);
GL45.glAttachShader(programID, vertexShader);
GL45.glAttachShader(programID, fragmentShader);
GL45.glLinkProgram(programID);
GL45.glValidateProgram(programID);
GL45.glDetachShader(programID, vertexShader);
GL45.glDetachShader(programID, fragmentShader);
GL45.glDeleteShader(vertexShader);
GL45.glDeleteShader(fragmentShader);
}
@Override
public void cleanup() {
stop();
GL45.glDeleteProgram(programID);
}
@Override
public void start() {
GL45.glUseProgram(programID);
}
@Override
public void stop() {
GL45.glUseProgram(0);
}
}
//package org.toop.frontend.platform.graphics.opengl;
//
//import org.lwjgl.opengl.*;
//import org.toop.core.*;
//import org.toop.frontend.graphics.Shader;
//
//public class OpenglShader extends Shader {
// private int programID;
//
// public OpenglShader(String vertexPath, String fragmentPath) {
// FileSystem.File vertexSource = FileSystem.read(vertexPath);
// FileSystem.File fragmentSource = FileSystem.read(fragmentPath);
//
// if (vertexSource == null || fragmentPath == null) {
// return;
// }
//
// programID = GL45.glCreateProgram();
//
// int vertexShader = GL45.glCreateShader(GL45.GL_VERTEX_SHADER);
// int fragmentShader = GL45.glCreateShader(GL45.GL_FRAGMENT_SHADER);
//
// GL45.glShaderSource(vertexShader, vertexSource.buffer());
// GL45.glShaderSource(fragmentShader, fragmentSource.buffer());
//
// GL45.glCompileShader(vertexShader);
// GL45.glCompileShader(fragmentShader);
//
// GL45.glAttachShader(programID, vertexShader);
// GL45.glAttachShader(programID, fragmentShader);
//
// GL45.glLinkProgram(programID);
// GL45.glValidateProgram(programID);
//
// GL45.glDetachShader(programID, vertexShader);
// GL45.glDetachShader(programID, fragmentShader);
//
// GL45.glDeleteShader(vertexShader);
// GL45.glDeleteShader(fragmentShader);
// }
//
// @Override
// public void cleanup() {
// stop();
// GL45.glDeleteProgram(programID);
// }
//
// @Override
// public void start() {
// GL45.glUseProgram(programID);
// }
//
// @Override
// public void stop() {
// GL45.glUseProgram(0);
// }
//}

View File

@@ -34,10 +34,10 @@ class EventPublisherPerformanceTest {
@Test
void testEventPostSpeed() {
int iterations = 10_000;
int iterations = 100_000;
AtomicInteger counter = new AtomicInteger(0);
GlobalEventBus.subscribeAndRegister(PerfEvent.class, e -> counter.incrementAndGet());
GlobalEventBus.subscribe(PerfEvent.class, e -> counter.incrementAndGet());
long start = System.nanoTime();
@@ -59,7 +59,7 @@ class EventPublisherPerformanceTest {
int eventsPerThread = 5_000;
AtomicInteger counter = new AtomicInteger(0);
GlobalEventBus.subscribeAndRegister(PerfEvent.class, e -> counter.incrementAndGet());
GlobalEventBus.subscribe(PerfEvent.class, e -> counter.incrementAndGet());
Thread[] workers = new Thread[threads];

View File

@@ -5,32 +5,49 @@ import org.junit.jupiter.api.Test;
import org.toop.eventbus.events.EventWithUuid;
import java.math.BigInteger;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import static org.junit.jupiter.api.Assertions.assertEquals;
class EventPublisherStressTest {
/** Top-level record to ensure runtime type matches subscription */
public record HeavyEvent(String payload, String eventId) implements EventWithUuid {
@Override
public java.util.Map<String, Object> result() {
return java.util.Map.of("payload", payload, "eventId", eventId);
}
@Override
public String eventId() {
return eventId;
}
}
private static final int THREADS = 1;
private static final long EVENTS_PER_THREAD = 2_000_000_000;
public record HeavyEventSuccess(String payload, String eventId) implements EventWithUuid {
@Override
public java.util.Map<String, Object> result() {
return java.util.Map.of("payload", payload, "eventId", eventId);
}
@Override
public String eventId() {
return eventId;
}
}
private static final int THREADS = 16;
private static final long EVENTS_PER_THREAD = 1_000_000_000;
@Tag("stress")
@Test
void extremeConcurrencyTest_progressWithMemory() throws InterruptedException {
AtomicLong counter = new AtomicLong(0); // Big numbers safety
void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException {
LongAdder counter = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
GlobalEventBus.subscribeAndRegister(HeavyEvent.class, _ -> counter.incrementAndGet());
BigInteger totalEvents = BigInteger.valueOf(THREADS)
.multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
@@ -39,25 +56,22 @@ class EventPublisherStressTest {
// Monitor thread for EPS and memory
Thread monitor = new Thread(() -> {
long lastCount = 0;
long lastTime = startTime;
long lastTime = System.currentTimeMillis();
Runtime runtime = Runtime.getRuntime();
while (counter.get() < totalEvents.longValue()) {
try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
while (counter.sum() < totalEvents.longValue()) {
try { Thread.sleep(200); } catch (InterruptedException ignored) {}
long now = System.currentTimeMillis();
long completed = counter.get();
long eventsThisSecond = completed - lastCount;
double eps = eventsThisSecond / ((now - lastTime) / 1000.0);
long completed = counter.sum();
long eventsThisPeriod = completed - lastCount;
double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
// Memory usage
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
long maxMemory = runtime.maxMemory();
double usedPercent = usedMemory * 100.0 / maxMemory;
double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
System.out.printf(
"Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)\n",
"Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
completed,
totalEvents.longValue(),
completed * 100.0 / totalEvents.doubleValue(),
@@ -73,17 +87,22 @@ class EventPublisherStressTest {
monitor.setDaemon(true);
monitor.start();
// Submit events
var listener = new EventPublisher<>(HeavyEvent.class, _ -> counter.increment());
// Submit events asynchronously
for (int t = 0; t < THREADS; t++) {
executor.submit(() -> {
for (int i = 0; i < EVENTS_PER_THREAD; i++) {
new EventPublisher<>(HeavyEvent.class, "payload-" + i).postEvent();
var _ = new EventPublisher<>(HeavyEvent.class, "payload-" + i)
.asyncPostEvent();
}
});
}
executor.shutdown();
executor.awaitTermination(20, TimeUnit.MINUTES); // allow extra time for huge tests
executor.awaitTermination(10, TimeUnit.MINUTES);
listener.getResult();
long endTime = System.currentTimeMillis();
double durationSeconds = (endTime - startTime) / 1000.0;
@@ -92,13 +111,87 @@ class EventPublisherStressTest {
double averageEps = totalEvents.doubleValue() / durationSeconds;
System.out.printf("Average EPS: %.0f%n", averageEps);
assertEquals(totalEvents.longValue(), counter.get());
assertEquals(totalEvents.longValue(), counter.sum());
}
@Tag("stress")
@Test
void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException {
LongAdder counter = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
BigInteger totalEvents = BigInteger.valueOf(THREADS)
.multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
long startTime = System.currentTimeMillis();
// Monitor thread for EPS and memory
Thread monitor = new Thread(() -> {
long lastCount = 0;
long lastTime = System.currentTimeMillis();
Runtime runtime = Runtime.getRuntime();
while (counter.sum() < totalEvents.longValue()) {
try { Thread.sleep(200); } catch (InterruptedException ignored) {}
long now = System.currentTimeMillis();
long completed = counter.sum();
long eventsThisPeriod = completed - lastCount;
double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
System.out.printf(
"Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
completed,
totalEvents.longValue(),
completed * 100.0 / totalEvents.doubleValue(),
eps,
usedMemory / 1024.0 / 1024.0,
usedPercent
);
lastCount = completed;
lastTime = now;
}
});
monitor.setDaemon(true);
monitor.start();
// Submit events asynchronously
for (int t = 0; t < THREADS; t++) {
executor.submit(() -> {
for (int i = 0; i < EVENTS_PER_THREAD; i++) {
var a = new EventPublisher<>(HeavyEvent.class, "payload-" + i)
.onEventById(HeavyEventSuccess.class, _ -> counter.increment())
.unsubscribeAfterSuccess()
.asyncPostEvent();
new EventPublisher<>(HeavyEventSuccess.class, "payload-" + i, a.getEventId())
.asyncPostEvent();
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
long endTime = System.currentTimeMillis();
double durationSeconds = (endTime - startTime) / 1000.0;
System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
double averageEps = totalEvents.doubleValue() / durationSeconds;
System.out.printf("Average EPS: %.0f%n", averageEps);
assertEquals(totalEvents.longValue(), counter.sum());
}
@Tag("stress")
@Test
void efficientExtremeConcurrencyTest() throws InterruptedException {
final int THREADS = Runtime.getRuntime().availableProcessors(); // threads ≈ CPU cores
final int THREADS = Runtime.getRuntime().availableProcessors();
final int EVENTS_PER_THREAD = 5000;
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
@@ -128,11 +221,9 @@ class EventPublisherStressTest {
System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
System.out.printf("Throughput: %.0f events/sec%n", eps);
// Memory snapshot
Runtime rt = Runtime.getRuntime();
System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0 / 1024.0);
// Ensure all events were processed
assertEquals(totalEvents.intValue(), processedEvents.size());
}
@@ -142,14 +233,12 @@ class EventPublisherStressTest {
int iterations = 1_000_000;
long startReflect = System.nanoTime();
for (int i = 0; i < iterations; i++) {
// Reflection every time
HeavyEvent.class.getDeclaredConstructors()[0].newInstance("payload", "uuid-" + i);
}
long endReflect = System.nanoTime();
long startHandle = System.nanoTime();
for (int i = 0; i < iterations; i++) {
// Using cached MethodHandle
EventPublisher<HeavyEvent> ep = new EventPublisher<>(HeavyEvent.class, "payload-" + i);
}
long endHandle = System.nanoTime();

View File

@@ -1,14 +1,11 @@
package org.toop.eventbus;
import com.google.common.eventbus.EventBus;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.toop.eventbus.events.EventWithUuid;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.*;
@@ -71,7 +68,7 @@ class EventPublisherTest {
EventPublisher<TestEvent> publisher = new EventPublisher<>(TestEvent.class, "event");
publisher.onEventById(TestEvent.class, event -> triggered.set(true))
.unregisterAfterSuccess()
.unsubscribeAfterSuccess()
.postEvent();
// Subscriber should have been removed after first trigger
@@ -109,6 +106,13 @@ class EventPublisherTest {
assertTrue(firstTriggered.get());
assertTrue(secondTriggered.get());
publisher.onEventById(TestEvent.class, e -> firstTriggered.set(true))
.onEventById(TestEvent.class, e -> secondTriggered.set(true))
.asyncPostEvent();
assertTrue(firstTriggered.get());
assertTrue(secondTriggered.get());
}
@Test

View File

@@ -1,84 +1,110 @@
package org.toop.eventbus;
//package org.toop.eventbus;
//
//import net.engio.mbassy.bus.publication.SyncAsyncPostCommand;
//import org.junit.jupiter.api.AfterEach;
//import org.junit.jupiter.api.Test;
//import org.toop.eventbus.events.IEvent;
//
//import java.util.concurrent.atomic.AtomicBoolean;
//import java.util.concurrent.atomic.AtomicReference;
//
//import static org.junit.jupiter.api.Assertions.*;
//
//class GlobalEventBusTest {
//
// // A simple test event
// static class TestEvent implements IEvent {
// private final String message;
//
// TestEvent(String message) {
// this.message = message;
// }
//
// String getMessage() {
// return message;
// }
// }
//
// @AfterEach
// void tearDown() {
// // Reset to avoid leaking subscribers between tests
// GlobalEventBus.reset();
// }
//
// @Test
// void testSubscribeWithType() {
// AtomicReference<String> result = new AtomicReference<>();
//
// GlobalEventBus.subscribe(TestEvent.class, e -> result.set(e.getMessage()));
//
// GlobalEventBus.post(new TestEvent("hello"));
//
// assertEquals("hello", result.get());
// }
//
// @Test
// void testSubscribeWithoutType() {
// AtomicReference<String> result = new AtomicReference<>();
//
// GlobalEventBus.subscribe((TestEvent e) -> result.set(e.getMessage()));
//
// GlobalEventBus.post(new TestEvent("world"));
//
// assertEquals("world", result.get());
// }
//
// @Test
// void testUnsubscribeStopsReceivingEvents() {
// AtomicBoolean called = new AtomicBoolean(false);
//
// Object listener = GlobalEventBus.subscribe(TestEvent.class, e -> called.set(true));
//
// // First event should trigger
// GlobalEventBus.post(new TestEvent("first"));
// assertTrue(called.get());
//
// // Reset flag
// called.set(false);
//
// // Unsubscribe and post again
// GlobalEventBus.unsubscribe(listener);
// GlobalEventBus.post(new TestEvent("second"));
//
// assertFalse(called.get(), "Listener should not be called after unsubscribe");
// }
//
// @Test
// void testResetClearsListeners() {
// AtomicBoolean called = new AtomicBoolean(false);
//
// GlobalEventBus.subscribe(TestEvent.class, e -> called.set(true));
//
// GlobalEventBus.reset(); // should wipe subscriptions
//
// GlobalEventBus.post(new TestEvent("ignored"));
//
// assertFalse(called.get(), "Listener should not survive reset()");
// }
import static org.junit.jupiter.api.Assertions.*;
import com.google.common.eventbus.EventBus;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.*;
class GlobalEventBusTest {
@BeforeEach
void setup() {
// Reset the singleton before each test
GlobalEventBus.reset();
}
@AfterEach
void teardown() {
// Ensure reset after tests
GlobalEventBus.reset();
}
@Test
void testGet_returnsEventBus() {
EventBus bus = GlobalEventBus.get();
assertNotNull(bus, "EventBus should not be null");
assertEquals("global-bus", bus.identifier(), "EventBus name should match");
}
@Test
void testSet_replacesEventBus() {
EventBus newBus = new EventBus("new-bus");
GlobalEventBus.set(newBus);
assertEquals(newBus, GlobalEventBus.get(), "EventBus should be replaced");
}
@Test
void testSubscribe_wrapsConsumerAndHandlesEvent() {
AtomicBoolean called = new AtomicBoolean(false);
var listener = GlobalEventBus.subscribe(String.class, _ -> called.set(true));
GlobalEventBus.register(listener);
GlobalEventBus.post("hello");
assertTrue(called.get(), "Consumer should have been called");
}
@Test
void testSubscribeAndRegister_registersListenerAutomatically() {
AtomicBoolean called = new AtomicBoolean(false);
GlobalEventBus.subscribeAndRegister(String.class, _ -> called.set(true));
GlobalEventBus.post("test-event");
assertTrue(called.get(), "Consumer should have been called");
}
@Test
void testUnregister_removesListener() {
AtomicBoolean called = new AtomicBoolean(false);
var listener = GlobalEventBus.subscribe(String.class, _ -> called.set(true));
GlobalEventBus.register(listener);
GlobalEventBus.unregister(listener);
GlobalEventBus.post("hello");
assertFalse(called.get(), "Consumer should not be called after unregister");
}
// @Test
// void testPost_storesEventInRegistry() {
// // Simple EventMeta check
// class MyEvent {}
//
// MyEvent event = new MyEvent();
// GlobalEventBus.post(event);
//
// EventMeta<MyEvent> stored = EventRegistry.getStoredEvent(MyEvent.class);
// assertNotNull(stored, "EventMeta should be stored");
// assertEquals(event, stored.event(), "Stored event should match the posted one");
// }
}
// @Test
// void testSetReplacesBus() {
// MBassadorMock<IEvent> mockBus = new MBassadorMock<>();
// GlobalEventBus.set(mockBus);
//
// TestEvent event = new TestEvent("test");
// GlobalEventBus.post(event);
//
// assertEquals(event, mockBus.lastPosted, "Custom bus should receive the event");
// }
//
// // Minimal fake MBassador for verifying set()
// static class MBassadorMock<T extends IEvent> extends net.engio.mbassy.bus.MBassador<T> {
// T lastPosted;
//
// @Override
// public SyncAsyncPostCommand<T> post(T message) {
// this.lastPosted = message;
// return super.post(message);
// }
// }
//}