diff --git a/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java index e756c8f..8456a66 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java @@ -4,6 +4,7 @@ import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.logging.log4j.Logger; import org.toop.framework.eventbus.events.EventType; import java.util.concurrent.CopyOnWriteArrayList; @@ -11,16 +12,18 @@ import java.util.concurrent.ThreadFactory; public class DisruptorEventBus implements EventBus { /** Wrapper used inside the ring buffer. */ - private class EventHolder { + private static class EventHolder { EventType event; } - EventsHolder eventsHolder; + private final Logger logger; + private final EventsHolder eventsHolder; private final Disruptor disruptor; private final RingBuffer ringBuffer; - public DisruptorEventBus(EventsHolder eventsHolder) { + public DisruptorEventBus(Logger logger, EventsHolder eventsHolder) { + this.logger = logger; this.eventsHolder = eventsHolder; ThreadFactory threadFactory = @@ -94,7 +97,7 @@ public class DisruptorEventBus implements EventBus { try { callListener(listener, event); } catch (Throwable e) { - // logger.warn("Exception while handling event: {}", event, e); TODO + logger.warn("Exception while handling event: {}", event, e); } } } @@ -105,7 +108,7 @@ public class DisruptorEventBus implements EventBus { try { callListener(listener, event); } catch (Throwable e) { - // logger.warn("Exception while handling event: {}", event, e); TODO + logger.warn("Exception while handling event: {}", event, e); } } } @@ -113,6 +116,7 @@ public class DisruptorEventBus implements EventBus { private static void callListener(ListenerHandler handler, EventType event) { - handler.getListener().accept((T) event); + T casted = handler.getListenerClass().cast(event); + handler.getListener().accept(casted); } } diff --git a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java index a928ca0..1f8e480 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java @@ -1,9 +1,13 @@ package org.toop.framework.eventbus; +import org.apache.logging.log4j.LogManager; import org.toop.framework.eventbus.events.EventType; public class GlobalEventBus { - private static final EventBus INSTANCE = new DisruptorEventBus<>(new DisruptorEventsHolder()); + private static final EventBus INSTANCE = new DisruptorEventBus<>( + LogManager.getLogger(DisruptorEventBus.class), + new DisruptorEventsHolder() + ); private GlobalEventBus() {} diff --git a/framework/src/main/java/org/toop/framework/eventbus/OldGlobalEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/OldGlobalEventBus.java deleted file mode 100644 index 1f281a7..0000000 --- a/framework/src/main/java/org/toop/framework/eventbus/OldGlobalEventBus.java +++ /dev/null @@ -1,159 +0,0 @@ -package org.toop.framework.eventbus; - -import com.lmax.disruptor.*; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; - -import java.util.concurrent.*; -import java.util.function.Consumer; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.toop.framework.eventbus.events.EventType; - -/** - * GlobalEventBus backed by the LMAX Disruptor for ultra-low latency, high-throughput event - * publishing. - */ -public final class OldGlobalEventBus { - private static final Logger logger = LogManager.getLogger(OldGlobalEventBus.class); - - /** Disruptor ring buffer size (must be power of two). */ - private static final int RING_BUFFER_SIZE = 1024 * 64; - - /** Disruptor instance. */ - private static final Disruptor DISRUPTOR; - - /** Ring buffer used for publishing events. */ - private static final RingBuffer RING_BUFFER; - - static { - ThreadFactory threadFactory = - r -> { - Thread t = new Thread(r, "EventBus-Disruptor"); - t.setDaemon(true); - return t; - }; - - DISRUPTOR = - new Disruptor<>( - EventHolder::new, - RING_BUFFER_SIZE, - threadFactory, - ProducerType.MULTI, - new BusySpinWaitStrategy()); - - DISRUPTOR.handleEventsWith( - (holder, seq, endOfBatch) -> { - if (holder.event != null) { - dispatchEvent(holder.event); - holder.event = null; - } - }); - - DISRUPTOR.start(); - RING_BUFFER = DISRUPTOR.getRingBuffer(); - } - - /** Prevent instantiation. */ - private OldGlobalEventBus() {} - - /** Wrapper used inside the ring buffer. */ - private static class EventHolder { - EventType event; - } - - /** Map of event class to type-specific listeners. */ - private static EventsHolder eventsHolder; - - public static void setEventsHolder(EventsHolder eventsHolder) { - if (OldGlobalEventBus.eventsHolder != null) return; - - OldGlobalEventBus.eventsHolder = eventsHolder; - } - - // ------------------------------------------------------------------------ - // Subscription - // ------------------------------------------------------------------------ - public static void subscribe(ListenerHandler listener) { - logger.debug("Subscribing to {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName()); - eventsHolder.add(listener); - } - - public static void unsubscribe(ListenerHandler listener) { - logger.debug("Unsubscribing from {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName()); - eventsHolder.remove(listener); - } - - // ------------------------------------------------------------------------ - // Posting - // ------------------------------------------------------------------------ - public static void post(T event) { - dispatchEvent(event); // synchronous - } - - public static void postAsync(T event) { - long seq = RING_BUFFER.next(); - try { - EventHolder holder = RING_BUFFER.get(seq); - holder.event = event; - } finally { - RING_BUFFER.publish(seq); - } - } - - @SuppressWarnings("unchecked") - private static void callListener(ListenerHandler raw, EventType event) { - ListenerHandler handler = (ListenerHandler) raw; - Consumer listener = handler.getListener(); - - T casted = (T) event; - - listener.accept(casted); - } - - @SuppressWarnings("unchecked") - private static void dispatchEvent(EventType event) { - Class clazz = event.getClass(); - - logger.debug("Triggered event: {}", event.getClass().getSimpleName()); - - CopyOnWriteArrayList> classListeners = (CopyOnWriteArrayList>) eventsHolder.get(clazz); - if (classListeners != null) { - for (ListenerHandler listener : classListeners) { - try { - callListener(listener, event); - } catch (Throwable e) { - logger.warn("Exception while handling event: {}", event, e); - } - } - } - - CopyOnWriteArrayList> genericListeners = (CopyOnWriteArrayList>) eventsHolder.get(Object.class); - if (genericListeners != null) { - for (ListenerHandler listener : genericListeners) { - try { - callListener(listener, event); - } catch (Throwable e) { - logger.warn("Exception while handling event: {}", event, e); - } - } - } - } - - // ------------------------------------------------------------------------ - // Lifecycle - // ------------------------------------------------------------------------ - public static void shutdown() { - DISRUPTOR.shutdown(); - eventsHolder.reset(); - } - - public static void reset() { - eventsHolder.reset(); - } - -// public static Map, CopyOnWriteArrayList>> getAllListeners() { -//// return LISTENERS; -// } -}