Updated eventpublisher to also accept normal events without uuid

This commit is contained in:
lieght
2025-09-22 17:07:39 +02:00
parent 3fa0bae46a
commit 63b08f3010
8 changed files with 169 additions and 63 deletions

View File

@@ -4,6 +4,7 @@
<w>aosp</w>
<w>gamelist</w>
<w>playerlist</w>
<w>tictactoe</w>
<w>vmoptions</w>
</words>
</dictionary>

View File

@@ -38,34 +38,15 @@ public class Main {
new Events.ServerEvents.StartServerRequest(5001, "tictactoe", serverIdFuture));
var serverId = serverIdFuture.get();
new MainTest();
// CompletableFuture<String> conIdFuture = new CompletableFuture<>();
// GlobalEventBus.post(
// new NetworkEvents.StartClientRequest(NetworkingGameClientHandler::new,
// "127.0.0.1", 5001, conIdFuture));
// var conId = conIdFuture.get();
int numThreads = 100; // how many EventPublisher tests you want
ExecutorService executor = Executors.newFixedThreadPool(200); // 20 threads in pool
for (int i = 0; i < numThreads; i++) {
executor.submit(() -> {
new EventPublisher<>(
NetworkEvents.StartClient.class,
(Supplier<NetworkingGameClientHandler>) NetworkingGameClientHandler::new,
"127.0.0.1",
5001
).onEventById(
NetworkEvents.StartClientSuccess.class,
event -> GlobalEventBus.post(
new NetworkEvents.CloseClient((String) event.connectionId()))
).unregisterAfterSuccess()
.postEvent();
});
}
// Shutdown after tasks complete
executor.shutdown();
// GlobalEventBus.post(new NetworkEvents.SendCommand(conId, "move", "5"));
// GlobalEventBus.post(new NetworkEvents.ForceCloseAllClients());

View File

@@ -0,0 +1,39 @@
package org.toop;
import com.google.common.base.Supplier;
import org.toop.eventbus.EventPublisher;
import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.Events;
import org.toop.eventbus.events.NetworkEvents;
import org.toop.frontend.networking.NetworkingGameClientHandler;
public class MainTest {
MainTest() {
var ep = new EventPublisher<>(
Events.ServerEvents.StartServer.class,
5001,
"tictactoe"
).onEvent(
this::handleServerStarted
).unregisterAfterSuccess().postEvent();
// var ep = new EventPublisher<>(
// NetworkEvents.SendCommand.class,
// (Supplier<NetworkingGameClientHandler>) NetworkingGameClientHandler::new,
// "127.0.0.1",
// 5001
// ).onEventById(this::handleStartClientRequest).unregisterAfterSuccess().postEvent();
}
private void handleStartClientRequest(NetworkEvents.StartClientSuccess event) {
GlobalEventBus.post(new NetworkEvents.CloseClient((String) event.connectionId()));
}
private void handleServerStarted(Events.ServerEvents.ServerStarted event) {
System.out.println("Server started");
}
}

View File

