Fix eventbus problems (#265)

* Added unsubscribe to EventFlow. ListenerHandler now functional. GlobalEventbus now user listenerHandler

* getAllListeners

* Removed nulls

* Fixed stress tests

* Added docs, no more list creation when adding events to the bus.

* Fixed unsubscribe not working.

* Moved away from deprecated functions

* moved from wildcard to typed

* Moved away from deprecated function
This commit is contained in:
Bas Antonius de Jong
2025-11-30 14:22:05 +01:00
committed by GitHub
parent ec0ce4ea37
commit 25c02c7ad0
6 changed files with 682 additions and 411 deletions

View File

@@ -25,17 +25,16 @@ public class AudioEventListener<T extends AudioResource, K extends AudioResource
public AudioEventListener<?, ?> initListeners(String buttonSoundToPlay) { public AudioEventListener<?, ?> initListeners(String buttonSoundToPlay) {
new EventFlow() new EventFlow()
.listen(this::handleStopMusicManager) .listen(AudioEvents.StopAudioManager.class, this::handleStopMusicManager, false)
.listen(this::handlePlaySound) .listen(AudioEvents.PlayEffect.class, this::handlePlaySound, false)
.listen(this::handleSkipSong) .listen(AudioEvents.SkipMusic.class, this::handleSkipSong, false)
.listen(this::handlePauseSong) .listen(AudioEvents.PauseMusic.class, this::handlePauseSong, false)
.listen(this::handlePreviousSong) .listen(AudioEvents.PreviousMusic.class, this::handlePreviousSong, false)
.listen(this::handleStopSound) .listen(AudioEvents.StopEffect.class, this::handleStopSound, false)
.listen(this::handleMusicStart) .listen(AudioEvents.StartBackgroundMusic.class, this::handleMusicStart, false)
.listen(this::handleVolumeChange) .listen(AudioEvents.ChangeVolume.class, this::handleVolumeChange, false)
.listen(this::handleGetVolume) .listen(AudioEvents.GetVolume.class, this::handleGetVolume,false)
.listen(AudioEvents.ClickButton.class, _ -> .listen(AudioEvents.ClickButton.class, _ -> soundEffectManager.play(buttonSoundToPlay, false), false);
soundEffectManager.play(buttonSoundToPlay, false));
return this; return this;
} }

View File

@@ -38,7 +38,7 @@ public class EventFlow {
private EventType event = null; private EventType event = null;
/** The listener returned by GlobalEventBus subscription. Used for unsubscription. */ /** The listener returned by GlobalEventBus subscription. Used for unsubscription. */
private final List<ListenerHandler> listeners = new ArrayList<>(); private final List<ListenerHandler<?>> listeners = new ArrayList<>();
/** Holds the results returned from the subscribed event, if any. */ /** Holds the results returned from the subscribed event, if any. */
private Map<String, ?> result = null; private Map<String, ?> result = null;
@@ -46,16 +46,15 @@ public class EventFlow {
/** Empty constructor (event must be added via {@link #addPostEvent(Class, Object...)}). */ /** Empty constructor (event must be added via {@link #addPostEvent(Class, Object...)}). */
public EventFlow() {} public EventFlow() {}
public EventFlow addPostEvent(EventType event) { /**
this.event = event; *
return this; * Add an event that will be triggered when {@link #postEvent()} or {@link #asyncPostEvent()} is called.
} *
* @param eventClass The event that will be posted.
public EventFlow addPostEvent(Supplier<? extends EventType> eventSupplier) { * @param args The event arguments, see the added event record for more information.
this.event = eventSupplier.get(); * @return {@link #EventFlow}
return this; *
} */
public <T extends EventType> EventFlow addPostEvent(Class<T> eventClass, Object... args) { public <T extends EventType> EventFlow addPostEvent(Class<T> eventClass, Object... args) {
try { try {
boolean isUuidEvent = UniqueEvent.class.isAssignableFrom(eventClass); boolean isUuidEvent = UniqueEvent.class.isAssignableFrom(eventClass);
@@ -98,155 +97,418 @@ public class EventFlow {
} }
} }
/** Subscribe by ID: only fires if UUID matches this publisher's eventId. */ /**
public <TT extends ResponseToUniqueEvent> EventFlow onResponse( *
Class<TT> eventClass, Consumer<TT> action, boolean unsubscribeAfterSuccess) { * Add an event that will be triggered when {@link #postEvent()} or {@link #asyncPostEvent()} is called.
ListenerHandler[] listenerHolder = new ListenerHandler[1]; *
listenerHolder[0] = * @param event The event to be posted.
new ListenerHandler( * @return {@link #EventFlow}
GlobalEventBus.subscribe( *
eventClass, */
event -> { public EventFlow addPostEvent(EventType event) {
if (event.getIdentifier() != this.eventSnowflake) return; this.event = event;
action.accept(event);
if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
GlobalEventBus.unsubscribe(listenerHolder[0]);
this.listeners.remove(listenerHolder[0]);
}
this.result = event.result();
}));
this.listeners.add(listenerHolder[0]);
return this; return this;
} }
/** Subscribe by ID: only fires if UUID matches this publisher's eventId. */ /**
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Class<TT> eventClass, Consumer<TT> action) { *
return this.onResponse(eventClass, action, true); * Add an event that will be triggered when {@link #postEvent()} or {@link #asyncPostEvent()} is called.
*
* @param eventSupplier The event that will be posted through a Supplier.
* @return {@link #EventFlow}
*
*/
public EventFlow addPostEvent(Supplier<? extends EventType> eventSupplier) {
this.event = eventSupplier.get();
return this;
} }
/** Subscribe by ID without explicit class. */ /**
*
* Start listening for an event and trigger when ID correlates.
*
* @param event The {@link ResponseToUniqueEvent} to trigger the lambda.
* @param action The lambda to run when triggered.
* @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered.
* @param name A name given to the event, can later be used to unsubscribe.
* @return {@link #EventFlow}
*
*/
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(
Class<TT> event, Consumer<TT> action, boolean unsubscribeAfterSuccess, String name
) {
final long id = SnowflakeGenerator.nextId();
Consumer<TT> newAction = eventClass -> {
if (eventClass.getIdentifier() != this.eventSnowflake) return;
action.accept(eventClass);
if (unsubscribeAfterSuccess) unsubscribe(id);
this.result = eventClass.result();
};
// TODO Remove casts
var listener = new ListenerHandler<>(
id,
name,
(Class<ResponseToUniqueEvent>) event,
(Consumer<ResponseToUniqueEvent>) newAction
);
GlobalEventBus.subscribe(listener);
this.listeners.add(listener);
return this;
}
/**
*
* Start listening for an event and trigger when ID correlates, auto unsubscribes after being triggered and adds an empty name.
*
* @param event The {@link ResponseToUniqueEvent} to trigger the lambda.
* @param action The lambda to run when triggered.
* @return {@link #EventFlow}
*
*/
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Class<TT> event, Consumer<TT> action) {
return this.onResponse(event, action, true, "");
}
/**
*
* Start listening for an event and trigger when ID correlates, auto adds an empty name.
*
* @param event The {@link ResponseToUniqueEvent} to trigger the lambda.
* @param action The lambda to run when triggered.
* @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered.
* @return {@link #EventFlow}
*
*/
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Class<TT> event, Consumer<TT> action, boolean unsubscribeAfterSuccess) {
return this.onResponse(event, action, unsubscribeAfterSuccess, "");
}
/**
*
* Start listening for an event and trigger when ID correlates, auto unsubscribes after being triggered.
*
* @param event The {@link ResponseToUniqueEvent} to trigger the lambda.
* @param action The lambda to run when triggered.
* @param name A name given to the event, can later be used to unsubscribe.
* @return {@link #EventFlow}
*
*/
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Class<TT> event, Consumer<TT> action, String name) {
return this.onResponse(event, action, true, name);
}
/**
*
* Subscribe by ID without explicit class.
*
* @param action The lambda to run when triggered.
* @return {@link #EventFlow}
*
* @deprecated use {@link #onResponse(Class, Consumer, boolean, String)} instead.
*/
@Deprecated
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <TT extends ResponseToUniqueEvent> EventFlow onResponse( public <TT extends ResponseToUniqueEvent> EventFlow onResponse(
Consumer<TT> action, boolean unsubscribeAfterSuccess) { Consumer<TT> action, boolean unsubscribeAfterSuccess, String name) {
ListenerHandler[] listenerHolder = new ListenerHandler[1];
listenerHolder[0] = final long id = SnowflakeGenerator.nextId();
new ListenerHandler(
GlobalEventBus.subscribe( Consumer<TT> newAction = event -> {
event -> { if (!(event instanceof UniqueEvent uuidEvent)) return;
if (!(event instanceof UniqueEvent uuidEvent)) return; if (uuidEvent.getIdentifier() == this.eventSnowflake) {
if (uuidEvent.getIdentifier() == this.eventSnowflake) { try {
try { TT typedEvent = (TT) uuidEvent;
TT typedEvent = (TT) uuidEvent; action.accept(typedEvent);
action.accept(typedEvent);
if (unsubscribeAfterSuccess if (unsubscribeAfterSuccess) unsubscribe(id);
&& listenerHolder[0] != null) {
GlobalEventBus.unsubscribe(listenerHolder[0]); this.result = typedEvent.result();
this.listeners.remove(listenerHolder[0]); } catch (ClassCastException _) {
} throw new ClassCastException(
this.result = typedEvent.result(); "Cannot cast "
} catch (ClassCastException _) { + event.getClass().getName()
throw new ClassCastException( + " to UniqueEvent");
"Cannot cast " }
+ event.getClass().getName() }
+ " to UniqueEvent"); };
}
} var listener = new ListenerHandler<>(
})); id,
this.listeners.add(listenerHolder[0]); name,
(Class<TT>) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0],
newAction
);
GlobalEventBus.subscribe(listener);
this.listeners.add(listener);
return this; return this;
} }
/**
*
* Subscribe by ID without explicit class.
*
* @param action The lambda to run when triggered.
* @return {@link #EventFlow}
*
* @deprecated use {@link #onResponse(Class, Consumer)} instead.
*/
@Deprecated
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Consumer<TT> action) { public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Consumer<TT> action) {
return this.onResponse(action, true); return this.onResponse(action, true, "");
} }
/**
*
* Start listening for an event, and run a lambda when triggered.
*
* @param event The {@link EventType} to trigger the lambda.
* @param action The lambda to run when triggered.
* @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered.
* @param name A name given to the event, can later be used to unsubscribe.
* @return {@link #EventFlow}
*
*/
public <TT extends EventType> EventFlow listen( public <TT extends EventType> EventFlow listen(
Class<TT> eventClass, Consumer<TT> action, boolean unsubscribeAfterSuccess) { Class<TT> event, Consumer<TT> action, boolean unsubscribeAfterSuccess, String name) {
ListenerHandler[] listenerHolder = new ListenerHandler[1];
listenerHolder[0] =
new ListenerHandler(
GlobalEventBus.subscribe(
eventClass,
event -> {
action.accept(event);
if (unsubscribeAfterSuccess && listenerHolder[0] != null) { long id = SnowflakeGenerator.nextId();
GlobalEventBus.unsubscribe(listenerHolder[0]);
this.listeners.remove(listenerHolder[0]); Consumer<TT> newAction = eventc -> {
} action.accept(eventc);
}));
this.listeners.add(listenerHolder[0]); if (unsubscribeAfterSuccess) unsubscribe(id);
};
var listener = new ListenerHandler<>(
id,
name,
event,
newAction
);
GlobalEventBus.subscribe(listener);
this.listeners.add(listener);
return this; return this;
} }
public <TT extends EventType> EventFlow listen(Class<TT> eventClass, Consumer<TT> action) { /**
return this.listen(eventClass, action, true); *
* Start listening for an event, and run a lambda when triggered, auto unsubscribes.
*
* @param event The {@link EventType} to trigger the lambda.
* @param action The lambda to run when triggered.
* @param name A name given to the event, can later be used to unsubscribe.
* @return {@link #EventFlow}
*
*/
public <TT extends EventType> EventFlow listen(Class<TT> event, Consumer<TT> action, String name) {
return this.listen(event, action, true, name);
} }
/**
*
* Start listening for an event, and run a lambda when triggered, auto unsubscribe and gives it an empty name.
*
* @param event The {@link EventType} to trigger the lambda.
* @param action The lambda to run when triggered.
* @return {@link #EventFlow}
*
*/
public <TT extends EventType> EventFlow listen(Class<TT> event, Consumer<TT> action) {
return this.listen(event, action, true, "");
}
/**
*
* Start listening for an event, and run a lambda when triggered, adds an empty name.
*
* @param event The {@link EventType} to trigger the lambda.
* @param action The lambda to run when triggered.
* @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered.
* @return {@link #EventFlow}
*
*/
public <TT extends EventType> EventFlow listen(Class<TT> event, Consumer<TT> action, boolean unsubscribeAfterSuccess) {
return this.listen(event, action, unsubscribeAfterSuccess, "");
}
/**
*
* Start listening to an event.
*
* @param action The lambda to run when triggered.
* @return {@link EventFlow}
*
* @deprecated use {@link #listen(Class, Consumer, boolean, String)} instead.
*/
@Deprecated
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <TT extends EventType> EventFlow listen( public <TT extends EventType> EventFlow listen(
Consumer<TT> action, boolean unsubscribeAfterSuccess) { Consumer<TT> action, boolean unsubscribeAfterSuccess, String name) {
ListenerHandler[] listenerHolder = new ListenerHandler[1]; long id = SnowflakeGenerator.nextId();
listenerHolder[0] =
new ListenerHandler( Class<TT> eventClass = (Class<TT>) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0];
GlobalEventBus.subscribe(
event -> { Consumer<TT> newAction = event -> {
if (!(event instanceof EventType nonUuidEvent)) return; if (!(event instanceof EventType nonUuidEvent)) return;
try { try {
TT typedEvent = (TT) nonUuidEvent; TT typedEvent = (TT) nonUuidEvent;
action.accept(typedEvent); action.accept(typedEvent);
if (unsubscribeAfterSuccess && listenerHolder[0] != null) { if (unsubscribeAfterSuccess) unsubscribe(id);
GlobalEventBus.unsubscribe(listenerHolder[0]); } catch (ClassCastException _) {
this.listeners.remove(listenerHolder[0]); throw new ClassCastException(
} "Cannot cast "
} catch (ClassCastException _) { + event.getClass().getName()
throw new ClassCastException( + " to UniqueEvent");
"Cannot cast " }
+ event.getClass().getName() };
+ " to UniqueEvent");
} var listener = new ListenerHandler<>(
})); id,
this.listeners.add(listenerHolder[0]); name,
eventClass,
newAction
);
GlobalEventBus.subscribe(listener);
this.listeners.add(listener);
return this; return this;
} }
/**
*
* Start listening to an event.
*
* @param action The lambda to run when triggered.
* @return {@link EventFlow}
*
* @deprecated use {@link #listen(Class, Consumer)} instead.
*/
@Deprecated
public <TT extends EventType> EventFlow listen(Consumer<TT> action) { public <TT extends EventType> EventFlow listen(Consumer<TT> action) {
return this.listen(action, true); return this.listen(action, true, "");
} }
/** Post synchronously */ /**
* Posts the event added through {@link #addPostEvent}.
*/
public EventFlow postEvent() { public EventFlow postEvent() {
GlobalEventBus.post(this.event); GlobalEventBus.post(this.event);
return this; return this;
} }
/** Post asynchronously */ /**
* Posts the event added through {@link #addPostEvent} asynchronously.
*/
public EventFlow asyncPostEvent() { public EventFlow asyncPostEvent() {
GlobalEventBus.postAsync(this.event); GlobalEventBus.postAsync(this.event);
return this; return this;
} }
/**
*
* Unsubscribe from an event.
*
* @param listenerObject The listener object to remove and unsubscribe.
*/
public void unsubscribe(Object listenerObject) {
this.listeners.removeIf(handler -> {
if (handler.getListener() == listenerObject) {
GlobalEventBus.unsubscribe(handler);
return true;
}
return false;
});
}
/**
*
* Unsubscribe from an event.
*
* @param listenerId The id given to the {@link ListenerHandler}.
*/
public void unsubscribe(long listenerId) {
this.listeners.removeIf(handler -> {
if (handler.getId() == listenerId) {
GlobalEventBus.unsubscribe(handler);
return true;
}
return false;
});
}
/**
* Unsubscribe from an event.
*
* @param name The name given to the listener.
*/
public void unsubscribe(String name) {
this.listeners.removeIf(handler -> {
if (handler.getName().equals(name)) {
GlobalEventBus.unsubscribe(handler);
return true;
}
return false;
});
}
/**
* Unsubscribe all events.
*/
public void unsubscribeAll() {
this.listeners.forEach(this::unsubscribe);
}
/**
* Clean and remove everything inside {@link EventFlow}.
*/
private void clean() { private void clean() {
this.listeners.clear(); this.listeners.clear();
this.event = null; this.event = null;
this.result = null; this.result = null;
} // TODO } // TODO
/**
* TODO
*
* @return TODO
*/
public Map<String, ?> getResult() { public Map<String, ?> getResult() {
return this.result; return this.result;
} }
/**
* TODO
*
* @return TODO
*/
public EventType getEvent() { public EventType getEvent() {
return event; return event;
} }
/**
*
* Returns a copy of the list of listeners.
*
* @return Copy of the list of listeners.
*/
public ListenerHandler[] getListeners() { public ListenerHandler[] getListeners() {
return listeners.toArray(new ListenerHandler[0]); return listeners.toArray(new ListenerHandler[0]);
} }
/**
* Returns the generated snowflake for the {@link EventFlow}
*
* @return The generated snowflake for this {@link EventFlow}
*/
public long getEventSnowflake() { public long getEventSnowflake() {
return eventSnowflake; return eventSnowflake;
} }

View File

@@ -16,7 +16,7 @@ import org.toop.framework.eventbus.events.UniqueEvent;
public final class GlobalEventBus { public final class GlobalEventBus {
/** Map of event class to type-specific listeners. */ /** Map of event class to type-specific listeners. */
private static final Map<Class<?>, CopyOnWriteArrayList<Consumer<? super EventType>>> private static final Map<Class<?>, CopyOnWriteArrayList<ListenerHandler<?>>>
LISTENERS = new ConcurrentHashMap<>(); LISTENERS = new ConcurrentHashMap<>();
/** Map of event class to Snowflake-ID-specific listeners. */ /** Map of event class to Snowflake-ID-specific listeners. */
@@ -49,7 +49,6 @@ public final class GlobalEventBus {
ProducerType.MULTI, ProducerType.MULTI,
new BusySpinWaitStrategy()); new BusySpinWaitStrategy());
// Single consumer that dispatches to subscribers
DISRUPTOR.handleEventsWith( DISRUPTOR.handleEventsWith(
(holder, seq, endOfBatch) -> { (holder, seq, endOfBatch) -> {
if (holder.event != null) { if (holder.event != null) {
@@ -73,23 +72,11 @@ public final class GlobalEventBus {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Subscription // Subscription
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
public static <T extends EventType> Consumer<? super EventType> subscribe( public static <T extends EventType> void subscribe(ListenerHandler<T> listener) {
Class<T> eventClass, Consumer<T> listener) { LISTENERS.computeIfAbsent(listener.getListenerClass(), _ -> new CopyOnWriteArrayList<>()).add(listener);
CopyOnWriteArrayList<Consumer<? super EventType>> list =
LISTENERS.computeIfAbsent(eventClass, k -> new CopyOnWriteArrayList<>());
Consumer<? super EventType> wrapper = event -> listener.accept(eventClass.cast(event));
list.add(wrapper);
return wrapper;
}
public static Consumer<? super EventType> subscribe(Consumer<Object> listener) {
Consumer<? super EventType> wrapper = event -> listener.accept(event);
LISTENERS.computeIfAbsent(Object.class, _ -> new CopyOnWriteArrayList<>()).add(wrapper);
return wrapper;
} }
// TODO
public static <T extends UniqueEvent> void subscribeById( public static <T extends UniqueEvent> void subscribeById(
Class<T> eventClass, long eventId, Consumer<T> listener) { Class<T> eventClass, long eventId, Consumer<T> listener) {
UUID_LISTENERS UUID_LISTENERS
@@ -97,10 +84,14 @@ public final class GlobalEventBus {
.put(eventId, listener); .put(eventId, listener);
} }
public static void unsubscribe(Object listener) { public static void unsubscribe(ListenerHandler<?> listener) {
LISTENERS.values().forEach(list -> list.remove(listener)); // TODO suspicious call
LISTENERS.getOrDefault(listener.getListenerClass(), new CopyOnWriteArrayList<>())
.remove(listener);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
} }
// TODO
public static <T extends UniqueEvent> void unsubscribeById( public static <T extends UniqueEvent> void unsubscribeById(
Class<T> eventClass, long eventId) { Class<T> eventClass, long eventId) {
Map<Long, Consumer<? extends UniqueEvent>> map = UUID_LISTENERS.get(eventClass); Map<Long, Consumer<? extends UniqueEvent>> map = UUID_LISTENERS.get(eventClass);
@@ -124,36 +115,42 @@ public final class GlobalEventBus {
} }
} }
@SuppressWarnings("unchecked")
private static <T extends EventType> void callListener(ListenerHandler<?> raw, EventType event) {
ListenerHandler<T> handler = (ListenerHandler<T>) raw;
Consumer<T> listener = handler.getListener();
T casted = (T) event;
listener.accept(casted);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static void dispatchEvent(EventType event) { private static void dispatchEvent(EventType event) {
Class<?> clazz = event.getClass(); Class<?> clazz = event.getClass();
// class-specific listeners CopyOnWriteArrayList<ListenerHandler<?>> classListeners = LISTENERS.get(clazz);
CopyOnWriteArrayList<Consumer<? super EventType>> classListeners = LISTENERS.get(clazz);
if (classListeners != null) { if (classListeners != null) {
for (Consumer<? super EventType> listener : classListeners) { for (ListenerHandler<?> listener : classListeners) {
try { try {
listener.accept(event); callListener(listener, event);
} catch (Throwable e) { } catch (Throwable e) {
// e.printStackTrace(); // e.printStackTrace();
} }
} }
} }
// generic listeners CopyOnWriteArrayList<ListenerHandler<?>> genericListeners = LISTENERS.get(Object.class);
CopyOnWriteArrayList<Consumer<? super EventType>> genericListeners =
LISTENERS.get(Object.class);
if (genericListeners != null) { if (genericListeners != null) {
for (Consumer<? super EventType> listener : genericListeners) { for (ListenerHandler<?> listener : genericListeners) {
try { try {
listener.accept(event); callListener(listener, event);
} catch (Throwable e) { } catch (Throwable e) {
// e.printStackTrace(); // e.printStackTrace();
} }
} }
} }
// snowflake listeners
if (event instanceof UniqueEvent snowflakeEvent) { if (event instanceof UniqueEvent snowflakeEvent) {
Map<Long, Consumer<? extends UniqueEvent>> map = UUID_LISTENERS.get(clazz); Map<Long, Consumer<? extends UniqueEvent>> map = UUID_LISTENERS.get(clazz);
if (map != null) { if (map != null) {
@@ -182,4 +179,8 @@ public final class GlobalEventBus {
LISTENERS.clear(); LISTENERS.clear();
UUID_LISTENERS.clear(); UUID_LISTENERS.clear();
} }
public static Map<Class<?>, CopyOnWriteArrayList<ListenerHandler<?>>> getAllListeners() {
return LISTENERS;
}
} }

View File

@@ -1,25 +1,48 @@
package org.toop.framework.eventbus; package org.toop.framework.eventbus;
public class ListenerHandler { import org.toop.framework.SnowflakeGenerator;
private Object listener; import org.toop.framework.eventbus.events.EventType;
// private boolean unsubscribeAfterSuccess = true; import java.util.function.Consumer;
// public ListenerHandler(Object listener, boolean unsubAfterSuccess) { public class ListenerHandler<T extends EventType> {
// this.listener = listener; private final long id;
// this.unsubscribeAfterSuccess = unsubAfterSuccess; private final String name;
// } private final Class<T> clazz;
private final Consumer<T> listener;
public ListenerHandler(Object listener) { public ListenerHandler(long id, String name, Class<T> clazz, Consumer<T> listener) {
this.id = id;
this.name = name;
this.clazz = clazz;
this.listener = listener; this.listener = listener;
} }
public Object getListener() { public ListenerHandler(String name, Class<T> clazz, Consumer<T> listener) {
return this.listener; this(SnowflakeGenerator.nextId(), name, clazz, listener);
} }
// public boolean isUnsubscribeAfterSuccess() { public ListenerHandler(long id, Class<T> clazz, Consumer<T> listener) {
// return this.unsubscribeAfterSuccess; this(id, String.valueOf(id), clazz, listener);
// } }
public ListenerHandler(Class<T> clazz, Consumer<T> listener) {
this(SnowflakeGenerator.nextId(), clazz, listener);
}
public long getId() {
return id;
}
public Consumer<T> getListener() {
return listener;
}
public Class<T> getListenerClass() {
return clazz;
}
public String getName() {
return name;
}
} }

View File

@@ -17,25 +17,25 @@ public class NetworkingClientEventListener {
public NetworkingClientEventListener(NetworkingClientManager clientManager) { public NetworkingClientEventListener(NetworkingClientManager clientManager) {
this.clientManager = clientManager; this.clientManager = clientManager;
new EventFlow() new EventFlow()
.listen(this::handleStartClient) .listen(NetworkEvents.StartClient.class, this::handleStartClient, false)
.listen(this::handleCommand) .listen(NetworkEvents.SendCommand.class, this::handleCommand, false)
.listen(this::handleSendLogin) .listen(NetworkEvents.SendLogin.class, this::handleSendLogin, false)
.listen(this::handleSendLogout) .listen(NetworkEvents.SendLogout.class, this::handleSendLogout, false)
.listen(this::handleSendGetPlayerlist) .listen(NetworkEvents.SendGetPlayerlist.class, this::handleSendGetPlayerlist, false)
.listen(this::handleSendGetGamelist) .listen(NetworkEvents.SendGetGamelist.class, this::handleSendGetGamelist, false)
.listen(this::handleSendSubscribe) .listen(NetworkEvents.SendSubscribe.class, this::handleSendSubscribe, false)
.listen(this::handleSendMove) .listen(NetworkEvents.SendMove.class, this::handleSendMove, false)
.listen(this::handleSendChallenge) .listen(NetworkEvents.SendChallenge.class, this::handleSendChallenge, false)
.listen(this::handleSendAcceptChallenge) .listen(NetworkEvents.SendAcceptChallenge.class, this::handleSendAcceptChallenge, false)
.listen(this::handleSendForfeit) .listen(NetworkEvents.SendForfeit.class, this::handleSendForfeit, false)
.listen(this::handleSendMessage) .listen(NetworkEvents.SendMessage.class, this::handleSendMessage, false)
.listen(this::handleSendHelp) .listen(NetworkEvents.SendHelp.class, this::handleSendHelp, false)
.listen(this::handleSendHelpForCommand) .listen(NetworkEvents.SendHelpForCommand.class, this::handleSendHelpForCommand, false)
.listen(this::handleCloseClient) .listen(NetworkEvents.CloseClient.class, this::handleCloseClient, false)
.listen(this::handleReconnect) .listen(NetworkEvents.Reconnect.class, this::handleReconnect, false)
.listen(this::handleChangeAddress) .listen(NetworkEvents.ChangeAddress.class, this::handleChangeAddress, false)
.listen(this::handleGetAllConnections) .listen(NetworkEvents.RequestsAllClients.class, this::handleGetAllConnections, false)
.listen(this::handleShutdownAll); .listen(NetworkEvents.ForceCloseAllClients.class, this::handleShutdownAll, false);
} }
void handleStartClient(NetworkEvents.StartClient event) { void handleStartClient(NetworkEvents.StartClient event) {

View File

@@ -1,235 +1,222 @@
// package org.toop.framework.eventbus; package org.toop.framework.eventbus;
//
// import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
// import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
// import org.toop.framework.eventbus.events.UniqueEvent; import org.toop.framework.eventbus.events.ResponseToUniqueEvent;
//
// import java.math.BigInteger; import java.math.BigInteger;
// import java.util.concurrent.*; import java.util.concurrent.*;
// import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
//
// import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
//
// class EventFlowStressTest { class EventFlowStressTest {
//
// /** Top-level record to ensure runtime type matches subscription */ public record HeavyEvent(String payload, long eventSnowflake) implements ResponseToUniqueEvent {
// public record HeavyEvent(String payload, long eventSnowflake) implements UniqueEvent { @Override
// @Override public long getIdentifier() {
// public java.util.Map<String, Object> result() { return eventSnowflake;
// return java.util.Map.of("payload", payload, "eventId", eventSnowflake); }
// } }
//
// @Override public record HeavyEventSuccess(String payload, long eventSnowflake) implements ResponseToUniqueEvent {
// public long eventSnowflake() { @Override
// return this.eventSnowflake; public long getIdentifier() {
// } return eventSnowflake;
// } }
// }
// public record HeavyEventSuccess(String payload, long eventSnowflake) implements
// UniqueEvent { private static final int THREADS = 32;
// @Override private static final long EVENTS_PER_THREAD = 10_000_000;
// public java.util.Map<String, Object> result() {
// return java.util.Map.of("payload", payload, "eventId", eventSnowflake); @Tag("stress")
// } @Test
// void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException {
// @Override LongAdder counter = new LongAdder();
// public long eventSnowflake() { ExecutorService executor = Executors.newFixedThreadPool(THREADS);
// return eventSnowflake;
// } BigInteger totalEvents = BigInteger.valueOf(THREADS)
// } .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
//
// private static final int THREADS = 32; long startTime = System.currentTimeMillis();
// private static final long EVENTS_PER_THREAD = 10_000_000;
// Thread monitor = new Thread(() -> {
// @Tag("stress") long lastCount = 0;
// @Test long lastTime = System.currentTimeMillis();
// void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException { Runtime runtime = Runtime.getRuntime();
// LongAdder counter = new LongAdder();
// ExecutorService executor = Executors.newFixedThreadPool(THREADS); while (counter.sum() < totalEvents.longValue()) {
// try { Thread.sleep(200); } catch (InterruptedException ignored) {}
// BigInteger totalEvents = BigInteger.valueOf(THREADS)
// .multiply(BigInteger.valueOf(EVENTS_PER_THREAD)); long now = System.currentTimeMillis();
// long completed = counter.sum();
// long startTime = System.currentTimeMillis(); long eventsThisPeriod = completed - lastCount;
// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
// // Monitor thread for EPS and memory
// Thread monitor = new Thread(() -> { long usedMemory = runtime.totalMemory() - runtime.freeMemory();
// long lastCount = 0; double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
// long lastTime = System.currentTimeMillis();
// Runtime runtime = Runtime.getRuntime(); System.out.printf(
// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
// while (counter.sum() < totalEvents.longValue()) { completed,
// try { Thread.sleep(200); } catch (InterruptedException ignored) {} totalEvents.longValue(),
// completed * 100.0 / totalEvents.doubleValue(),
// long now = System.currentTimeMillis(); eps,
// long completed = counter.sum(); usedMemory / 1024.0 / 1024.0,
// long eventsThisPeriod = completed - lastCount; usedPercent
// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0); );
//
// long usedMemory = runtime.totalMemory() - runtime.freeMemory(); lastCount = completed;
// double usedPercent = usedMemory * 100.0 / runtime.maxMemory(); lastTime = now;
// }
// System.out.printf( });
// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n", monitor.setDaemon(true);
// completed, monitor.start();
// totalEvents.longValue(),
// completed * 100.0 / totalEvents.doubleValue(), var listener = new EventFlow().listen(HeavyEvent.class, _ -> counter.increment());
// eps,
// usedMemory / 1024.0 / 1024.0, for (int t = 0; t < THREADS; t++) {
// usedPercent executor.submit(() -> {
// ); for (int i = 0; i < EVENTS_PER_THREAD; i++) {
// var _ = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
// lastCount = completed; .asyncPostEvent();
// lastTime = now; }
// } });
// }); }
// monitor.setDaemon(true);
// monitor.start(); executor.shutdown();
// executor.awaitTermination(10, TimeUnit.MINUTES);
// var listener = new EventFlow().listen(HeavyEvent.class, _ -> counter.increment());
// listener.getResult();
// // Submit events asynchronously
// for (int t = 0; t < THREADS; t++) { long endTime = System.currentTimeMillis();
// executor.submit(() -> { double durationSeconds = (endTime - startTime) / 1000.0;
// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
// var _ = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i) System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
// .asyncPostEvent(); double averageEps = totalEvents.doubleValue() / durationSeconds;
// } System.out.printf("Average EPS: %.0f%n", averageEps);
// });
// } assertEquals(totalEvents.longValue(), counter.sum());
// }
// executor.shutdown();
// executor.awaitTermination(10, TimeUnit.MINUTES); @Tag("stress")
// @Test
// listener.getResult(); void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException {
// LongAdder counter = new LongAdder();
// long endTime = System.currentTimeMillis(); ExecutorService executor = Executors.newFixedThreadPool(THREADS);
// double durationSeconds = (endTime - startTime) / 1000.0;
// BigInteger totalEvents = BigInteger.valueOf(THREADS)
// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
// seconds");
// double averageEps = totalEvents.doubleValue() / durationSeconds; long startTime = System.currentTimeMillis();
// System.out.printf("Average EPS: %.0f%n", averageEps);
// Thread monitor = new Thread(() -> {
// assertEquals(totalEvents.longValue(), counter.sum()); long lastCount = 0;
// } long lastTime = System.currentTimeMillis();
// Runtime runtime = Runtime.getRuntime();
// @Tag("stress")
// @Test while (counter.sum() < totalEvents.longValue()) {
// void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException { try { Thread.sleep(500); } catch (InterruptedException ignored) {}
// LongAdder counter = new LongAdder();
// ExecutorService executor = Executors.newFixedThreadPool(THREADS); long now = System.currentTimeMillis();
// long completed = counter.sum();
// BigInteger totalEvents = BigInteger.valueOf(THREADS) long eventsThisPeriod = completed - lastCount;
// .multiply(BigInteger.valueOf(EVENTS_PER_THREAD)); double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
//
// long startTime = System.currentTimeMillis(); long usedMemory = runtime.totalMemory() - runtime.freeMemory();
// double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
// // Monitor thread for EPS and memory
// Thread monitor = new Thread(() -> { System.out.printf(
// long lastCount = 0; "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
// long lastTime = System.currentTimeMillis(); completed,
// Runtime runtime = Runtime.getRuntime(); totalEvents.longValue(),
// completed * 100.0 / totalEvents.doubleValue(),
// while (counter.sum() < totalEvents.longValue()) { eps,
// try { Thread.sleep(200); } catch (InterruptedException ignored) {} usedMemory / 1024.0 / 1024.0,
// usedPercent
// long now = System.currentTimeMillis(); );
// long completed = counter.sum();
// long eventsThisPeriod = completed - lastCount; lastCount = completed;
// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0); lastTime = now;
// }
// long usedMemory = runtime.totalMemory() - runtime.freeMemory(); });
// double usedPercent = usedMemory * 100.0 / runtime.maxMemory(); monitor.setDaemon(true);
// monitor.start();
// System.out.printf(
// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n", EventFlow sharedFlow = new EventFlow();
// completed, sharedFlow.listen(HeavyEventSuccess.class, _ -> counter.increment(), false, "heavyEventSuccessListener");
// totalEvents.longValue(),
// completed * 100.0 / totalEvents.doubleValue(), for (int t = 0; t < THREADS; t++) {
// eps, executor.submit(() -> {
// usedMemory / 1024.0 / 1024.0, EventFlow threadFlow = new EventFlow();
// usedPercent
// ); for (int i = 0; i < EVENTS_PER_THREAD; i++) {
// var heavyEvent = threadFlow.addPostEvent(HeavyEvent.class, "payload-" + i)
// lastCount = completed; .postEvent();
// lastTime = now;
// } threadFlow.addPostEvent(HeavyEventSuccess.class, "payload-" + i, heavyEvent.getEventSnowflake())
// }); .postEvent();
// monitor.setDaemon(true); }
// monitor.start(); });
// }
// // Submit events asynchronously
// for (int t = 0; t < THREADS; t++) { executor.shutdown();
// executor.submit(() -> { executor.awaitTermination(10, TimeUnit.MINUTES);
// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
// var a = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i) long endTime = System.currentTimeMillis();
// .onResponse(HeavyEventSuccess.class, _ -> counter.increment()) double durationSeconds = (endTime - startTime) / 1000.0;
// .postEvent();
// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
// new EventFlow().addPostEvent(HeavyEventSuccess.class, "payload-" + i, double averageEps = totalEvents.doubleValue() / durationSeconds;
// a.getEventSnowflake()) System.out.printf("Average EPS: %.0f%n", averageEps);
// .postEvent();
// } assertEquals(totalEvents.longValue(), counter.sum());
// }); }
// }
// @Tag("stress")
// executor.shutdown(); @Test
// executor.awaitTermination(10, TimeUnit.MINUTES); void efficientExtremeConcurrencyTest() throws InterruptedException {
// final int THREADS = Runtime.getRuntime().availableProcessors();
// long endTime = System.currentTimeMillis(); final int EVENTS_PER_THREAD = 1_000_000;
// double durationSeconds = (endTime - startTime) / 1000.0;
// ExecutorService executor = Executors.newFixedThreadPool(THREADS);
// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " ConcurrentLinkedQueue<HeavyEvent> processedEvents = new ConcurrentLinkedQueue<>();
// seconds");
// double averageEps = totalEvents.doubleValue() / durationSeconds; long start = System.nanoTime();
// System.out.printf("Average EPS: %.0f%n", averageEps);
// EventFlow sharedFlow = new EventFlow();
// assertEquals(totalEvents.longValue(), counter.sum()); sharedFlow.listen(HeavyEvent.class, processedEvents::add, false, "heavyEventListener");
// }
// for (int t = 0; t < THREADS; t++) {
// executor.submit(() -> {
// @Tag("stress") EventFlow threadFlow = new EventFlow();
// @Test
// void efficientExtremeConcurrencyTest() throws InterruptedException { for (int i = 0; i < EVENTS_PER_THREAD; i++) {
// final int THREADS = Runtime.getRuntime().availableProcessors(); threadFlow.addPostEvent(HeavyEvent.class, "payload-" + i)
// final int EVENTS_PER_THREAD = 5000; .postEvent();
// }
// ExecutorService executor = Executors.newFixedThreadPool(THREADS); });
// ConcurrentLinkedQueue<HeavyEvent> processedEvents = new ConcurrentLinkedQueue<>(); }
//
// long start = System.nanoTime(); executor.shutdown();
// executor.awaitTermination(10, TimeUnit.MINUTES);
// for (int t = 0; t < THREADS; t++) {
// executor.submit(() -> { long end = System.nanoTime();
// for (int i = 0; i < EVENTS_PER_THREAD; i++) { double durationSeconds = (end - start) / 1_000_000_000.0;
// new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
// .onResponse(HeavyEvent.class, processedEvents::add) BigInteger totalEvents = BigInteger.valueOf(THREADS)
// .postEvent(); .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
// } double eps = totalEvents.doubleValue() / durationSeconds;
// });
// } System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
// System.out.printf("Throughput: %.0f events/sec%n", eps);
// executor.shutdown();
// executor.awaitTermination(10, TimeUnit.MINUTES); Runtime rt = Runtime.getRuntime();
// System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0 / 1024.0);
// long end = System.nanoTime();
// double durationSeconds = (end - start) / 1_000_000_000.0; assertEquals(totalEvents.intValue(), processedEvents.size());
// }
// BigInteger totalEvents = BigInteger.valueOf((long)
// THREADS).multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
// double eps = totalEvents.doubleValue() / durationSeconds;
//
// System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
// System.out.printf("Throughput: %.0f events/sec%n", eps);
//
// Runtime rt = Runtime.getRuntime();
// System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0
// / 1024.0);
//
// assertEquals(totalEvents.intValue(), processedEvents.size());
// }
//
// @Tag("stress") // @Tag("stress")
// @Test // @Test
// void constructorCacheVsReflection() throws Throwable { // void constructorCacheVsReflection() throws Throwable {
@@ -247,7 +234,6 @@
// long endHandle = System.nanoTime(); // long endHandle = System.nanoTime();
// //
// System.out.println("Reflection: " + (endReflect - startReflect) / 1_000_000 + " ms"); // System.out.println("Reflection: " + (endReflect - startReflect) / 1_000_000 + " ms");
// System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + " // System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + " ms");
// ms");
// } // }
// } }