diff --git a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java index 4737db2..1b3fac8 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java @@ -1,15 +1,16 @@ package org.toop.framework.eventbus; import org.apache.logging.log4j.LogManager; +import org.toop.framework.eventbus.bus.DefaultEventBus; import org.toop.framework.eventbus.bus.DisruptorEventBus; -import org.toop.framework.eventbus.holder.DisruptorEventsHolder; import org.toop.framework.eventbus.bus.EventBus; +import org.toop.framework.eventbus.holder.SyncEventsHolder; import org.toop.framework.eventbus.subscriber.Subscriber; public class GlobalEventBus implements EventBus { - private static final EventBus INSTANCE = new DisruptorEventBus( + private static final EventBus INSTANCE = new DefaultEventBus( LogManager.getLogger(DisruptorEventBus.class), - new DisruptorEventsHolder() + new SyncEventsHolder() ); private GlobalEventBus() {} diff --git a/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java new file mode 100644 index 0000000..43a4244 --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java @@ -0,0 +1,58 @@ +package org.toop.framework.eventbus.bus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.toop.framework.eventbus.events.EventType; +import org.toop.framework.eventbus.holder.EventsHolder; +import org.toop.framework.eventbus.subscriber.Subscriber; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +public class DefaultEventBus implements EventBus { + private final Logger logger; + private final EventsHolder eventsHolder; + + public DefaultEventBus(Logger logger, EventsHolder eventsHolder) { + this.logger = logger; + this.eventsHolder = eventsHolder; + } + + @Override + public void subscribe(Subscriber subscriber) { + eventsHolder.add(subscriber); + } + + @Override + public void unsubscribe(Subscriber subscriber) { + eventsHolder.remove(subscriber); + } + + @Override + @SuppressWarnings("unchecked") + public void post(T event) { + Class eventType = (Class) event.getClass(); + var subs = eventsHolder.get(eventType); + if (subs != null) { + List> snapshot = new ArrayList<>(subs); + + for (Subscriber subscriber : snapshot) { + Class eventClass = (Class) subscriber.getEvent(); + Consumer action = (Consumer) subscriber.getAction(); + + action.accept((EventType) eventClass.cast(event)); + } + } + } + + @Override + public void shutdown() { + eventsHolder.reset(); + } + + @Override + public void reset() { + eventsHolder.reset(); + } +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java index 0a79c4e..5fef49a 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java @@ -4,8 +4,8 @@ import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.subscriber.Subscriber; public interface EventBus { - void subscribe(Subscriber listener); - void unsubscribe(Subscriber listener); + void subscribe(Subscriber subscriber); + void unsubscribe(Subscriber subscriber); void post(T event); void shutdown(); void reset(); diff --git a/framework/src/main/java/org/toop/framework/eventbus/holder/DisruptorEventsHolder.java b/framework/src/main/java/org/toop/framework/eventbus/holder/AsyncEventsHolder.java similarity index 93% rename from framework/src/main/java/org/toop/framework/eventbus/holder/DisruptorEventsHolder.java rename to framework/src/main/java/org/toop/framework/eventbus/holder/AsyncEventsHolder.java index d58fea4..cd7d0d3 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/holder/DisruptorEventsHolder.java +++ b/framework/src/main/java/org/toop/framework/eventbus/holder/AsyncEventsHolder.java @@ -7,7 +7,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -public class DisruptorEventsHolder implements EventsHolder { +public class AsyncEventsHolder implements EventsHolder { private final Map, CopyOnWriteArrayList>> LISTENERS = new ConcurrentHashMap<>(); @Override diff --git a/framework/src/main/java/org/toop/framework/eventbus/holder/EventsHolder.java b/framework/src/main/java/org/toop/framework/eventbus/holder/EventsHolder.java index 0c1976c..51493e3 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/holder/EventsHolder.java +++ b/framework/src/main/java/org/toop/framework/eventbus/holder/EventsHolder.java @@ -5,8 +5,8 @@ import org.toop.framework.eventbus.subscriber.Subscriber; import java.util.List; public interface EventsHolder { - void add(Subscriber listener); - void remove(Subscriber listener); - List> get(Class listenerClass); + void add(Subscriber subscriber); + void remove(Subscriber subscriber); + List> get(Class event); void reset(); } diff --git a/framework/src/main/java/org/toop/framework/eventbus/holder/SyncEventsHolder.java b/framework/src/main/java/org/toop/framework/eventbus/holder/SyncEventsHolder.java new file mode 100644 index 0000000..7620a33 --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/holder/SyncEventsHolder.java @@ -0,0 +1,33 @@ +package org.toop.framework.eventbus.holder; + +import org.toop.framework.eventbus.subscriber.Subscriber; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SyncEventsHolder implements EventsHolder { + private final Map, List>> LISTENERS = new ConcurrentHashMap<>(); + + @Override + public void add(Subscriber sub) { + LISTENERS.computeIfAbsent(sub.getEvent(), _ -> new ArrayList<>()).add(sub); + } + + @Override + public void remove(Subscriber sub) { + LISTENERS.getOrDefault(sub.getEvent(), new ArrayList<>()).remove(sub); + LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty()); + } + + @Override + public List> get(Class event) { + return LISTENERS.get(event); + } + + @Override + public void reset() { + LISTENERS.clear(); + } +}