diff --git a/framework/src/main/java/org/toop/framework/audio/AudioEventListener.java b/framework/src/main/java/org/toop/framework/audio/AudioEventListener.java index 628eae6..49bced1 100644 --- a/framework/src/main/java/org/toop/framework/audio/AudioEventListener.java +++ b/framework/src/main/java/org/toop/framework/audio/AudioEventListener.java @@ -25,17 +25,16 @@ public class AudioEventListener initListeners(String buttonSoundToPlay) { new EventFlow() - .listen(this::handleStopMusicManager) - .listen(this::handlePlaySound) - .listen(this::handleSkipSong) - .listen(this::handlePauseSong) - .listen(this::handlePreviousSong) - .listen(this::handleStopSound) - .listen(this::handleMusicStart) - .listen(this::handleVolumeChange) - .listen(this::handleGetVolume) - .listen(AudioEvents.ClickButton.class, _ -> - soundEffectManager.play(buttonSoundToPlay, false)); + .listen(AudioEvents.StopAudioManager.class, this::handleStopMusicManager, false) + .listen(AudioEvents.PlayEffect.class, this::handlePlaySound, false) + .listen(AudioEvents.SkipMusic.class, this::handleSkipSong, false) + .listen(AudioEvents.PauseMusic.class, this::handlePauseSong, false) + .listen(AudioEvents.PreviousMusic.class, this::handlePreviousSong, false) + .listen(AudioEvents.StopEffect.class, this::handleStopSound, false) + .listen(AudioEvents.StartBackgroundMusic.class, this::handleMusicStart, false) + .listen(AudioEvents.ChangeVolume.class, this::handleVolumeChange, false) + .listen(AudioEvents.GetVolume.class, this::handleGetVolume,false) + .listen(AudioEvents.ClickButton.class, _ -> soundEffectManager.play(buttonSoundToPlay, false), false); return this; } diff --git a/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java b/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java index 8836819..797c4a4 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java +++ b/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java @@ -38,7 +38,7 @@ public class EventFlow { private EventType event = null; /** The listener returned by GlobalEventBus subscription. Used for unsubscription. */ - private final List listeners = new ArrayList<>(); + private final List> listeners = new ArrayList<>(); /** Holds the results returned from the subscribed event, if any. */ private Map result = null; @@ -46,16 +46,15 @@ public class EventFlow { /** Empty constructor (event must be added via {@link #addPostEvent(Class, Object...)}). */ public EventFlow() {} - public EventFlow addPostEvent(EventType event) { - this.event = event; - return this; - } - - public EventFlow addPostEvent(Supplier eventSupplier) { - this.event = eventSupplier.get(); - return this; - } - + /** + * + * Add an event that will be triggered when {@link #postEvent()} or {@link #asyncPostEvent()} is called. + * + * @param eventClass The event that will be posted. + * @param args The event arguments, see the added event record for more information. + * @return {@link #EventFlow} + * + */ public EventFlow addPostEvent(Class eventClass, Object... args) { try { boolean isUuidEvent = UniqueEvent.class.isAssignableFrom(eventClass); @@ -98,155 +97,418 @@ public class EventFlow { } } - /** Subscribe by ID: only fires if UUID matches this publisher's eventId. */ - public EventFlow onResponse( - Class eventClass, Consumer action, boolean unsubscribeAfterSuccess) { - ListenerHandler[] listenerHolder = new ListenerHandler[1]; - listenerHolder[0] = - new ListenerHandler( - GlobalEventBus.subscribe( - eventClass, - event -> { - if (event.getIdentifier() != this.eventSnowflake) return; - - action.accept(event); - - if (unsubscribeAfterSuccess && listenerHolder[0] != null) { - GlobalEventBus.unsubscribe(listenerHolder[0]); - this.listeners.remove(listenerHolder[0]); - } - - this.result = event.result(); - })); - this.listeners.add(listenerHolder[0]); + /** + * + * Add an event that will be triggered when {@link #postEvent()} or {@link #asyncPostEvent()} is called. + * + * @param event The event to be posted. + * @return {@link #EventFlow} + * + */ + public EventFlow addPostEvent(EventType event) { + this.event = event; return this; } - /** Subscribe by ID: only fires if UUID matches this publisher's eventId. */ - public EventFlow onResponse(Class eventClass, Consumer action) { - return this.onResponse(eventClass, action, true); + /** + * + * Add an event that will be triggered when {@link #postEvent()} or {@link #asyncPostEvent()} is called. + * + * @param eventSupplier The event that will be posted through a Supplier. + * @return {@link #EventFlow} + * + */ + public EventFlow addPostEvent(Supplier eventSupplier) { + this.event = eventSupplier.get(); + return this; } - /** Subscribe by ID without explicit class. */ + /** + * + * Start listening for an event and trigger when ID correlates. + * + * @param event The {@link ResponseToUniqueEvent} to trigger the lambda. + * @param action The lambda to run when triggered. + * @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered. + * @param name A name given to the event, can later be used to unsubscribe. + * @return {@link #EventFlow} + * + */ + public EventFlow onResponse( + Class event, Consumer action, boolean unsubscribeAfterSuccess, String name + ) { + + final long id = SnowflakeGenerator.nextId(); + + Consumer newAction = eventClass -> { + if (eventClass.getIdentifier() != this.eventSnowflake) return; + + action.accept(eventClass); + + if (unsubscribeAfterSuccess) unsubscribe(id); + + this.result = eventClass.result(); + }; + + // TODO Remove casts + var listener = new ListenerHandler<>( + id, + name, + (Class) event, + (Consumer) newAction + ); + + GlobalEventBus.subscribe(listener); + this.listeners.add(listener); + return this; + } + + /** + * + * Start listening for an event and trigger when ID correlates, auto unsubscribes after being triggered and adds an empty name. + * + * @param event The {@link ResponseToUniqueEvent} to trigger the lambda. + * @param action The lambda to run when triggered. + * @return {@link #EventFlow} + * + */ + public EventFlow onResponse(Class event, Consumer action) { + return this.onResponse(event, action, true, ""); + } + + /** + * + * Start listening for an event and trigger when ID correlates, auto adds an empty name. + * + * @param event The {@link ResponseToUniqueEvent} to trigger the lambda. + * @param action The lambda to run when triggered. + * @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered. + * @return {@link #EventFlow} + * + */ + public EventFlow onResponse(Class event, Consumer action, boolean unsubscribeAfterSuccess) { + return this.onResponse(event, action, unsubscribeAfterSuccess, ""); + } + + /** + * + * Start listening for an event and trigger when ID correlates, auto unsubscribes after being triggered. + * + * @param event The {@link ResponseToUniqueEvent} to trigger the lambda. + * @param action The lambda to run when triggered. + * @param name A name given to the event, can later be used to unsubscribe. + * @return {@link #EventFlow} + * + */ + public EventFlow onResponse(Class event, Consumer action, String name) { + return this.onResponse(event, action, true, name); + } + + /** + * + * Subscribe by ID without explicit class. + * + * @param action The lambda to run when triggered. + * @return {@link #EventFlow} + * + * @deprecated use {@link #onResponse(Class, Consumer, boolean, String)} instead. + */ + @Deprecated @SuppressWarnings("unchecked") public EventFlow onResponse( - Consumer action, boolean unsubscribeAfterSuccess) { - ListenerHandler[] listenerHolder = new ListenerHandler[1]; - listenerHolder[0] = - new ListenerHandler( - GlobalEventBus.subscribe( - event -> { - if (!(event instanceof UniqueEvent uuidEvent)) return; - if (uuidEvent.getIdentifier() == this.eventSnowflake) { - try { - TT typedEvent = (TT) uuidEvent; - action.accept(typedEvent); - if (unsubscribeAfterSuccess - && listenerHolder[0] != null) { - GlobalEventBus.unsubscribe(listenerHolder[0]); - this.listeners.remove(listenerHolder[0]); - } - this.result = typedEvent.result(); - } catch (ClassCastException _) { - throw new ClassCastException( - "Cannot cast " - + event.getClass().getName() - + " to UniqueEvent"); - } - } - })); - this.listeners.add(listenerHolder[0]); + Consumer action, boolean unsubscribeAfterSuccess, String name) { + + final long id = SnowflakeGenerator.nextId(); + + Consumer newAction = event -> { + if (!(event instanceof UniqueEvent uuidEvent)) return; + if (uuidEvent.getIdentifier() == this.eventSnowflake) { + try { + TT typedEvent = (TT) uuidEvent; + action.accept(typedEvent); + + if (unsubscribeAfterSuccess) unsubscribe(id); + + this.result = typedEvent.result(); + } catch (ClassCastException _) { + throw new ClassCastException( + "Cannot cast " + + event.getClass().getName() + + " to UniqueEvent"); + } + } + }; + + var listener = new ListenerHandler<>( + id, + name, + (Class) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0], + newAction + ); + + GlobalEventBus.subscribe(listener); + this.listeners.add(listener); return this; } + /** + * + * Subscribe by ID without explicit class. + * + * @param action The lambda to run when triggered. + * @return {@link #EventFlow} + * + * @deprecated use {@link #onResponse(Class, Consumer)} instead. + */ + @Deprecated public EventFlow onResponse(Consumer action) { - return this.onResponse(action, true); + return this.onResponse(action, true, ""); } + /** + * + * Start listening for an event, and run a lambda when triggered. + * + * @param event The {@link EventType} to trigger the lambda. + * @param action The lambda to run when triggered. + * @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered. + * @param name A name given to the event, can later be used to unsubscribe. + * @return {@link #EventFlow} + * + */ public EventFlow listen( - Class eventClass, Consumer action, boolean unsubscribeAfterSuccess) { - ListenerHandler[] listenerHolder = new ListenerHandler[1]; - listenerHolder[0] = - new ListenerHandler( - GlobalEventBus.subscribe( - eventClass, - event -> { - action.accept(event); + Class event, Consumer action, boolean unsubscribeAfterSuccess, String name) { - if (unsubscribeAfterSuccess && listenerHolder[0] != null) { - GlobalEventBus.unsubscribe(listenerHolder[0]); - this.listeners.remove(listenerHolder[0]); - } - })); - this.listeners.add(listenerHolder[0]); + long id = SnowflakeGenerator.nextId(); + + Consumer newAction = eventc -> { + action.accept(eventc); + + if (unsubscribeAfterSuccess) unsubscribe(id); + }; + + var listener = new ListenerHandler<>( + id, + name, + event, + newAction + ); + + GlobalEventBus.subscribe(listener); + this.listeners.add(listener); return this; } - public EventFlow listen(Class eventClass, Consumer action) { - return this.listen(eventClass, action, true); + /** + * + * Start listening for an event, and run a lambda when triggered, auto unsubscribes. + * + * @param event The {@link EventType} to trigger the lambda. + * @param action The lambda to run when triggered. + * @param name A name given to the event, can later be used to unsubscribe. + * @return {@link #EventFlow} + * + */ + public EventFlow listen(Class event, Consumer action, String name) { + return this.listen(event, action, true, name); } + /** + * + * Start listening for an event, and run a lambda when triggered, auto unsubscribe and gives it an empty name. + * + * @param event The {@link EventType} to trigger the lambda. + * @param action The lambda to run when triggered. + * @return {@link #EventFlow} + * + */ + public EventFlow listen(Class event, Consumer action) { + return this.listen(event, action, true, ""); + } + + /** + * + * Start listening for an event, and run a lambda when triggered, adds an empty name. + * + * @param event The {@link EventType} to trigger the lambda. + * @param action The lambda to run when triggered. + * @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered. + * @return {@link #EventFlow} + * + */ + public EventFlow listen(Class event, Consumer action, boolean unsubscribeAfterSuccess) { + return this.listen(event, action, unsubscribeAfterSuccess, ""); + } + + /** + * + * Start listening to an event. + * + * @param action The lambda to run when triggered. + * @return {@link EventFlow} + * + * @deprecated use {@link #listen(Class, Consumer, boolean, String)} instead. + */ + @Deprecated @SuppressWarnings("unchecked") public EventFlow listen( - Consumer action, boolean unsubscribeAfterSuccess) { - ListenerHandler[] listenerHolder = new ListenerHandler[1]; - listenerHolder[0] = - new ListenerHandler( - GlobalEventBus.subscribe( - event -> { - if (!(event instanceof EventType nonUuidEvent)) return; - try { - TT typedEvent = (TT) nonUuidEvent; - action.accept(typedEvent); - if (unsubscribeAfterSuccess && listenerHolder[0] != null) { - GlobalEventBus.unsubscribe(listenerHolder[0]); - this.listeners.remove(listenerHolder[0]); - } - } catch (ClassCastException _) { - throw new ClassCastException( - "Cannot cast " - + event.getClass().getName() - + " to UniqueEvent"); - } - })); - this.listeners.add(listenerHolder[0]); + Consumer action, boolean unsubscribeAfterSuccess, String name) { + long id = SnowflakeGenerator.nextId(); + + Class eventClass = (Class) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0]; + + Consumer newAction = event -> { + if (!(event instanceof EventType nonUuidEvent)) return; + try { + TT typedEvent = (TT) nonUuidEvent; + action.accept(typedEvent); + if (unsubscribeAfterSuccess) unsubscribe(id); + } catch (ClassCastException _) { + throw new ClassCastException( + "Cannot cast " + + event.getClass().getName() + + " to UniqueEvent"); + } + }; + + var listener = new ListenerHandler<>( + id, + name, + eventClass, + newAction + ); + + GlobalEventBus.subscribe(listener); + this.listeners.add(listener); return this; } + /** + * + * Start listening to an event. + * + * @param action The lambda to run when triggered. + * @return {@link EventFlow} + * + * @deprecated use {@link #listen(Class, Consumer)} instead. + */ + @Deprecated public EventFlow listen(Consumer action) { - return this.listen(action, true); + return this.listen(action, true, ""); } - /** Post synchronously */ + /** + * Posts the event added through {@link #addPostEvent}. + */ public EventFlow postEvent() { GlobalEventBus.post(this.event); return this; } - /** Post asynchronously */ + /** + * Posts the event added through {@link #addPostEvent} asynchronously. + */ public EventFlow asyncPostEvent() { GlobalEventBus.postAsync(this.event); return this; } + /** + * + * Unsubscribe from an event. + * + * @param listenerObject The listener object to remove and unsubscribe. + */ + public void unsubscribe(Object listenerObject) { + this.listeners.removeIf(handler -> { + if (handler.getListener() == listenerObject) { + GlobalEventBus.unsubscribe(handler); + return true; + } + return false; + }); + } + + /** + * + * Unsubscribe from an event. + * + * @param listenerId The id given to the {@link ListenerHandler}. + */ + public void unsubscribe(long listenerId) { + this.listeners.removeIf(handler -> { + if (handler.getId() == listenerId) { + GlobalEventBus.unsubscribe(handler); + return true; + } + return false; + }); + } + + /** + * Unsubscribe from an event. + * + * @param name The name given to the listener. + */ + public void unsubscribe(String name) { + this.listeners.removeIf(handler -> { + if (handler.getName().equals(name)) { + GlobalEventBus.unsubscribe(handler); + return true; + } + return false; + }); + } + + /** + * Unsubscribe all events. + */ + public void unsubscribeAll() { + this.listeners.forEach(this::unsubscribe); + } + + /** + * Clean and remove everything inside {@link EventFlow}. + */ private void clean() { this.listeners.clear(); this.event = null; this.result = null; } // TODO + /** + * TODO + * + * @return TODO + */ public Map getResult() { return this.result; } + /** + * TODO + * + * @return TODO + */ public EventType getEvent() { return event; } + /** + * + * Returns a copy of the list of listeners. + * + * @return Copy of the list of listeners. + */ public ListenerHandler[] getListeners() { return listeners.toArray(new ListenerHandler[0]); } + /** + * Returns the generated snowflake for the {@link EventFlow} + * + * @return The generated snowflake for this {@link EventFlow} + */ public long getEventSnowflake() { return eventSnowflake; } diff --git a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java index ee6bdfb..9d03140 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java @@ -16,7 +16,7 @@ import org.toop.framework.eventbus.events.UniqueEvent; public final class GlobalEventBus { /** Map of event class to type-specific listeners. */ - private static final Map, CopyOnWriteArrayList>> + private static final Map, CopyOnWriteArrayList>> LISTENERS = new ConcurrentHashMap<>(); /** Map of event class to Snowflake-ID-specific listeners. */ @@ -49,7 +49,6 @@ public final class GlobalEventBus { ProducerType.MULTI, new BusySpinWaitStrategy()); - // Single consumer that dispatches to subscribers DISRUPTOR.handleEventsWith( (holder, seq, endOfBatch) -> { if (holder.event != null) { @@ -73,23 +72,11 @@ public final class GlobalEventBus { // ------------------------------------------------------------------------ // Subscription // ------------------------------------------------------------------------ - public static Consumer subscribe( - Class eventClass, Consumer listener) { - - CopyOnWriteArrayList> list = - LISTENERS.computeIfAbsent(eventClass, k -> new CopyOnWriteArrayList<>()); - - Consumer wrapper = event -> listener.accept(eventClass.cast(event)); - list.add(wrapper); - return wrapper; - } - - public static Consumer subscribe(Consumer listener) { - Consumer wrapper = event -> listener.accept(event); - LISTENERS.computeIfAbsent(Object.class, _ -> new CopyOnWriteArrayList<>()).add(wrapper); - return wrapper; + public static void subscribe(ListenerHandler listener) { + LISTENERS.computeIfAbsent(listener.getListenerClass(), _ -> new CopyOnWriteArrayList<>()).add(listener); } + // TODO public static void subscribeById( Class eventClass, long eventId, Consumer listener) { UUID_LISTENERS @@ -97,10 +84,14 @@ public final class GlobalEventBus { .put(eventId, listener); } - public static void unsubscribe(Object listener) { - LISTENERS.values().forEach(list -> list.remove(listener)); + public static void unsubscribe(ListenerHandler listener) { + // TODO suspicious call + LISTENERS.getOrDefault(listener.getListenerClass(), new CopyOnWriteArrayList<>()) + .remove(listener); + LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty()); } + // TODO public static void unsubscribeById( Class eventClass, long eventId) { Map> map = UUID_LISTENERS.get(eventClass); @@ -124,36 +115,42 @@ public final class GlobalEventBus { } } + @SuppressWarnings("unchecked") + private static void callListener(ListenerHandler raw, EventType event) { + ListenerHandler handler = (ListenerHandler) raw; + Consumer listener = handler.getListener(); + + T casted = (T) event; + + listener.accept(casted); + } + @SuppressWarnings("unchecked") private static void dispatchEvent(EventType event) { Class clazz = event.getClass(); - // class-specific listeners - CopyOnWriteArrayList> classListeners = LISTENERS.get(clazz); + CopyOnWriteArrayList> classListeners = LISTENERS.get(clazz); if (classListeners != null) { - for (Consumer listener : classListeners) { + for (ListenerHandler listener : classListeners) { try { - listener.accept(event); + callListener(listener, event); } catch (Throwable e) { // e.printStackTrace(); } } } - // generic listeners - CopyOnWriteArrayList> genericListeners = - LISTENERS.get(Object.class); + CopyOnWriteArrayList> genericListeners = LISTENERS.get(Object.class); if (genericListeners != null) { - for (Consumer listener : genericListeners) { + for (ListenerHandler listener : genericListeners) { try { - listener.accept(event); + callListener(listener, event); } catch (Throwable e) { // e.printStackTrace(); } } } - // snowflake listeners if (event instanceof UniqueEvent snowflakeEvent) { Map> map = UUID_LISTENERS.get(clazz); if (map != null) { @@ -182,4 +179,8 @@ public final class GlobalEventBus { LISTENERS.clear(); UUID_LISTENERS.clear(); } + + public static Map, CopyOnWriteArrayList>> getAllListeners() { + return LISTENERS; + } } diff --git a/framework/src/main/java/org/toop/framework/eventbus/ListenerHandler.java b/framework/src/main/java/org/toop/framework/eventbus/ListenerHandler.java index cc5fbc4..fe721ea 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/ListenerHandler.java +++ b/framework/src/main/java/org/toop/framework/eventbus/ListenerHandler.java @@ -1,25 +1,48 @@ package org.toop.framework.eventbus; -public class ListenerHandler { - private Object listener; +import org.toop.framework.SnowflakeGenerator; +import org.toop.framework.eventbus.events.EventType; - // private boolean unsubscribeAfterSuccess = true; +import java.util.function.Consumer; - // public ListenerHandler(Object listener, boolean unsubAfterSuccess) { - // this.listener = listener; - // this.unsubscribeAfterSuccess = unsubAfterSuccess; - // } +public class ListenerHandler { + private final long id; + private final String name; + private final Class clazz; + private final Consumer listener; - public ListenerHandler(Object listener) { + public ListenerHandler(long id, String name, Class clazz, Consumer listener) { + this.id = id; + this.name = name; + this.clazz = clazz; this.listener = listener; } - public Object getListener() { - return this.listener; + public ListenerHandler(String name, Class clazz, Consumer listener) { + this(SnowflakeGenerator.nextId(), name, clazz, listener); } - // public boolean isUnsubscribeAfterSuccess() { - // return this.unsubscribeAfterSuccess; - // } + public ListenerHandler(long id, Class clazz, Consumer listener) { + this(id, String.valueOf(id), clazz, listener); + } + public ListenerHandler(Class clazz, Consumer listener) { + this(SnowflakeGenerator.nextId(), clazz, listener); + } + + public long getId() { + return id; + } + + public Consumer getListener() { + return listener; + } + + public Class getListenerClass() { + return clazz; + } + + public String getName() { + return name; + } } diff --git a/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java b/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java index ef51a46..661dc02 100644 --- a/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java +++ b/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java @@ -17,25 +17,25 @@ public class NetworkingClientEventListener { public NetworkingClientEventListener(NetworkingClientManager clientManager) { this.clientManager = clientManager; new EventFlow() - .listen(this::handleStartClient) - .listen(this::handleCommand) - .listen(this::handleSendLogin) - .listen(this::handleSendLogout) - .listen(this::handleSendGetPlayerlist) - .listen(this::handleSendGetGamelist) - .listen(this::handleSendSubscribe) - .listen(this::handleSendMove) - .listen(this::handleSendChallenge) - .listen(this::handleSendAcceptChallenge) - .listen(this::handleSendForfeit) - .listen(this::handleSendMessage) - .listen(this::handleSendHelp) - .listen(this::handleSendHelpForCommand) - .listen(this::handleCloseClient) - .listen(this::handleReconnect) - .listen(this::handleChangeAddress) - .listen(this::handleGetAllConnections) - .listen(this::handleShutdownAll); + .listen(NetworkEvents.StartClient.class, this::handleStartClient, false) + .listen(NetworkEvents.SendCommand.class, this::handleCommand, false) + .listen(NetworkEvents.SendLogin.class, this::handleSendLogin, false) + .listen(NetworkEvents.SendLogout.class, this::handleSendLogout, false) + .listen(NetworkEvents.SendGetPlayerlist.class, this::handleSendGetPlayerlist, false) + .listen(NetworkEvents.SendGetGamelist.class, this::handleSendGetGamelist, false) + .listen(NetworkEvents.SendSubscribe.class, this::handleSendSubscribe, false) + .listen(NetworkEvents.SendMove.class, this::handleSendMove, false) + .listen(NetworkEvents.SendChallenge.class, this::handleSendChallenge, false) + .listen(NetworkEvents.SendAcceptChallenge.class, this::handleSendAcceptChallenge, false) + .listen(NetworkEvents.SendForfeit.class, this::handleSendForfeit, false) + .listen(NetworkEvents.SendMessage.class, this::handleSendMessage, false) + .listen(NetworkEvents.SendHelp.class, this::handleSendHelp, false) + .listen(NetworkEvents.SendHelpForCommand.class, this::handleSendHelpForCommand, false) + .listen(NetworkEvents.CloseClient.class, this::handleCloseClient, false) + .listen(NetworkEvents.Reconnect.class, this::handleReconnect, false) + .listen(NetworkEvents.ChangeAddress.class, this::handleChangeAddress, false) + .listen(NetworkEvents.RequestsAllClients.class, this::handleGetAllConnections, false) + .listen(NetworkEvents.ForceCloseAllClients.class, this::handleShutdownAll, false); } void handleStartClient(NetworkEvents.StartClient event) { diff --git a/framework/src/test/java/org/toop/framework/eventbus/EventFlowStressTest.java b/framework/src/test/java/org/toop/framework/eventbus/EventFlowStressTest.java index 9a9ec76..cd7d090 100644 --- a/framework/src/test/java/org/toop/framework/eventbus/EventFlowStressTest.java +++ b/framework/src/test/java/org/toop/framework/eventbus/EventFlowStressTest.java @@ -1,235 +1,222 @@ -// package org.toop.framework.eventbus; -// -// import org.junit.jupiter.api.Tag; -// import org.junit.jupiter.api.Test; -// import org.toop.framework.eventbus.events.UniqueEvent; -// -// import java.math.BigInteger; -// import java.util.concurrent.*; -// import java.util.concurrent.atomic.LongAdder; -// -// import static org.junit.jupiter.api.Assertions.assertEquals; -// -// class EventFlowStressTest { -// -// /** Top-level record to ensure runtime type matches subscription */ -// public record HeavyEvent(String payload, long eventSnowflake) implements UniqueEvent { -// @Override -// public java.util.Map result() { -// return java.util.Map.of("payload", payload, "eventId", eventSnowflake); -// } -// -// @Override -// public long eventSnowflake() { -// return this.eventSnowflake; -// } -// } -// -// public record HeavyEventSuccess(String payload, long eventSnowflake) implements -// UniqueEvent { -// @Override -// public java.util.Map result() { -// return java.util.Map.of("payload", payload, "eventId", eventSnowflake); -// } -// -// @Override -// public long eventSnowflake() { -// return eventSnowflake; -// } -// } -// -// private static final int THREADS = 32; -// private static final long EVENTS_PER_THREAD = 10_000_000; -// -// @Tag("stress") -// @Test -// void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException { -// LongAdder counter = new LongAdder(); -// ExecutorService executor = Executors.newFixedThreadPool(THREADS); -// -// BigInteger totalEvents = BigInteger.valueOf(THREADS) -// .multiply(BigInteger.valueOf(EVENTS_PER_THREAD)); -// -// long startTime = System.currentTimeMillis(); -// -// // Monitor thread for EPS and memory -// Thread monitor = new Thread(() -> { -// long lastCount = 0; -// long lastTime = System.currentTimeMillis(); -// Runtime runtime = Runtime.getRuntime(); -// -// while (counter.sum() < totalEvents.longValue()) { -// try { Thread.sleep(200); } catch (InterruptedException ignored) {} -// -// long now = System.currentTimeMillis(); -// long completed = counter.sum(); -// long eventsThisPeriod = completed - lastCount; -// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0); -// -// long usedMemory = runtime.totalMemory() - runtime.freeMemory(); -// double usedPercent = usedMemory * 100.0 / runtime.maxMemory(); -// -// System.out.printf( -// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n", -// completed, -// totalEvents.longValue(), -// completed * 100.0 / totalEvents.doubleValue(), -// eps, -// usedMemory / 1024.0 / 1024.0, -// usedPercent -// ); -// -// lastCount = completed; -// lastTime = now; -// } -// }); -// monitor.setDaemon(true); -// monitor.start(); -// -// var listener = new EventFlow().listen(HeavyEvent.class, _ -> counter.increment()); -// -// // Submit events asynchronously -// for (int t = 0; t < THREADS; t++) { -// executor.submit(() -> { -// for (int i = 0; i < EVENTS_PER_THREAD; i++) { -// var _ = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i) -// .asyncPostEvent(); -// } -// }); -// } -// -// executor.shutdown(); -// executor.awaitTermination(10, TimeUnit.MINUTES); -// -// listener.getResult(); -// -// long endTime = System.currentTimeMillis(); -// double durationSeconds = (endTime - startTime) / 1000.0; -// -// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " -// seconds"); -// double averageEps = totalEvents.doubleValue() / durationSeconds; -// System.out.printf("Average EPS: %.0f%n", averageEps); -// -// assertEquals(totalEvents.longValue(), counter.sum()); -// } -// -// @Tag("stress") -// @Test -// void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException { -// LongAdder counter = new LongAdder(); -// ExecutorService executor = Executors.newFixedThreadPool(THREADS); -// -// BigInteger totalEvents = BigInteger.valueOf(THREADS) -// .multiply(BigInteger.valueOf(EVENTS_PER_THREAD)); -// -// long startTime = System.currentTimeMillis(); -// -// // Monitor thread for EPS and memory -// Thread monitor = new Thread(() -> { -// long lastCount = 0; -// long lastTime = System.currentTimeMillis(); -// Runtime runtime = Runtime.getRuntime(); -// -// while (counter.sum() < totalEvents.longValue()) { -// try { Thread.sleep(200); } catch (InterruptedException ignored) {} -// -// long now = System.currentTimeMillis(); -// long completed = counter.sum(); -// long eventsThisPeriod = completed - lastCount; -// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0); -// -// long usedMemory = runtime.totalMemory() - runtime.freeMemory(); -// double usedPercent = usedMemory * 100.0 / runtime.maxMemory(); -// -// System.out.printf( -// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n", -// completed, -// totalEvents.longValue(), -// completed * 100.0 / totalEvents.doubleValue(), -// eps, -// usedMemory / 1024.0 / 1024.0, -// usedPercent -// ); -// -// lastCount = completed; -// lastTime = now; -// } -// }); -// monitor.setDaemon(true); -// monitor.start(); -// -// // Submit events asynchronously -// for (int t = 0; t < THREADS; t++) { -// executor.submit(() -> { -// for (int i = 0; i < EVENTS_PER_THREAD; i++) { -// var a = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i) -// .onResponse(HeavyEventSuccess.class, _ -> counter.increment()) -// .postEvent(); -// -// new EventFlow().addPostEvent(HeavyEventSuccess.class, "payload-" + i, -// a.getEventSnowflake()) -// .postEvent(); -// } -// }); -// } -// -// executor.shutdown(); -// executor.awaitTermination(10, TimeUnit.MINUTES); -// -// long endTime = System.currentTimeMillis(); -// double durationSeconds = (endTime - startTime) / 1000.0; -// -// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " -// seconds"); -// double averageEps = totalEvents.doubleValue() / durationSeconds; -// System.out.printf("Average EPS: %.0f%n", averageEps); -// -// assertEquals(totalEvents.longValue(), counter.sum()); -// } -// -// -// @Tag("stress") -// @Test -// void efficientExtremeConcurrencyTest() throws InterruptedException { -// final int THREADS = Runtime.getRuntime().availableProcessors(); -// final int EVENTS_PER_THREAD = 5000; -// -// ExecutorService executor = Executors.newFixedThreadPool(THREADS); -// ConcurrentLinkedQueue processedEvents = new ConcurrentLinkedQueue<>(); -// -// long start = System.nanoTime(); -// -// for (int t = 0; t < THREADS; t++) { -// executor.submit(() -> { -// for (int i = 0; i < EVENTS_PER_THREAD; i++) { -// new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i) -// .onResponse(HeavyEvent.class, processedEvents::add) -// .postEvent(); -// } -// }); -// } -// -// executor.shutdown(); -// executor.awaitTermination(10, TimeUnit.MINUTES); -// -// long end = System.nanoTime(); -// double durationSeconds = (end - start) / 1_000_000_000.0; -// -// BigInteger totalEvents = BigInteger.valueOf((long) -// THREADS).multiply(BigInteger.valueOf(EVENTS_PER_THREAD)); -// double eps = totalEvents.doubleValue() / durationSeconds; -// -// System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds); -// System.out.printf("Throughput: %.0f events/sec%n", eps); -// -// Runtime rt = Runtime.getRuntime(); -// System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0 -// / 1024.0); -// -// assertEquals(totalEvents.intValue(), processedEvents.size()); -// } -// + package org.toop.framework.eventbus; + + import org.junit.jupiter.api.Tag; + import org.junit.jupiter.api.Test; + import org.toop.framework.eventbus.events.ResponseToUniqueEvent; + + import java.math.BigInteger; + import java.util.concurrent.*; + import java.util.concurrent.atomic.LongAdder; + + import static org.junit.jupiter.api.Assertions.assertEquals; + + class EventFlowStressTest { + + public record HeavyEvent(String payload, long eventSnowflake) implements ResponseToUniqueEvent { + @Override + public long getIdentifier() { + return eventSnowflake; + } + } + + public record HeavyEventSuccess(String payload, long eventSnowflake) implements ResponseToUniqueEvent { + @Override + public long getIdentifier() { + return eventSnowflake; + } + } + + private static final int THREADS = 32; + private static final long EVENTS_PER_THREAD = 10_000_000; + + @Tag("stress") + @Test + void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException { + LongAdder counter = new LongAdder(); + ExecutorService executor = Executors.newFixedThreadPool(THREADS); + + BigInteger totalEvents = BigInteger.valueOf(THREADS) + .multiply(BigInteger.valueOf(EVENTS_PER_THREAD)); + + long startTime = System.currentTimeMillis(); + + Thread monitor = new Thread(() -> { + long lastCount = 0; + long lastTime = System.currentTimeMillis(); + Runtime runtime = Runtime.getRuntime(); + + while (counter.sum() < totalEvents.longValue()) { + try { Thread.sleep(200); } catch (InterruptedException ignored) {} + + long now = System.currentTimeMillis(); + long completed = counter.sum(); + long eventsThisPeriod = completed - lastCount; + double eps = eventsThisPeriod / ((now - lastTime) / 1000.0); + + long usedMemory = runtime.totalMemory() - runtime.freeMemory(); + double usedPercent = usedMemory * 100.0 / runtime.maxMemory(); + + System.out.printf( + "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n", + completed, + totalEvents.longValue(), + completed * 100.0 / totalEvents.doubleValue(), + eps, + usedMemory / 1024.0 / 1024.0, + usedPercent + ); + + lastCount = completed; + lastTime = now; + } + }); + monitor.setDaemon(true); + monitor.start(); + + var listener = new EventFlow().listen(HeavyEvent.class, _ -> counter.increment()); + + for (int t = 0; t < THREADS; t++) { + executor.submit(() -> { + for (int i = 0; i < EVENTS_PER_THREAD; i++) { + var _ = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i) + .asyncPostEvent(); + } + }); + } + + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.MINUTES); + + listener.getResult(); + + long endTime = System.currentTimeMillis(); + double durationSeconds = (endTime - startTime) / 1000.0; + + System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds"); + double averageEps = totalEvents.doubleValue() / durationSeconds; + System.out.printf("Average EPS: %.0f%n", averageEps); + + assertEquals(totalEvents.longValue(), counter.sum()); + } + + @Tag("stress") + @Test + void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException { + LongAdder counter = new LongAdder(); + ExecutorService executor = Executors.newFixedThreadPool(THREADS); + + BigInteger totalEvents = BigInteger.valueOf(THREADS) + .multiply(BigInteger.valueOf(EVENTS_PER_THREAD)); + + long startTime = System.currentTimeMillis(); + + Thread monitor = new Thread(() -> { + long lastCount = 0; + long lastTime = System.currentTimeMillis(); + Runtime runtime = Runtime.getRuntime(); + + while (counter.sum() < totalEvents.longValue()) { + try { Thread.sleep(500); } catch (InterruptedException ignored) {} + + long now = System.currentTimeMillis(); + long completed = counter.sum(); + long eventsThisPeriod = completed - lastCount; + double eps = eventsThisPeriod / ((now - lastTime) / 1000.0); + + long usedMemory = runtime.totalMemory() - runtime.freeMemory(); + double usedPercent = usedMemory * 100.0 / runtime.maxMemory(); + + System.out.printf( + "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n", + completed, + totalEvents.longValue(), + completed * 100.0 / totalEvents.doubleValue(), + eps, + usedMemory / 1024.0 / 1024.0, + usedPercent + ); + + lastCount = completed; + lastTime = now; + } + }); + monitor.setDaemon(true); + monitor.start(); + + EventFlow sharedFlow = new EventFlow(); + sharedFlow.listen(HeavyEventSuccess.class, _ -> counter.increment(), false, "heavyEventSuccessListener"); + + for (int t = 0; t < THREADS; t++) { + executor.submit(() -> { + EventFlow threadFlow = new EventFlow(); + + for (int i = 0; i < EVENTS_PER_THREAD; i++) { + var heavyEvent = threadFlow.addPostEvent(HeavyEvent.class, "payload-" + i) + .postEvent(); + + threadFlow.addPostEvent(HeavyEventSuccess.class, "payload-" + i, heavyEvent.getEventSnowflake()) + .postEvent(); + } + }); + } + + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.MINUTES); + + long endTime = System.currentTimeMillis(); + double durationSeconds = (endTime - startTime) / 1000.0; + + System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds"); + double averageEps = totalEvents.doubleValue() / durationSeconds; + System.out.printf("Average EPS: %.0f%n", averageEps); + + assertEquals(totalEvents.longValue(), counter.sum()); + } + + @Tag("stress") + @Test + void efficientExtremeConcurrencyTest() throws InterruptedException { + final int THREADS = Runtime.getRuntime().availableProcessors(); + final int EVENTS_PER_THREAD = 1_000_000; + + ExecutorService executor = Executors.newFixedThreadPool(THREADS); + ConcurrentLinkedQueue processedEvents = new ConcurrentLinkedQueue<>(); + + long start = System.nanoTime(); + + EventFlow sharedFlow = new EventFlow(); + sharedFlow.listen(HeavyEvent.class, processedEvents::add, false, "heavyEventListener"); + + for (int t = 0; t < THREADS; t++) { + executor.submit(() -> { + EventFlow threadFlow = new EventFlow(); + + for (int i = 0; i < EVENTS_PER_THREAD; i++) { + threadFlow.addPostEvent(HeavyEvent.class, "payload-" + i) + .postEvent(); + } + }); + } + + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.MINUTES); + + long end = System.nanoTime(); + double durationSeconds = (end - start) / 1_000_000_000.0; + + BigInteger totalEvents = BigInteger.valueOf(THREADS) + .multiply(BigInteger.valueOf(EVENTS_PER_THREAD)); + double eps = totalEvents.doubleValue() / durationSeconds; + + System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds); + System.out.printf("Throughput: %.0f events/sec%n", eps); + + Runtime rt = Runtime.getRuntime(); + System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0 / 1024.0); + + assertEquals(totalEvents.intValue(), processedEvents.size()); + } + // @Tag("stress") // @Test // void constructorCacheVsReflection() throws Throwable { @@ -247,7 +234,6 @@ // long endHandle = System.nanoTime(); // // System.out.println("Reflection: " + (endReflect - startReflect) / 1_000_000 + " ms"); -// System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + " -// ms"); +// System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + " ms"); // } -// } + }