From fd357fd27d020c4f63ea356ae222514d66878b64 Mon Sep 17 00:00:00 2001 From: lieght <49651652+BAFGdeJong@users.noreply.github.com> Date: Sat, 6 Dec 2025 17:26:00 +0100 Subject: [PATCH] Can now test the event bus, created testable interfaces --- app/src/main/java/org/toop/Main.java | 7 - app/src/main/java/org/toop/app/Server.java | 39 ++-- .../toop/app/widget/display/SongDisplay.java | 6 +- .../framework/audio/AudioEventListener.java | 2 +- .../toop/framework/audio/MusicManager.java | 3 +- .../framework/eventbus/DisruptorEventBus.java | 118 +++++++++++ .../eventbus/DisruptorEventsHolder.java | 31 +++ .../org/toop/framework/eventbus/EventBus.java | 11 + .../toop/framework/eventbus/EventFlow.java | 23 ++- .../toop/framework/eventbus/EventsHolder.java | 10 + .../framework/eventbus/GlobalEventBus.java | 188 +----------------- .../framework/eventbus/OldGlobalEventBus.java | 159 +++++++++++++++ .../NetworkingClientEventListener.java | 1 + .../networking/NetworkingClientManager.java | 19 +- .../networking/events/NetworkEvents.java | 9 +- 15 files changed, 394 insertions(+), 232 deletions(-) create mode 100644 framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java create mode 100644 framework/src/main/java/org/toop/framework/eventbus/DisruptorEventsHolder.java create mode 100644 framework/src/main/java/org/toop/framework/eventbus/EventBus.java create mode 100644 framework/src/main/java/org/toop/framework/eventbus/EventsHolder.java create mode 100644 framework/src/main/java/org/toop/framework/eventbus/OldGlobalEventBus.java diff --git a/app/src/main/java/org/toop/Main.java b/app/src/main/java/org/toop/Main.java index 3115e34..3b4fef3 100644 --- a/app/src/main/java/org/toop/Main.java +++ b/app/src/main/java/org/toop/Main.java @@ -1,13 +1,6 @@ package org.toop; import org.toop.app.App; -import org.toop.framework.audio.*; -import org.toop.framework.networking.NetworkingClientEventListener; -import org.toop.framework.networking.NetworkingClientManager; -import org.toop.framework.resource.ResourceLoader; -import org.toop.framework.resource.ResourceManager; -import org.toop.framework.resource.resources.MusicAsset; -import org.toop.framework.resource.resources.SoundEffectAsset; public final class Main { static void main(String[] args) { diff --git a/app/src/main/java/org/toop/app/Server.java b/app/src/main/java/org/toop/app/Server.java index 408982d..ff28928 100644 --- a/app/src/main/java/org/toop/app/Server.java +++ b/app/src/main/java/org/toop/app/Server.java @@ -88,7 +88,7 @@ public final class Server { Primitive.text("connecting"), 0, 0, reconnectAttempts, true, true ); - WidgetContainer.getCurrentView().transitionNext(loading); + WidgetContainer.getCurrentView().transitionNextCustom(loading, "disconnect", this::disconnect); var a = new EventFlow() .addPostEvent(NetworkEvents.StartClient.class, @@ -105,8 +105,9 @@ public final class Server { ); }); - a.onResponse(NetworkEvents.StartClientResponse.class, e -> { + a.onResponse(NetworkEvents.CreatedIdForClient.class, e -> clientId = e.clientId(), true); + a.onResponse(NetworkEvents.StartClientResponse.class, e -> { if (!e.successful()) { return; } @@ -118,7 +119,6 @@ public final class Server { a.unsubscribe("startclient"); this.user = user; - clientId = e.clientId(); new EventFlow().addPostEvent(new NetworkEvents.SendLogin(clientId, user)).postEvent(); @@ -129,21 +129,24 @@ public final class Server { }, false, "startclient") .listen( - NetworkEvents.ConnectTry.class, - e -> Platform.runLater( - () -> { - try { - loading.setAmount(e.amount()); - if (e.amount() >= loading.getMaxAmount()) { - loading.triggerFailure(); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - ), - false, "connecting" - ) + NetworkEvents.ConnectTry.class, + e -> { + if (clientId != e.clientId()) return; + Platform.runLater( + () -> { + try { + loading.setAmount(e.amount()); + if (e.amount() >= loading.getMaxAmount()) { + loading.triggerFailure(); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + ); + }, + false, "connecting" + ) .postEvent(); a.listen(NetworkEvents.ChallengeResponse.class, this::handleReceivedChallenge, false, "challenge") diff --git a/app/src/main/java/org/toop/app/widget/display/SongDisplay.java b/app/src/main/java/org/toop/app/widget/display/SongDisplay.java index ab9c345..9df8f21 100644 --- a/app/src/main/java/org/toop/app/widget/display/SongDisplay.java +++ b/app/src/main/java/org/toop/app/widget/display/SongDisplay.java @@ -49,11 +49,11 @@ public class SongDisplay extends VBox implements Widget { previousButton.getStyleClass().setAll("previous-button"); skipButton.setOnAction( event -> { - GlobalEventBus.post(new AudioEvents.SkipMusic()); + GlobalEventBus.get().post(new AudioEvents.SkipMusic()); }); pauseButton.setOnAction(event -> { - GlobalEventBus.post(new AudioEvents.PauseMusic()); + GlobalEventBus.get().post(new AudioEvents.PauseMusic()); if (pauseButton.getText().equals("⏸")) { pauseButton.setText("▶"); } @@ -63,7 +63,7 @@ public class SongDisplay extends VBox implements Widget { }); previousButton.setOnAction( event -> { - GlobalEventBus.post(new AudioEvents.PreviousMusic()); + GlobalEventBus.get().post(new AudioEvents.PreviousMusic()); }); HBox control = new HBox(10, previousButton, pauseButton, skipButton); diff --git a/framework/src/main/java/org/toop/framework/audio/AudioEventListener.java b/framework/src/main/java/org/toop/framework/audio/AudioEventListener.java index 49bced1..ed39a52 100644 --- a/framework/src/main/java/org/toop/framework/audio/AudioEventListener.java +++ b/framework/src/main/java/org/toop/framework/audio/AudioEventListener.java @@ -73,7 +73,7 @@ public class AudioEventListener implements org.toop.framework Runnable currentMusicTask = new Runnable() { @Override public void run() { - GlobalEventBus.post(new AudioEvents.PlayingMusic(track.getName(), track.currentPosition(), track.duration())); + GlobalEventBus.get().post(new AudioEvents.PlayingMusic(track.getName(), track.currentPosition(), track.duration())); scheduler.schedule(this, 1, TimeUnit.SECONDS); } }; diff --git a/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java new file mode 100644 index 0000000..e756c8f --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java @@ -0,0 +1,118 @@ +package org.toop.framework.eventbus; + +import com.lmax.disruptor.BusySpinWaitStrategy; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import org.toop.framework.eventbus.events.EventType; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadFactory; + +public class DisruptorEventBus implements EventBus { + /** Wrapper used inside the ring buffer. */ + private class EventHolder { + EventType event; + } + + EventsHolder eventsHolder; + + private final Disruptor disruptor; + private final RingBuffer ringBuffer; + + public DisruptorEventBus(EventsHolder eventsHolder) { + this.eventsHolder = eventsHolder; + + ThreadFactory threadFactory = + r -> { + Thread t = new Thread(r, "EventBus-Disruptor"); + t.setDaemon(true); + return t; + }; + + disruptor = getEventHolderDisruptor(threadFactory); + + disruptor.start(); + this.ringBuffer = disruptor.getRingBuffer(); + } + + private Disruptor getEventHolderDisruptor(ThreadFactory threadFactory) { + int RING_BUFFER_SIZE = 1024 * 64; + Disruptor disruptor = new Disruptor<>( + EventHolder::new, + RING_BUFFER_SIZE, + threadFactory, + ProducerType.MULTI, + new BusySpinWaitStrategy()); + + disruptor.handleEventsWith( + (holder, _, _) -> { + if (holder.event != null) { + dispatchEvent(holder.event); + holder.event = null; + } + }); + return disruptor; + } + + @Override + public void subscribe(ListenerHandler listener) { + eventsHolder.add(listener); + } + + @Override + public void unsubscribe(ListenerHandler listener) { + eventsHolder.remove(listener); + } + + @Override + public void post(EventType event) { + long seq = ringBuffer.next(); + try { + EventHolder holder = ringBuffer.get(seq); + holder.event = event; + } finally { + ringBuffer.publish(seq); + } + } + + @Override + public void shutdown() { + disruptor.shutdown(); + eventsHolder.reset(); + } + + @Override + public void reset() { + eventsHolder.reset(); + } + + private void dispatchEvent(EventType event) { + CopyOnWriteArrayList> classListeners = (CopyOnWriteArrayList>) eventsHolder.get(event.getClass()); + if (classListeners != null) { + for (ListenerHandler listener : classListeners) { + try { + callListener(listener, event); + } catch (Throwable e) { + // logger.warn("Exception while handling event: {}", event, e); TODO + } + } + } + + CopyOnWriteArrayList> genericListeners = (CopyOnWriteArrayList>) eventsHolder.get(Object.class); + if (genericListeners != null) { + for (ListenerHandler listener : genericListeners) { + try { + callListener(listener, event); + } catch (Throwable e) { + // logger.warn("Exception while handling event: {}", event, e); TODO + } + } + } + } + + + private static void callListener(ListenerHandler handler, EventType event) { + handler.getListener().accept((T) event); + } +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventsHolder.java b/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventsHolder.java new file mode 100644 index 0000000..53b4e4a --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventsHolder.java @@ -0,0 +1,31 @@ +package org.toop.framework.eventbus; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +public class DisruptorEventsHolder implements EventsHolder { + private final Map, CopyOnWriteArrayList>> LISTENERS = new ConcurrentHashMap<>(); + + @Override + public void add(ListenerHandler listener) { + LISTENERS.computeIfAbsent(listener.getListenerClass(), _ -> new CopyOnWriteArrayList<>()).add(listener); + } + + @Override + public void remove(ListenerHandler listener) { + LISTENERS.getOrDefault(listener.getListenerClass(), new CopyOnWriteArrayList<>()).remove(listener); + LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty()); + } + + @Override + public List> get(Class listenerClass) { + return LISTENERS.get(listenerClass); + } + + @Override + public void reset() { + LISTENERS.clear(); + } +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/EventBus.java b/framework/src/main/java/org/toop/framework/eventbus/EventBus.java new file mode 100644 index 0000000..8e6d92f --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/EventBus.java @@ -0,0 +1,11 @@ +package org.toop.framework.eventbus; + +import org.toop.framework.eventbus.events.EventType; + +public interface EventBus { + void subscribe(ListenerHandler listener); + void unsubscribe(ListenerHandler listener); + void post(EventType event); + void shutdown(); + void reset(); +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java b/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java index 5c66728..b6aacc6 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java +++ b/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java @@ -158,7 +158,7 @@ public class EventFlow { (Consumer) newAction ); - GlobalEventBus.subscribe(listener); + GlobalEventBus.get().subscribe(listener); this.listeners.add(listener); return this; } @@ -246,7 +246,7 @@ public class EventFlow { newAction ); - GlobalEventBus.subscribe(listener); + GlobalEventBus.get().subscribe(listener); this.listeners.add(listener); return this; } @@ -294,7 +294,7 @@ public class EventFlow { newAction ); - GlobalEventBus.subscribe(listener); + GlobalEventBus.get().subscribe(listener); this.listeners.add(listener); return this; } @@ -378,7 +378,7 @@ public class EventFlow { newAction ); - GlobalEventBus.subscribe(listener); + GlobalEventBus.get().subscribe(listener); this.listeners.add(listener); return this; } @@ -401,15 +401,18 @@ public class EventFlow { * Posts the event added through {@link #addPostEvent}. */ public EventFlow postEvent() { - GlobalEventBus.post(this.event); + GlobalEventBus.get().post(this.event); return this; } /** * Posts the event added through {@link #addPostEvent} asynchronously. + * + * @deprecated use {@link #postEvent()} instead. */ + @Deprecated public EventFlow asyncPostEvent() { - GlobalEventBus.postAsync(this.event); + GlobalEventBus.get().post(this.event); return this; } @@ -422,7 +425,7 @@ public class EventFlow { public void unsubscribe(Object listenerObject) { this.listeners.removeIf(handler -> { if (handler.getListener() == listenerObject) { - GlobalEventBus.unsubscribe(handler); + GlobalEventBus.get().unsubscribe(handler); return true; } return false; @@ -438,7 +441,7 @@ public class EventFlow { public void unsubscribe(long listenerId) { this.listeners.removeIf(handler -> { if (handler.getId() == listenerId) { - GlobalEventBus.unsubscribe(handler); + GlobalEventBus.get().unsubscribe(handler); return true; } return false; @@ -453,7 +456,7 @@ public class EventFlow { public void unsubscribe(String name) { this.listeners.removeIf(handler -> { if (handler.getName().equals(name)) { - GlobalEventBus.unsubscribe(handler); + GlobalEventBus.get().unsubscribe(handler); return true; } return false; @@ -465,7 +468,7 @@ public class EventFlow { */ public void unsubscribeAll() { listeners.removeIf(handler -> { - GlobalEventBus.unsubscribe(handler); + GlobalEventBus.get().unsubscribe(handler); return true; }); } diff --git a/framework/src/main/java/org/toop/framework/eventbus/EventsHolder.java b/framework/src/main/java/org/toop/framework/eventbus/EventsHolder.java new file mode 100644 index 0000000..fda14cc --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/EventsHolder.java @@ -0,0 +1,10 @@ +package org.toop.framework.eventbus; + +import java.util.List; + +public interface EventsHolder { + void add(ListenerHandler listener); + void remove(ListenerHandler listener); + List> get(Class listenerClass); + void reset(); +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java index dfefcb1..a928ca0 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java @@ -1,193 +1,13 @@ package org.toop.framework.eventbus; -import com.lmax.disruptor.*; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; -import java.util.Map; -import java.util.concurrent.*; -import java.util.function.Consumer; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.toop.framework.eventbus.events.EventType; -import org.toop.framework.eventbus.events.UniqueEvent; -/** - * GlobalEventBus backed by the LMAX Disruptor for ultra-low latency, high-throughput event - * publishing. - */ -public final class GlobalEventBus { - private static final Logger logger = LogManager.getLogger(GlobalEventBus.class); +public class GlobalEventBus { + private static final EventBus INSTANCE = new DisruptorEventBus<>(new DisruptorEventsHolder()); - /** Map of event class to type-specific listeners. */ - private static final Map, CopyOnWriteArrayList>> - LISTENERS = new ConcurrentHashMap<>(); - - /** Map of event class to Snowflake-ID-specific listeners. */ - private static final Map< - Class, ConcurrentHashMap>> - UUID_LISTENERS = new ConcurrentHashMap<>(); - - /** Disruptor ring buffer size (must be power of two). */ - private static final int RING_BUFFER_SIZE = 1024 * 64; - - /** Disruptor instance. */ - private static final Disruptor DISRUPTOR; - - /** Ring buffer used for publishing events. */ - private static final RingBuffer RING_BUFFER; - - static { - ThreadFactory threadFactory = - r -> { - Thread t = new Thread(r, "EventBus-Disruptor"); - t.setDaemon(true); - return t; - }; - - DISRUPTOR = - new Disruptor<>( - EventHolder::new, - RING_BUFFER_SIZE, - threadFactory, - ProducerType.MULTI, - new BusySpinWaitStrategy()); - - DISRUPTOR.handleEventsWith( - (holder, seq, endOfBatch) -> { - if (holder.event != null) { - dispatchEvent(holder.event); - holder.event = null; - } - }); - - DISRUPTOR.start(); - RING_BUFFER = DISRUPTOR.getRingBuffer(); - } - - /** Prevent instantiation. */ private GlobalEventBus() {} - /** Wrapper used inside the ring buffer. */ - private static class EventHolder { - EventType event; - } - - // ------------------------------------------------------------------------ - // Subscription - // ------------------------------------------------------------------------ - public static void subscribe(ListenerHandler listener) { - logger.debug("Subscribing to {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName()); - LISTENERS.computeIfAbsent(listener.getListenerClass(), _ -> new CopyOnWriteArrayList<>()).add(listener); - } - - // TODO - public static void subscribeById( - Class eventClass, long eventId, Consumer listener) { - UUID_LISTENERS - .computeIfAbsent(eventClass, _ -> new ConcurrentHashMap<>()) - .put(eventId, listener); - } - - public static void unsubscribe(ListenerHandler listener) { - logger.debug("Unsubscribing from {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName()); - LISTENERS.getOrDefault(listener.getListenerClass(), new CopyOnWriteArrayList<>()) - .remove(listener); - LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty()); - } - - // TODO - public static void unsubscribeById( - Class eventClass, long eventId) { - Map> map = UUID_LISTENERS.get(eventClass); - if (map != null) map.remove(eventId); - } - - // ------------------------------------------------------------------------ - // Posting - // ------------------------------------------------------------------------ - public static void post(T event) { - dispatchEvent(event); // synchronous - } - - public static void postAsync(T event) { - long seq = RING_BUFFER.next(); - try { - EventHolder holder = RING_BUFFER.get(seq); - holder.event = event; - } finally { - RING_BUFFER.publish(seq); - } - } - - @SuppressWarnings("unchecked") - private static void callListener(ListenerHandler raw, EventType event) { - ListenerHandler handler = (ListenerHandler) raw; - Consumer listener = handler.getListener(); - - T casted = (T) event; - - listener.accept(casted); - } - - @SuppressWarnings("unchecked") - private static void dispatchEvent(EventType event) { - Class clazz = event.getClass(); - - logger.debug("Triggered event: {}", event.getClass().getSimpleName()); - - CopyOnWriteArrayList> classListeners = LISTENERS.get(clazz); - if (classListeners != null) { - for (ListenerHandler listener : classListeners) { - try { - callListener(listener, event); - } catch (Throwable e) { - logger.warn("Exception while handling event: {}", event, e); - } - } - } - - CopyOnWriteArrayList> genericListeners = LISTENERS.get(Object.class); - if (genericListeners != null) { - for (ListenerHandler listener : genericListeners) { - try { - callListener(listener, event); - } catch (Throwable e) { - logger.warn("Exception while handling event: {}", event, e); - } - } - } - - if (event instanceof UniqueEvent snowflakeEvent) { - Map> map = UUID_LISTENERS.get(clazz); - if (map != null) { - Consumer listener = - (Consumer) map.remove(snowflakeEvent.getIdentifier()); - if (listener != null) { - try { - listener.accept(snowflakeEvent); - } catch (Throwable ignored) { - } - } - } - } - } - - // ------------------------------------------------------------------------ - // Lifecycle - // ------------------------------------------------------------------------ - public static void shutdown() { - DISRUPTOR.shutdown(); - LISTENERS.clear(); - UUID_LISTENERS.clear(); - } - - public static void reset() { - LISTENERS.clear(); - UUID_LISTENERS.clear(); - } - - public static Map, CopyOnWriteArrayList>> getAllListeners() { - return LISTENERS; + public static EventBus get() { + return INSTANCE; } } diff --git a/framework/src/main/java/org/toop/framework/eventbus/OldGlobalEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/OldGlobalEventBus.java new file mode 100644 index 0000000..1f281a7 --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/OldGlobalEventBus.java @@ -0,0 +1,159 @@ +package org.toop.framework.eventbus; + +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + +import java.util.concurrent.*; +import java.util.function.Consumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.toop.framework.eventbus.events.EventType; + +/** + * GlobalEventBus backed by the LMAX Disruptor for ultra-low latency, high-throughput event + * publishing. + */ +public final class OldGlobalEventBus { + private static final Logger logger = LogManager.getLogger(OldGlobalEventBus.class); + + /** Disruptor ring buffer size (must be power of two). */ + private static final int RING_BUFFER_SIZE = 1024 * 64; + + /** Disruptor instance. */ + private static final Disruptor DISRUPTOR; + + /** Ring buffer used for publishing events. */ + private static final RingBuffer RING_BUFFER; + + static { + ThreadFactory threadFactory = + r -> { + Thread t = new Thread(r, "EventBus-Disruptor"); + t.setDaemon(true); + return t; + }; + + DISRUPTOR = + new Disruptor<>( + EventHolder::new, + RING_BUFFER_SIZE, + threadFactory, + ProducerType.MULTI, + new BusySpinWaitStrategy()); + + DISRUPTOR.handleEventsWith( + (holder, seq, endOfBatch) -> { + if (holder.event != null) { + dispatchEvent(holder.event); + holder.event = null; + } + }); + + DISRUPTOR.start(); + RING_BUFFER = DISRUPTOR.getRingBuffer(); + } + + /** Prevent instantiation. */ + private OldGlobalEventBus() {} + + /** Wrapper used inside the ring buffer. */ + private static class EventHolder { + EventType event; + } + + /** Map of event class to type-specific listeners. */ + private static EventsHolder eventsHolder; + + public static void setEventsHolder(EventsHolder eventsHolder) { + if (OldGlobalEventBus.eventsHolder != null) return; + + OldGlobalEventBus.eventsHolder = eventsHolder; + } + + // ------------------------------------------------------------------------ + // Subscription + // ------------------------------------------------------------------------ + public static void subscribe(ListenerHandler listener) { + logger.debug("Subscribing to {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName()); + eventsHolder.add(listener); + } + + public static void unsubscribe(ListenerHandler listener) { + logger.debug("Unsubscribing from {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName()); + eventsHolder.remove(listener); + } + + // ------------------------------------------------------------------------ + // Posting + // ------------------------------------------------------------------------ + public static void post(T event) { + dispatchEvent(event); // synchronous + } + + public static void postAsync(T event) { + long seq = RING_BUFFER.next(); + try { + EventHolder holder = RING_BUFFER.get(seq); + holder.event = event; + } finally { + RING_BUFFER.publish(seq); + } + } + + @SuppressWarnings("unchecked") + private static void callListener(ListenerHandler raw, EventType event) { + ListenerHandler handler = (ListenerHandler) raw; + Consumer listener = handler.getListener(); + + T casted = (T) event; + + listener.accept(casted); + } + + @SuppressWarnings("unchecked") + private static void dispatchEvent(EventType event) { + Class clazz = event.getClass(); + + logger.debug("Triggered event: {}", event.getClass().getSimpleName()); + + CopyOnWriteArrayList> classListeners = (CopyOnWriteArrayList>) eventsHolder.get(clazz); + if (classListeners != null) { + for (ListenerHandler listener : classListeners) { + try { + callListener(listener, event); + } catch (Throwable e) { + logger.warn("Exception while handling event: {}", event, e); + } + } + } + + CopyOnWriteArrayList> genericListeners = (CopyOnWriteArrayList>) eventsHolder.get(Object.class); + if (genericListeners != null) { + for (ListenerHandler listener : genericListeners) { + try { + callListener(listener, event); + } catch (Throwable e) { + logger.warn("Exception while handling event: {}", event, e); + } + } + } + } + + // ------------------------------------------------------------------------ + // Lifecycle + // ------------------------------------------------------------------------ + public static void shutdown() { + DISRUPTOR.shutdown(); + eventsHolder.reset(); + } + + public static void reset() { + eventsHolder.reset(); + } + +// public static Map, CopyOnWriteArrayList>> getAllListeners() { +//// return LISTENERS; +// } +} diff --git a/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java b/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java index 661dc02..101dc06 100644 --- a/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java +++ b/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java @@ -40,6 +40,7 @@ public class NetworkingClientEventListener { void handleStartClient(NetworkEvents.StartClient event) { long clientId = SnowflakeGenerator.nextId(); + new EventFlow().addPostEvent(new NetworkEvents.CreatedIdForClient(clientId, event.identifier())).postEvent(); clientManager.startClient( clientId, event.networkingClient(), diff --git a/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java b/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java index 8357980..ad53dab 100644 --- a/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java +++ b/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java @@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.toop.framework.eventbus.EventFlow; import org.toop.framework.eventbus.GlobalEventBus; import org.toop.framework.networking.events.NetworkEvents; import org.toop.framework.networking.exceptions.ClientNotFoundException; @@ -28,8 +29,16 @@ public class NetworkingClientManager implements org.toop.framework.networking.in Runnable onSuccess, Runnable onFailure ) { + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + EventFlow closeEvent = new EventFlow() + .listen( + NetworkEvents.CloseClient.class, + e -> { + if (e.clientId() == id) scheduler.shutdownNow(); + }, "close"); + Runnable connectTask = new Runnable() { int attempts = 0; @@ -46,7 +55,7 @@ public class NetworkingClientManager implements org.toop.framework.networking.in nClient.connect(id, nConnector.host(), nConnector.port()); networkClients.put(id, nClient); logger.info("New client started successfully for {}:{}", nConnector.host(), nConnector.port()); - GlobalEventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), true)); + GlobalEventBus.get().post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), true)); onSuccess.run(); scheduler.shutdown(); } catch (CouldNotConnectException e) { @@ -54,17 +63,17 @@ public class NetworkingClientManager implements org.toop.framework.networking.in if (attempts < nConnector.reconnectAttempts()) { logger.warn("Could not connect to {}:{}. Retrying in {} {}", nConnector.host(), nConnector.port(), nConnector.timeout(), nConnector.timeUnit()); - GlobalEventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), false)); + GlobalEventBus.get().post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), false)); scheduler.schedule(this, nConnector.timeout(), nConnector.timeUnit()); } else { logger.error("Failed to start client for {}:{} after {} attempts", nConnector.host(), nConnector.port(), attempts); - GlobalEventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false)); + GlobalEventBus.get().post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false)); onFailure.run(); scheduler.shutdown(); } } catch (Exception e) { logger.error("Unexpected exception during startClient", e); - GlobalEventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false)); + GlobalEventBus.get().post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false)); onFailure.run(); scheduler.shutdown(); } @@ -72,6 +81,8 @@ public class NetworkingClientManager implements org.toop.framework.networking.in }; scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS); +// +// closeEvent.unsubscribe("close"); } @Override diff --git a/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java b/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java index f199634..891fb39 100644 --- a/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java +++ b/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java @@ -4,6 +4,7 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import org.toop.annotations.AutoResponseResult; +import org.toop.framework.eventbus.GlobalEventBus; import org.toop.framework.eventbus.events.*; import org.toop.framework.networking.interfaces.NetworkingClient; import org.toop.framework.networking.types.NetworkingConnector; @@ -11,7 +12,7 @@ import org.toop.framework.networking.types.NetworkingConnector; /** * Defines all event types related to the networking subsystem. *

- * These events are used in conjunction with the {@link org.toop.framework.eventbus.GlobalEventBus} + * These events are used in conjunction with the {@link GlobalEventBus} * and {@link org.toop.framework.eventbus.EventFlow} to communicate between components * such as networking clients, managers, and listeners. *

@@ -166,6 +167,10 @@ public class NetworkEvents extends EventsBase { long identifier) implements UniqueEvent {} + public record CreatedIdForClient(long clientId, long identifier) implements ResponseToUniqueEvent {} + + public record ConnectTry(long clientId, int amount, int maxAmount, boolean success) implements GenericEvent {} + /** * Response confirming that a client has been successfully started. *

@@ -181,8 +186,6 @@ public class NetworkEvents extends EventsBase { public record StartClientResponse(long clientId, boolean successful, long identifier) implements ResponseToUniqueEvent {} - public record ConnectTry(long clientId, int amount, int maxAmount, boolean success) implements GenericEvent {} - /** * Requests reconnection of an existing client using its previous configuration. *