Made events and handlers more generic

This commit is contained in:
lieght
2025-12-06 20:22:04 +01:00
parent d91e3a7457
commit 990cf1675c
12 changed files with 194 additions and 174 deletions

View File

@@ -1,31 +0,0 @@
package org.toop.framework.eventbus;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
public class DisruptorEventsHolder implements EventsHolder {
private final Map<Class<?>, CopyOnWriteArrayList<ListenerHandler<?>>> LISTENERS = new ConcurrentHashMap<>();
@Override
public void add(ListenerHandler<?> listener) {
LISTENERS.computeIfAbsent(listener.getListenerClass(), _ -> new CopyOnWriteArrayList<>()).add(listener);
}
@Override
public void remove(ListenerHandler<?> listener) {
LISTENERS.getOrDefault(listener.getListenerClass(), new CopyOnWriteArrayList<>()).remove(listener);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
@Override
public List<ListenerHandler<?>> get(Class<?> listenerClass) {
return LISTENERS.get(listenerClass);
}
@Override
public void reset() {
LISTENERS.clear();
}
}

View File

@@ -1,11 +0,0 @@
package org.toop.framework.eventbus;
import org.toop.framework.eventbus.events.EventType;
public interface EventBus<E> {
void subscribe(ListenerHandler<? extends EventType> listener);
void unsubscribe(ListenerHandler<? extends EventType> listener);
void post(EventType event);
void shutdown();
void reset();
}

View File

@@ -13,10 +13,13 @@ import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.ResponseToUniqueEvent; import org.toop.framework.eventbus.events.ResponseToUniqueEvent;
import org.toop.framework.eventbus.events.UniqueEvent; import org.toop.framework.eventbus.events.UniqueEvent;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.subscriber.DefaultSubscriber;
import org.toop.framework.eventbus.subscriber.Subscriber;
/** /**
* EventFlow is a utility class for creating, posting, and optionally subscribing to events in a * EventFlow is a utility class for creating, posting, and optionally subscribing to events in a
* type-safe and chainable manner. It is designed to work with the {@link GlobalEventBus}. * type-safe and chainable manner. It is designed to work with the {@link EventBus}.
* *
* <p>This class supports automatic UUID assignment for {@link UniqueEvent} events, and * <p>This class supports automatic UUID assignment for {@link UniqueEvent} events, and
* allows filtering subscribers so they only respond to events with a specific UUID. All * allows filtering subscribers so they only respond to events with a specific UUID. All
@@ -31,6 +34,8 @@ public class EventFlow {
/** Cache of constructor handles for event classes to avoid repeated reflection lookups. */ /** Cache of constructor handles for event classes to avoid repeated reflection lookups. */
private static final Map<Class<?>, MethodHandle> CONSTRUCTOR_CACHE = new ConcurrentHashMap<>(); private static final Map<Class<?>, MethodHandle> CONSTRUCTOR_CACHE = new ConcurrentHashMap<>();
private final EventBus eventBus;
/** Automatically assigned UUID for {@link UniqueEvent} events. */ /** Automatically assigned UUID for {@link UniqueEvent} events. */
private long eventSnowflake = -1; private long eventSnowflake = -1;
@@ -38,13 +43,19 @@ public class EventFlow {
private EventType event = null; private EventType event = null;
/** The listener returned by GlobalEventBus subscription. Used for unsubscription. */ /** The listener returned by GlobalEventBus subscription. Used for unsubscription. */
private final List<ListenerHandler<?>> listeners = new ArrayList<>(); private final List<Subscriber<?, ?>> listeners = new ArrayList<>();
/** Holds the results returned from the subscribed event, if any. */ /** Holds the results returned from the subscribed event, if any. */
private Map<String, ?> result = null; private Map<String, ?> result = null;
/** Empty constructor (event must be added via {@link #addPostEvent(Class, Object...)}). */ /** Empty constructor (event must be added via {@link #addPostEvent(Class, Object...)}). */
public EventFlow() {} public EventFlow(EventBus eventBus) {
this.eventBus = eventBus;
}
public EventFlow() {
this.eventBus = GlobalEventBus.get();
}
/** /**
* *
@@ -145,21 +156,19 @@ public class EventFlow {
action.accept(eventClass); action.accept(eventClass);
if (unsubscribeAfterSuccess) unsubscribe(id); if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
this.result = eventClass.result(); this.result = eventClass.result();
}; };
// TODO Remove casts var subscriber = new DefaultSubscriber<>(
var listener = new ListenerHandler<>(
id,
name, name,
(Class<ResponseToUniqueEvent>) event, event,
(Consumer<ResponseToUniqueEvent>) newAction newAction
); );
GlobalEventBus.get().subscribe(listener); eventBus.subscribe(subscriber);
this.listeners.add(listener); this.listeners.add(subscriber);
return this; return this;
} }
@@ -227,7 +236,7 @@ public class EventFlow {
TT typedEvent = (TT) uuidEvent; TT typedEvent = (TT) uuidEvent;
action.accept(typedEvent); action.accept(typedEvent);
if (unsubscribeAfterSuccess) unsubscribe(id); if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
this.result = typedEvent.result(); this.result = typedEvent.result();
} catch (ClassCastException _) { } catch (ClassCastException _) {
@@ -239,14 +248,13 @@ public class EventFlow {
} }
}; };
var listener = new ListenerHandler<>( var listener = new DefaultSubscriber<>(
id,
name, name,
(Class<TT>) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0], (Class<TT>) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0],
newAction newAction
); );
GlobalEventBus.get().subscribe(listener); eventBus.subscribe(listener);
this.listeners.add(listener); this.listeners.add(listener);
return this; return this;
} }
@@ -284,17 +292,16 @@ public class EventFlow {
Consumer<TT> newAction = eventc -> { Consumer<TT> newAction = eventc -> {
action.accept(eventc); action.accept(eventc);
if (unsubscribeAfterSuccess) unsubscribe(id); if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
}; };
var listener = new ListenerHandler<>( var listener = new DefaultSubscriber<>(
id,
name, name,
event, event,
newAction newAction
); );
GlobalEventBus.get().subscribe(listener); eventBus.subscribe(listener);
this.listeners.add(listener); this.listeners.add(listener);
return this; return this;
} }
@@ -362,7 +369,7 @@ public class EventFlow {
try { try {
TT typedEvent = (TT) nonUuidEvent; TT typedEvent = (TT) nonUuidEvent;
action.accept(typedEvent); action.accept(typedEvent);
if (unsubscribeAfterSuccess) unsubscribe(id); if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
} catch (ClassCastException _) { } catch (ClassCastException _) {
throw new ClassCastException( throw new ClassCastException(
"Cannot cast " "Cannot cast "
@@ -371,14 +378,13 @@ public class EventFlow {
} }
}; };
var listener = new ListenerHandler<>( var listener = new DefaultSubscriber<>(
id,
name, name,
eventClass, eventClass,
newAction newAction
); );
GlobalEventBus.get().subscribe(listener); eventBus.subscribe(listener);
this.listeners.add(listener); this.listeners.add(listener);
return this; return this;
} }
@@ -401,7 +407,7 @@ public class EventFlow {
* Posts the event added through {@link #addPostEvent}. * Posts the event added through {@link #addPostEvent}.
*/ */
public EventFlow postEvent() { public EventFlow postEvent() {
GlobalEventBus.get().post(this.event); eventBus.post(this.event);
return this; return this;
} }
@@ -412,7 +418,7 @@ public class EventFlow {
*/ */
@Deprecated @Deprecated
public EventFlow asyncPostEvent() { public EventFlow asyncPostEvent() {
GlobalEventBus.get().post(this.event); eventBus.post(this.event);
return this; return this;
} }
@@ -420,28 +426,12 @@ public class EventFlow {
* *
* Unsubscribe from an event. * Unsubscribe from an event.
* *
* @param listenerObject The listener object to remove and unsubscribe. * @param action The listener object to remove and unsubscribe.
*/ */
public void unsubscribe(Object listenerObject) { public void unsubscribe(Consumer<?> action) {
this.listeners.removeIf(handler -> { this.listeners.removeIf(handler -> {
if (handler.getListener() == listenerObject) { if (handler.getAction().equals(action)) {
GlobalEventBus.get().unsubscribe(handler); eventBus.unsubscribe(handler);
return true;
}
return false;
});
}
/**
*
* Unsubscribe from an event.
*
* @param listenerId The id given to the {@link ListenerHandler}.
*/
public void unsubscribe(long listenerId) {
this.listeners.removeIf(handler -> {
if (handler.getId() == listenerId) {
GlobalEventBus.get().unsubscribe(handler);
return true; return true;
} }
return false; return false;
@@ -455,8 +445,8 @@ public class EventFlow {
*/ */
public void unsubscribe(String name) { public void unsubscribe(String name) {
this.listeners.removeIf(handler -> { this.listeners.removeIf(handler -> {
if (handler.getName().equals(name)) { if (handler.getId().equals(name)) {
GlobalEventBus.get().unsubscribe(handler); eventBus.unsubscribe(handler);
return true; return true;
} }
return false; return false;
@@ -468,7 +458,7 @@ public class EventFlow {
*/ */
public void unsubscribeAll() { public void unsubscribeAll() {
listeners.removeIf(handler -> { listeners.removeIf(handler -> {
GlobalEventBus.get().unsubscribe(handler); eventBus.unsubscribe(handler);
return true; return true;
}); });
} }
@@ -506,8 +496,8 @@ public class EventFlow {
* *
* @return Copy of the list of listeners. * @return Copy of the list of listeners.
*/ */
public ListenerHandler[] getListeners() { public Subscriber<?, ?>[] getListeners() {
return listeners.toArray(new ListenerHandler[0]); return listeners.toArray(new Subscriber[0]);
} }
/** /**

View File

@@ -1,10 +0,0 @@
package org.toop.framework.eventbus;
import java.util.List;
public interface EventsHolder {
void add(ListenerHandler<?> listener);
void remove(ListenerHandler<?> listener);
List<ListenerHandler<?>> get(Class<?> listenerClass);
void reset();
}

View File

@@ -1,17 +1,45 @@
package org.toop.framework.eventbus; package org.toop.framework.eventbus;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.bus.DisruptorEventBus;
import org.toop.framework.eventbus.holder.DisruptorEventsHolder;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.subscriber.Subscriber;
public class GlobalEventBus { public class GlobalEventBus implements EventBus {
private static final EventBus<EventType> INSTANCE = new DisruptorEventBus<>( private static final EventBus INSTANCE = new DisruptorEventBus(
LogManager.getLogger(DisruptorEventBus.class), LogManager.getLogger(DisruptorEventBus.class),
new DisruptorEventsHolder() new DisruptorEventsHolder()
); );
private GlobalEventBus() {} private GlobalEventBus() {}
public static EventBus<EventType> get() { public static EventBus get() {
return INSTANCE; return INSTANCE;
} }
@Override
public void subscribe(Subscriber<?, ?> listener) {
get().subscribe(listener);
}
@Override
public void unsubscribe(Subscriber<?, ?> listener) {
get().unsubscribe(listener);
}
@Override
public <T> void post(T event) {
get().post(event);
}
@Override
public void shutdown() {
get().shutdown();
}
@Override
public void reset() {
get().reset();
}
} }

View File

@@ -1,48 +0,0 @@
package org.toop.framework.eventbus;
import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.events.EventType;
import java.util.function.Consumer;
public class ListenerHandler<T extends EventType> {
private final long id;
private final String name;
private final Class<T> clazz;
private final Consumer<T> listener;
public ListenerHandler(long id, String name, Class<T> clazz, Consumer<T> listener) {
this.id = id;
this.name = name;
this.clazz = clazz;
this.listener = listener;
}
public ListenerHandler(String name, Class<T> clazz, Consumer<T> listener) {
this(SnowflakeGenerator.nextId(), name, clazz, listener);
}
public ListenerHandler(long id, Class<T> clazz, Consumer<T> listener) {
this(id, String.valueOf(id), clazz, listener);
}
public ListenerHandler(Class<T> clazz, Consumer<T> listener) {
this(SnowflakeGenerator.nextId(), clazz, listener);
}
public long getId() {
return id;
}
public Consumer<T> getListener() {
return listener;
}
public Class<T> getListenerClass() {
return clazz;
}
public String getName() {
return name;
}
}

View File

@@ -1,26 +1,29 @@
package org.toop.framework.eventbus; package org.toop.framework.eventbus.bus;
import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.dsl.ProducerType;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.subscriber.Subscriber;
import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.holder.EventsHolder;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
public class DisruptorEventBus<T extends EventType> implements EventBus<T> { public class DisruptorEventBus implements EventBus {
/** Wrapper used inside the ring buffer. */ /** Wrapper used inside the ring buffer. */
private static class EventHolder { private static class EventHolder<T> {
EventType event; T event;
} }
private final Logger logger; private final Logger logger;
private final EventsHolder eventsHolder; private final EventsHolder eventsHolder;
private final Disruptor<EventHolder> disruptor; private final Disruptor<EventHolder<?>> disruptor;
private final RingBuffer<EventHolder> ringBuffer; private final RingBuffer<EventHolder<?>> ringBuffer;
public DisruptorEventBus(Logger logger, EventsHolder eventsHolder) { public DisruptorEventBus(Logger logger, EventsHolder eventsHolder) {
this.logger = logger; this.logger = logger;
@@ -39,9 +42,9 @@ public class DisruptorEventBus<T extends EventType> implements EventBus<T> {
this.ringBuffer = disruptor.getRingBuffer(); this.ringBuffer = disruptor.getRingBuffer();
} }
private Disruptor<EventHolder> getEventHolderDisruptor(ThreadFactory threadFactory) { private Disruptor<EventHolder<?>> getEventHolderDisruptor(ThreadFactory threadFactory) {
int RING_BUFFER_SIZE = 1024 * 64; int RING_BUFFER_SIZE = 1024 * 64;
Disruptor<EventHolder> disruptor = new Disruptor<>( Disruptor<EventHolder<?>> disruptor = new Disruptor<>(
EventHolder::new, EventHolder::new,
RING_BUFFER_SIZE, RING_BUFFER_SIZE,
threadFactory, threadFactory,
@@ -59,20 +62,20 @@ public class DisruptorEventBus<T extends EventType> implements EventBus<T> {
} }
@Override @Override
public void subscribe(ListenerHandler<? extends EventType> listener) { public void subscribe(Subscriber<?, ?> listener) {
eventsHolder.add(listener); eventsHolder.add(listener);
} }
@Override @Override
public void unsubscribe(ListenerHandler<? extends EventType> listener) { public void unsubscribe(Subscriber<?, ?> listener) {
eventsHolder.remove(listener); eventsHolder.remove(listener);
} }
@Override @Override
public void post(EventType event) { public <T> void post(T event) {
long seq = ringBuffer.next(); long seq = ringBuffer.next();
try { try {
EventHolder holder = ringBuffer.get(seq); EventHolder<T> holder = (EventHolder<T>) ringBuffer.get(seq);
holder.event = event; holder.event = event;
} finally { } finally {
ringBuffer.publish(seq); ringBuffer.publish(seq);
@@ -90,10 +93,10 @@ public class DisruptorEventBus<T extends EventType> implements EventBus<T> {
eventsHolder.reset(); eventsHolder.reset();
} }
private void dispatchEvent(EventType event) { private <T> void dispatchEvent(T event) {
CopyOnWriteArrayList<ListenerHandler<?>> classListeners = (CopyOnWriteArrayList<ListenerHandler<?>>) eventsHolder.get(event.getClass()); CopyOnWriteArrayList<Subscriber<?, ?>> classListeners = (CopyOnWriteArrayList<Subscriber<?, ?>>) eventsHolder.get(event.getClass());
if (classListeners != null) { if (classListeners != null) {
for (ListenerHandler<?> listener : classListeners) { for (Subscriber<?, ?> listener : classListeners) {
try { try {
callListener(listener, event); callListener(listener, event);
} catch (Throwable e) { } catch (Throwable e) {
@@ -102,9 +105,9 @@ public class DisruptorEventBus<T extends EventType> implements EventBus<T> {
} }
} }
CopyOnWriteArrayList<ListenerHandler<?>> genericListeners = (CopyOnWriteArrayList<ListenerHandler<?>>) eventsHolder.get(Object.class); CopyOnWriteArrayList<Subscriber<?, ?>> genericListeners = (CopyOnWriteArrayList<Subscriber<?, ?>>) eventsHolder.get(Object.class);
if (genericListeners != null) { if (genericListeners != null) {
for (ListenerHandler<?> listener : genericListeners) { for (Subscriber<?, ?> listener : genericListeners) {
try { try {
callListener(listener, event); callListener(listener, event);
} catch (Throwable e) { } catch (Throwable e) {
@@ -115,8 +118,11 @@ public class DisruptorEventBus<T extends EventType> implements EventBus<T> {
} }
private static <T extends EventType> void callListener(ListenerHandler<T> handler, EventType event) { @SuppressWarnings("unchecked")
T casted = handler.getListenerClass().cast(event); private <T> void callListener(Subscriber<?, ?> subscriber, T event) {
handler.getListener().accept(casted); Class<T> eventClass = (Class<T>) subscriber.getEvent();
Consumer<EventType> action = (Consumer<EventType>) subscriber.getAction();
action.accept((EventType) eventClass.cast(event));
} }
} }

View File

@@ -0,0 +1,12 @@
package org.toop.framework.eventbus.bus;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.subscriber.Subscriber;
public interface EventBus {
void subscribe(Subscriber<?, ?> listener);
void unsubscribe(Subscriber<?, ?> listener);
<T> void post(T event);
void shutdown();
void reset();
}

View File

@@ -0,0 +1,33 @@
package org.toop.framework.eventbus.holder;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
public class DisruptorEventsHolder implements EventsHolder {
private final Map<Class<?>, CopyOnWriteArrayList<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>();
@Override
public void add(Subscriber<?, ?> sub) {
LISTENERS.computeIfAbsent(sub.getEvent(), _ -> new CopyOnWriteArrayList<>()).add(sub);
}
@Override
public void remove(Subscriber<?, ?> sub) {
LISTENERS.getOrDefault(sub.getEvent(), new CopyOnWriteArrayList<>()).remove(sub);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
@Override
public List<Subscriber<?, ?>> get(Class<?> event) {
return LISTENERS.get(event);
}
@Override
public void reset() {
LISTENERS.clear();
}
}

View File

@@ -0,0 +1,12 @@
package org.toop.framework.eventbus.holder;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.List;
public interface EventsHolder {
void add(Subscriber<?, ?> listener);
void remove(Subscriber<?, ?> listener);
List<Subscriber<?, ?>> get(Class<?> listenerClass);
void reset();
}

View File

@@ -0,0 +1,30 @@
package org.toop.framework.eventbus.subscriber;
import java.util.function.Consumer;
public class DefaultSubscriber<T, K> implements Subscriber<K, T> {
private final K id;
private final Class<T> event;
private final Consumer<T> action;
public DefaultSubscriber(K id, Class<T> eventClass, Consumer<T> action) {
this.id = id;
this.event = eventClass;
this.action = action;
}
@Override
public K getId() {
return id;
}
@Override
public Class<T> getEvent() {
return event;
}
@Override
public Consumer<T> getAction() {
return action;
}
}

View File

@@ -0,0 +1,9 @@
package org.toop.framework.eventbus.subscriber;
import java.util.function.Consumer;
public interface Subscriber<T, K> {
T getId();
Class<K> getEvent();
Consumer<K> getAction();
}