diff --git a/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventsHolder.java b/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventsHolder.java deleted file mode 100644 index 53b4e4a..0000000 --- a/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventsHolder.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.toop.framework.eventbus; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; - -public class DisruptorEventsHolder implements EventsHolder { - private final Map, CopyOnWriteArrayList>> LISTENERS = new ConcurrentHashMap<>(); - - @Override - public void add(ListenerHandler listener) { - LISTENERS.computeIfAbsent(listener.getListenerClass(), _ -> new CopyOnWriteArrayList<>()).add(listener); - } - - @Override - public void remove(ListenerHandler listener) { - LISTENERS.getOrDefault(listener.getListenerClass(), new CopyOnWriteArrayList<>()).remove(listener); - LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty()); - } - - @Override - public List> get(Class listenerClass) { - return LISTENERS.get(listenerClass); - } - - @Override - public void reset() { - LISTENERS.clear(); - } -} diff --git a/framework/src/main/java/org/toop/framework/eventbus/EventBus.java b/framework/src/main/java/org/toop/framework/eventbus/EventBus.java deleted file mode 100644 index 8e6d92f..0000000 --- a/framework/src/main/java/org/toop/framework/eventbus/EventBus.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.toop.framework.eventbus; - -import org.toop.framework.eventbus.events.EventType; - -public interface EventBus { - void subscribe(ListenerHandler listener); - void unsubscribe(ListenerHandler listener); - void post(EventType event); - void shutdown(); - void reset(); -} diff --git a/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java b/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java index b6aacc6..dac2344 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java +++ b/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java @@ -13,10 +13,13 @@ import org.toop.framework.SnowflakeGenerator; import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.events.ResponseToUniqueEvent; import org.toop.framework.eventbus.events.UniqueEvent; +import org.toop.framework.eventbus.bus.EventBus; +import org.toop.framework.eventbus.subscriber.DefaultSubscriber; +import org.toop.framework.eventbus.subscriber.Subscriber; /** * EventFlow is a utility class for creating, posting, and optionally subscribing to events in a - * type-safe and chainable manner. It is designed to work with the {@link GlobalEventBus}. + * type-safe and chainable manner. It is designed to work with the {@link EventBus}. * *

This class supports automatic UUID assignment for {@link UniqueEvent} events, and * allows filtering subscribers so they only respond to events with a specific UUID. All @@ -31,6 +34,8 @@ public class EventFlow { /** Cache of constructor handles for event classes to avoid repeated reflection lookups. */ private static final Map, MethodHandle> CONSTRUCTOR_CACHE = new ConcurrentHashMap<>(); + private final EventBus eventBus; + /** Automatically assigned UUID for {@link UniqueEvent} events. */ private long eventSnowflake = -1; @@ -38,13 +43,19 @@ public class EventFlow { private EventType event = null; /** The listener returned by GlobalEventBus subscription. Used for unsubscription. */ - private final List> listeners = new ArrayList<>(); + private final List> listeners = new ArrayList<>(); /** Holds the results returned from the subscribed event, if any. */ private Map result = null; /** Empty constructor (event must be added via {@link #addPostEvent(Class, Object...)}). */ - public EventFlow() {} + public EventFlow(EventBus eventBus) { + this.eventBus = eventBus; + } + + public EventFlow() { + this.eventBus = GlobalEventBus.get(); + } /** * @@ -145,21 +156,19 @@ public class EventFlow { action.accept(eventClass); - if (unsubscribeAfterSuccess) unsubscribe(id); + if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id)); this.result = eventClass.result(); }; - // TODO Remove casts - var listener = new ListenerHandler<>( - id, + var subscriber = new DefaultSubscriber<>( name, - (Class) event, - (Consumer) newAction + event, + newAction ); - GlobalEventBus.get().subscribe(listener); - this.listeners.add(listener); + eventBus.subscribe(subscriber); + this.listeners.add(subscriber); return this; } @@ -227,7 +236,7 @@ public class EventFlow { TT typedEvent = (TT) uuidEvent; action.accept(typedEvent); - if (unsubscribeAfterSuccess) unsubscribe(id); + if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id)); this.result = typedEvent.result(); } catch (ClassCastException _) { @@ -239,14 +248,13 @@ public class EventFlow { } }; - var listener = new ListenerHandler<>( - id, + var listener = new DefaultSubscriber<>( name, (Class) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0], newAction ); - GlobalEventBus.get().subscribe(listener); + eventBus.subscribe(listener); this.listeners.add(listener); return this; } @@ -284,17 +292,16 @@ public class EventFlow { Consumer newAction = eventc -> { action.accept(eventc); - if (unsubscribeAfterSuccess) unsubscribe(id); + if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id)); }; - var listener = new ListenerHandler<>( - id, + var listener = new DefaultSubscriber<>( name, event, newAction ); - GlobalEventBus.get().subscribe(listener); + eventBus.subscribe(listener); this.listeners.add(listener); return this; } @@ -362,7 +369,7 @@ public class EventFlow { try { TT typedEvent = (TT) nonUuidEvent; action.accept(typedEvent); - if (unsubscribeAfterSuccess) unsubscribe(id); + if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id)); } catch (ClassCastException _) { throw new ClassCastException( "Cannot cast " @@ -371,14 +378,13 @@ public class EventFlow { } }; - var listener = new ListenerHandler<>( - id, + var listener = new DefaultSubscriber<>( name, eventClass, newAction ); - GlobalEventBus.get().subscribe(listener); + eventBus.subscribe(listener); this.listeners.add(listener); return this; } @@ -401,7 +407,7 @@ public class EventFlow { * Posts the event added through {@link #addPostEvent}. */ public EventFlow postEvent() { - GlobalEventBus.get().post(this.event); + eventBus.post(this.event); return this; } @@ -412,7 +418,7 @@ public class EventFlow { */ @Deprecated public EventFlow asyncPostEvent() { - GlobalEventBus.get().post(this.event); + eventBus.post(this.event); return this; } @@ -420,28 +426,12 @@ public class EventFlow { * * Unsubscribe from an event. * - * @param listenerObject The listener object to remove and unsubscribe. + * @param action The listener object to remove and unsubscribe. */ - public void unsubscribe(Object listenerObject) { + public void unsubscribe(Consumer action) { this.listeners.removeIf(handler -> { - if (handler.getListener() == listenerObject) { - GlobalEventBus.get().unsubscribe(handler); - return true; - } - return false; - }); - } - - /** - * - * Unsubscribe from an event. - * - * @param listenerId The id given to the {@link ListenerHandler}. - */ - public void unsubscribe(long listenerId) { - this.listeners.removeIf(handler -> { - if (handler.getId() == listenerId) { - GlobalEventBus.get().unsubscribe(handler); + if (handler.getAction().equals(action)) { + eventBus.unsubscribe(handler); return true; } return false; @@ -455,8 +445,8 @@ public class EventFlow { */ public void unsubscribe(String name) { this.listeners.removeIf(handler -> { - if (handler.getName().equals(name)) { - GlobalEventBus.get().unsubscribe(handler); + if (handler.getId().equals(name)) { + eventBus.unsubscribe(handler); return true; } return false; @@ -468,7 +458,7 @@ public class EventFlow { */ public void unsubscribeAll() { listeners.removeIf(handler -> { - GlobalEventBus.get().unsubscribe(handler); + eventBus.unsubscribe(handler); return true; }); } @@ -506,8 +496,8 @@ public class EventFlow { * * @return Copy of the list of listeners. */ - public ListenerHandler[] getListeners() { - return listeners.toArray(new ListenerHandler[0]); + public Subscriber[] getListeners() { + return listeners.toArray(new Subscriber[0]); } /** diff --git a/framework/src/main/java/org/toop/framework/eventbus/EventsHolder.java b/framework/src/main/java/org/toop/framework/eventbus/EventsHolder.java deleted file mode 100644 index fda14cc..0000000 --- a/framework/src/main/java/org/toop/framework/eventbus/EventsHolder.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.toop.framework.eventbus; - -import java.util.List; - -public interface EventsHolder { - void add(ListenerHandler listener); - void remove(ListenerHandler listener); - List> get(Class listenerClass); - void reset(); -} diff --git a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java index 1f8e480..4737db2 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java @@ -1,17 +1,45 @@ package org.toop.framework.eventbus; import org.apache.logging.log4j.LogManager; -import org.toop.framework.eventbus.events.EventType; +import org.toop.framework.eventbus.bus.DisruptorEventBus; +import org.toop.framework.eventbus.holder.DisruptorEventsHolder; +import org.toop.framework.eventbus.bus.EventBus; +import org.toop.framework.eventbus.subscriber.Subscriber; -public class GlobalEventBus { - private static final EventBus INSTANCE = new DisruptorEventBus<>( +public class GlobalEventBus implements EventBus { + private static final EventBus INSTANCE = new DisruptorEventBus( LogManager.getLogger(DisruptorEventBus.class), new DisruptorEventsHolder() ); private GlobalEventBus() {} - public static EventBus get() { + public static EventBus get() { return INSTANCE; } + + @Override + public void subscribe(Subscriber listener) { + get().subscribe(listener); + } + + @Override + public void unsubscribe(Subscriber listener) { + get().unsubscribe(listener); + } + + @Override + public void post(T event) { + get().post(event); + } + + @Override + public void shutdown() { + get().shutdown(); + } + + @Override + public void reset() { + get().reset(); + } } diff --git a/framework/src/main/java/org/toop/framework/eventbus/ListenerHandler.java b/framework/src/main/java/org/toop/framework/eventbus/ListenerHandler.java deleted file mode 100644 index fe721ea..0000000 --- a/framework/src/main/java/org/toop/framework/eventbus/ListenerHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -package org.toop.framework.eventbus; - -import org.toop.framework.SnowflakeGenerator; -import org.toop.framework.eventbus.events.EventType; - -import java.util.function.Consumer; - -public class ListenerHandler { - private final long id; - private final String name; - private final Class clazz; - private final Consumer listener; - - public ListenerHandler(long id, String name, Class clazz, Consumer listener) { - this.id = id; - this.name = name; - this.clazz = clazz; - this.listener = listener; - } - - public ListenerHandler(String name, Class clazz, Consumer listener) { - this(SnowflakeGenerator.nextId(), name, clazz, listener); - } - - public ListenerHandler(long id, Class clazz, Consumer listener) { - this(id, String.valueOf(id), clazz, listener); - } - - public ListenerHandler(Class clazz, Consumer listener) { - this(SnowflakeGenerator.nextId(), clazz, listener); - } - - public long getId() { - return id; - } - - public Consumer getListener() { - return listener; - } - - public Class getListenerClass() { - return clazz; - } - - public String getName() { - return name; - } -} diff --git a/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java similarity index 62% rename from framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java rename to framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java index 8456a66..fc296db 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/DisruptorEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java @@ -1,26 +1,29 @@ -package org.toop.framework.eventbus; +package org.toop.framework.eventbus.bus; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import org.apache.logging.log4j.Logger; +import org.toop.framework.eventbus.subscriber.Subscriber; import org.toop.framework.eventbus.events.EventType; +import org.toop.framework.eventbus.holder.EventsHolder; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadFactory; +import java.util.function.Consumer; -public class DisruptorEventBus implements EventBus { +public class DisruptorEventBus implements EventBus { /** Wrapper used inside the ring buffer. */ - private static class EventHolder { - EventType event; + private static class EventHolder { + T event; } private final Logger logger; private final EventsHolder eventsHolder; - private final Disruptor disruptor; - private final RingBuffer ringBuffer; + private final Disruptor> disruptor; + private final RingBuffer> ringBuffer; public DisruptorEventBus(Logger logger, EventsHolder eventsHolder) { this.logger = logger; @@ -39,9 +42,9 @@ public class DisruptorEventBus implements EventBus { this.ringBuffer = disruptor.getRingBuffer(); } - private Disruptor getEventHolderDisruptor(ThreadFactory threadFactory) { + private Disruptor> getEventHolderDisruptor(ThreadFactory threadFactory) { int RING_BUFFER_SIZE = 1024 * 64; - Disruptor disruptor = new Disruptor<>( + Disruptor> disruptor = new Disruptor<>( EventHolder::new, RING_BUFFER_SIZE, threadFactory, @@ -59,20 +62,20 @@ public class DisruptorEventBus implements EventBus { } @Override - public void subscribe(ListenerHandler listener) { + public void subscribe(Subscriber listener) { eventsHolder.add(listener); } @Override - public void unsubscribe(ListenerHandler listener) { + public void unsubscribe(Subscriber listener) { eventsHolder.remove(listener); } @Override - public void post(EventType event) { + public void post(T event) { long seq = ringBuffer.next(); try { - EventHolder holder = ringBuffer.get(seq); + EventHolder holder = (EventHolder) ringBuffer.get(seq); holder.event = event; } finally { ringBuffer.publish(seq); @@ -90,10 +93,10 @@ public class DisruptorEventBus implements EventBus { eventsHolder.reset(); } - private void dispatchEvent(EventType event) { - CopyOnWriteArrayList> classListeners = (CopyOnWriteArrayList>) eventsHolder.get(event.getClass()); + private void dispatchEvent(T event) { + CopyOnWriteArrayList> classListeners = (CopyOnWriteArrayList>) eventsHolder.get(event.getClass()); if (classListeners != null) { - for (ListenerHandler listener : classListeners) { + for (Subscriber listener : classListeners) { try { callListener(listener, event); } catch (Throwable e) { @@ -102,9 +105,9 @@ public class DisruptorEventBus implements EventBus { } } - CopyOnWriteArrayList> genericListeners = (CopyOnWriteArrayList>) eventsHolder.get(Object.class); + CopyOnWriteArrayList> genericListeners = (CopyOnWriteArrayList>) eventsHolder.get(Object.class); if (genericListeners != null) { - for (ListenerHandler listener : genericListeners) { + for (Subscriber listener : genericListeners) { try { callListener(listener, event); } catch (Throwable e) { @@ -115,8 +118,11 @@ public class DisruptorEventBus implements EventBus { } - private static void callListener(ListenerHandler handler, EventType event) { - T casted = handler.getListenerClass().cast(event); - handler.getListener().accept(casted); + @SuppressWarnings("unchecked") + private void callListener(Subscriber subscriber, T event) { + Class eventClass = (Class) subscriber.getEvent(); + Consumer action = (Consumer) subscriber.getAction(); + + action.accept((EventType) eventClass.cast(event)); } } diff --git a/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java new file mode 100644 index 0000000..0a79c4e --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java @@ -0,0 +1,12 @@ +package org.toop.framework.eventbus.bus; + +import org.toop.framework.eventbus.events.EventType; +import org.toop.framework.eventbus.subscriber.Subscriber; + +public interface EventBus { + void subscribe(Subscriber listener); + void unsubscribe(Subscriber listener); + void post(T event); + void shutdown(); + void reset(); +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/holder/DisruptorEventsHolder.java b/framework/src/main/java/org/toop/framework/eventbus/holder/DisruptorEventsHolder.java new file mode 100644 index 0000000..d58fea4 --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/holder/DisruptorEventsHolder.java @@ -0,0 +1,33 @@ +package org.toop.framework.eventbus.holder; + +import org.toop.framework.eventbus.subscriber.Subscriber; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +public class DisruptorEventsHolder implements EventsHolder { + private final Map, CopyOnWriteArrayList>> LISTENERS = new ConcurrentHashMap<>(); + + @Override + public void add(Subscriber sub) { + LISTENERS.computeIfAbsent(sub.getEvent(), _ -> new CopyOnWriteArrayList<>()).add(sub); + } + + @Override + public void remove(Subscriber sub) { + LISTENERS.getOrDefault(sub.getEvent(), new CopyOnWriteArrayList<>()).remove(sub); + LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty()); + } + + @Override + public List> get(Class event) { + return LISTENERS.get(event); + } + + @Override + public void reset() { + LISTENERS.clear(); + } +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/holder/EventsHolder.java b/framework/src/main/java/org/toop/framework/eventbus/holder/EventsHolder.java new file mode 100644 index 0000000..0c1976c --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/holder/EventsHolder.java @@ -0,0 +1,12 @@ +package org.toop.framework.eventbus.holder; + +import org.toop.framework.eventbus.subscriber.Subscriber; + +import java.util.List; + +public interface EventsHolder { + void add(Subscriber listener); + void remove(Subscriber listener); + List> get(Class listenerClass); + void reset(); +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultSubscriber.java b/framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultSubscriber.java new file mode 100644 index 0000000..b6a02fc --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultSubscriber.java @@ -0,0 +1,30 @@ +package org.toop.framework.eventbus.subscriber; + +import java.util.function.Consumer; + +public class DefaultSubscriber implements Subscriber { + private final K id; + private final Class event; + private final Consumer action; + + public DefaultSubscriber(K id, Class eventClass, Consumer action) { + this.id = id; + this.event = eventClass; + this.action = action; + } + + @Override + public K getId() { + return id; + } + + @Override + public Class getEvent() { + return event; + } + + @Override + public Consumer getAction() { + return action; + } +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/subscriber/Subscriber.java b/framework/src/main/java/org/toop/framework/eventbus/subscriber/Subscriber.java new file mode 100644 index 0000000..b921836 --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/subscriber/Subscriber.java @@ -0,0 +1,9 @@ +package org.toop.framework.eventbus.subscriber; + +import java.util.function.Consumer; + +public interface Subscriber { + T getId(); + Class getEvent(); + Consumer getAction(); +}