@@ -26,7 +26,7 @@ import java.util.function.Consumer;
*
* @param <T> the type of event to publish, must extend EventWithUuid
*/
public class EventPublisher<T extends EventWithUuid> {
public class EventPublisher<T> {
private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup();
private static final Map<Class<?>, MethodHandle> CONSTRUCTOR_CACHE = new ConcurrentHashMap<>();
@@ -58,9 +58,10 @@ public class EventPublisher<T extends EventWithUuid> {
this.eventId = UUID.randomUUID().toString();
try {
boolean isUuidEvent = EventWithUuid.class.isAssignableFrom(postEventClass);
MethodHandle ctorHandle = CONSTRUCTOR_CACHE.computeIfAbsent(postEventClass, cls -> {
try {
// Build signature dynamically (arg types + String for UUID)
Class<?>[] paramTypes = cls.getDeclaredConstructors()[0].getParameterTypes();
MethodType mt = MethodType.methodType(void.class, paramTypes);
return LOOKUP.findConstructor(cls, mt);
@@ -69,41 +70,16 @@ public class EventPublisher<T extends EventWithUuid> {
}
});
// Append UUID to args
Object[] finalArgs = new Object[args.length + 1];
Object[] finalArgs;
if (isUuidEvent) {
// append UUID to args
finalArgs = new Object[args.length + 1];
System.arraycopy(args, 0, finalArgs, 0, args.length);
finalArgs[args.length] = this.eventId;
// --------------------
@SuppressWarnings("unchecked")
T instance = (T) ctorHandle.invokeWithArguments(finalArgs);
this.event = instance;
} catch (Throwable e) {
throw new RuntimeException("Failed to instantiate event", e);
} else {
// just forward args
finalArgs = args;
}
}
public EventPublisher(EventBus eventbus, Class<T> postEventClass, Object... args) {
this.eventId = UUID.randomUUID().toString();
try {
MethodHandle ctorHandle = CONSTRUCTOR_CACHE.computeIfAbsent(postEventClass, cls -> {
try {
// Build signature dynamically (arg types + String for UUID)
Class<?>[] paramTypes = cls.getDeclaredConstructors()[0].getParameterTypes();
MethodType mt = MethodType.methodType(void.class, paramTypes);
return LOOKUP.findConstructor(cls, mt);
} catch (Exception e) {
throw new RuntimeException("Failed to find constructor handle for " + cls, e);
}
});
// Append UUID to args
Object[] finalArgs = new Object[args.length + 1];
System.arraycopy(args, 0, finalArgs, 0, args.length);
finalArgs[args.length] = this.eventId;
// --------------------
@SuppressWarnings("unchecked")
T instance = (T) ctorHandle.invokeWithArguments(finalArgs);
@@ -141,6 +117,107 @@ public class EventPublisher<T extends EventWithUuid> {
return this;
}
/**
* Subscribes a listener for a specific event type, but only triggers the listener
* if the incoming event's UUID matches this EventPublisher's UUID.
*
* @param action the action (function) to execute when a matching event is received
* @param <TT> the type of the event to subscribe to; must extend EventWithUuid
* @return this EventPublisher instance, for chainable calls
*/
@SuppressWarnings("unchecked")
public <TT extends EventWithUuid> EventPublisher<T> onEventById(
Consumer<TT> action) {
this.listener = GlobalEventBus.subscribeAndRegister(event -> {
// Only process events that are EventWithUuid
if (event instanceof EventWithUuid uuidEvent) {
if (uuidEvent.eventId().equals(this.eventId)) {
try {
TT typedEvent = (TT) uuidEvent; // unchecked cast
action.accept(typedEvent);
if (unregisterAfterSuccess && listener != null) {
GlobalEventBus.unregister(listener);
}
this.result = typedEvent.result();
} catch (ClassCastException ignored) {
// TODO: Not the right type, ignore silently
}
}
}
});
return this;
}
/**
* Subscribes a listener for a specific event type. The listener will be invoked
* whenever an event of the given class is posted to the global event bus.
*
* <p>This overload provides type safety by requiring the event class explicitly
* and casting the incoming event before passing it to the provided action.</p>
*
* <pre>{@code
* new EventPublisher<>(MyEvent.class)
* .onEvent(MyEvent.class, e -> logger.info("Received: " + e))
* .postEvent();
* }</pre>
*
* @param eventClass the class of the event to subscribe to
* @param action the action to execute when an event of the given class is received
* @param <TT> the type of the event to subscribe to
* @return this EventPublisher instance, for chainable calls
*/
public <TT> EventPublisher<T> onEvent(Class<TT> eventClass, Consumer<TT> action) {
this.listener = GlobalEventBus.subscribeAndRegister(eventClass, event -> {
action.accept(eventClass.cast(event));
if (unregisterAfterSuccess && listener != null) {
GlobalEventBus.unregister(listener);
}
});
return this;
}
/**
* Subscribes a listener for events without requiring the event class explicitly.
* The listener will attempt to cast each posted event to the expected type.
* If the cast fails, the event is ignored silently.
*
* <p>This overload provides more concise syntax, but relies on an unchecked cast
* at runtime. Use {@link #onEvent(Class, Consumer)} if you prefer explicit
* type safety.</p>
*
* <pre>{@code
* new EventPublisher<>(MyEvent.class)
* .onEvent((MyEvent e) -> logger.info("Received: " + e))
* .postEvent();
* }</pre>
*
* @param action the action to execute when a matching event is received
* @param <TT> the type of the event to subscribe to
* @return this EventPublisher instance, for chainable calls
*/
@SuppressWarnings("unchecked")
public <TT> EventPublisher<T> onEvent(Consumer<TT> action) {
this.listener = GlobalEventBus.subscribeAndRegister(event -> {
try {
// unchecked cast if wrong type, ClassCastException is caught
TT typedEvent = (TT) event;
action.accept(typedEvent);
if (unregisterAfterSuccess && listener != null) {
GlobalEventBus.unregister(listener);
}
} catch (ClassCastException ignored) {
// Ignore events of unrelated types
}
});
return this;
}
/**
* Posts the event to the global event bus. This should generally be the
* final call in the chain.
@@ -164,6 +241,13 @@ public class EventPublisher<T extends EventWithUuid> {
return this;
}
public EventPublisher<T> unregisterNow() {
if (unregisterAfterSuccess && listener != null) {
GlobalEventBus.unregister(listener);
}
return this;
}
public Map<String, Object> getResult() {
if (this.result != null) {
return this.result;

View File

@@ -7,7 +7,7 @@ import java.util.concurrent.CompletableFuture;
import org.toop.core.Window;
/** Events that are used in the GlobalEventBus class. */
public class Events implements IEvents {
public class Events implements IEvent {
/**
* WIP, DO NOT USE!

View File

@@ -1,3 +1,3 @@
package org.toop.eventbus.events;
public interface IEvents {}
public interface IEvent {}

View File

@@ -9,6 +9,7 @@ import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.eventbus.EventPublisher;
import org.toop.eventbus.events.Events;
import org.toop.eventbus.GlobalEventBus;
import org.toop.eventbus.events.NetworkEvents;

View File

@@ -20,8 +20,8 @@ class EventPublisherStressTest {
}
}
private static final int THREADS = 100;
private static final long EVENTS_PER_THREAD = 2_000_000;
private static final int THREADS = 1;
private static final long EVENTS_PER_THREAD = 2_000_000_000;
@Tag("stress")
@Test