Single threaded eventbus

This commit is contained in:
lieght
2025-12-06 22:35:17 +01:00
parent e5ce3aa2d9
commit 43a39295dd
6 changed files with 101 additions and 9 deletions

View File

@@ -1,15 +1,16 @@
package org.toop.framework.eventbus;
import org.apache.logging.log4j.LogManager;
import org.toop.framework.eventbus.bus.DefaultEventBus;
import org.toop.framework.eventbus.bus.DisruptorEventBus;
import org.toop.framework.eventbus.holder.DisruptorEventsHolder;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.holder.SyncEventsHolder;
import org.toop.framework.eventbus.subscriber.Subscriber;
public class GlobalEventBus implements EventBus {
private static final EventBus INSTANCE = new DisruptorEventBus(
private static final EventBus INSTANCE = new DefaultEventBus(
LogManager.getLogger(DisruptorEventBus.class),
new DisruptorEventsHolder()
new SyncEventsHolder()
);
private GlobalEventBus() {}

View File

@@ -0,0 +1,58 @@
package org.toop.framework.eventbus.bus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.holder.EventsHolder;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
public class DefaultEventBus implements EventBus {
private final Logger logger;
private final EventsHolder eventsHolder;
public DefaultEventBus(Logger logger, EventsHolder eventsHolder) {
this.logger = logger;
this.eventsHolder = eventsHolder;
}
@Override
public void subscribe(Subscriber<?, ?> subscriber) {
eventsHolder.add(subscriber);
}
@Override
public void unsubscribe(Subscriber<?, ?> subscriber) {
eventsHolder.remove(subscriber);
}
@Override
@SuppressWarnings("unchecked")
public <T> void post(T event) {
Class<T> eventType = (Class<T>) event.getClass();
var subs = eventsHolder.get(eventType);
if (subs != null) {
List<Subscriber<?, ?>> snapshot = new ArrayList<>(subs);
for (Subscriber<?, ?> subscriber : snapshot) {
Class<T> eventClass = (Class<T>) subscriber.getEvent();
Consumer<EventType> action = (Consumer<EventType>) subscriber.getAction();
action.accept((EventType) eventClass.cast(event));
}
}
}
@Override
public void shutdown() {
eventsHolder.reset();
}
@Override
public void reset() {
eventsHolder.reset();
}
}

View File

@@ -4,8 +4,8 @@ import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.subscriber.Subscriber;
public interface EventBus {
void subscribe(Subscriber<?, ?> listener);
void unsubscribe(Subscriber<?, ?> listener);
void subscribe(Subscriber<?, ?> subscriber);
void unsubscribe(Subscriber<?, ?> subscriber);
<T> void post(T event);
void shutdown();
void reset();

View File

@@ -7,7 +7,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
public class DisruptorEventsHolder implements EventsHolder {
public class AsyncEventsHolder implements EventsHolder {
private final Map<Class<?>, CopyOnWriteArrayList<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>();
@Override

View File

@@ -5,8 +5,8 @@ import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.List;
public interface EventsHolder {
void add(Subscriber<?, ?> listener);
void remove(Subscriber<?, ?> listener);
List<Subscriber<?, ?>> get(Class<?> listenerClass);
void add(Subscriber<?, ?> subscriber);
void remove(Subscriber<?, ?> subscriber);
List<Subscriber<?, ?>> get(Class<?> event);
void reset();
}

View File

@@ -0,0 +1,33 @@
package org.toop.framework.eventbus.holder;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SyncEventsHolder implements EventsHolder {
private final Map<Class<?>, List<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>();
@Override
public void add(Subscriber<?, ?> sub) {
LISTENERS.computeIfAbsent(sub.getEvent(), _ -> new ArrayList<>()).add(sub);
}
@Override
public void remove(Subscriber<?, ?> sub) {
LISTENERS.getOrDefault(sub.getEvent(), new ArrayList<>()).remove(sub);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
@Override
public List<Subscriber<?, ?>> get(Class<?> event) {
return LISTENERS.get(event);
}
@Override
public void reset() {
LISTENERS.clear();
}
}