2 Commits

Author SHA1 Message Date
lieght
c7f5d0c934 Renaming, refactor and type safety 2025-12-07 15:42:24 +01:00
lieght
f62219605f Rename 2025-12-07 15:04:02 +01:00
12 changed files with 33 additions and 47 deletions

View File

@@ -430,7 +430,7 @@ public class EventFlow {
*/
public void unsubscribe(Consumer<?> action) {
this.listeners.removeIf(handler -> {
if (handler.getAction().equals(action)) {
if (handler.handler().equals(action)) {
eventBus.unsubscribe(handler);
return true;
}
@@ -445,7 +445,7 @@ public class EventFlow {
*/
public void unsubscribe(String name) {
this.listeners.removeIf(handler -> {
if (handler.getId().equals(name)) {
if (handler.id().equals(name)) {
eventBus.unsubscribe(handler);
return true;
}

View File

@@ -3,13 +3,13 @@ package org.toop.framework.eventbus;
import org.apache.logging.log4j.LogManager;
import org.toop.framework.eventbus.bus.DisruptorEventBus;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.holder.AsyncArraySubscriberStore;
import org.toop.framework.eventbus.holder.DefaultSubscriberStore;
import org.toop.framework.eventbus.subscriber.Subscriber;
public class GlobalEventBus implements EventBus {
private static final EventBus INSTANCE = new DisruptorEventBus(
LogManager.getLogger(DisruptorEventBus.class),
new AsyncArraySubscriberStore()
new DefaultSubscriberStore()
);
private GlobalEventBus() {}

View File

@@ -33,8 +33,8 @@ public class DefaultEventBus implements EventBus {
var subs = eventsHolder.get(eventType);
if (subs != null) {
for (Subscriber<?, ?> subscriber : subs) {
Class<T> eventClass = (Class<T>) subscriber.getEvent();
Consumer<EventType> action = (Consumer<EventType>) subscriber.getAction();
Class<T> eventClass = (Class<T>) subscriber.event();
Consumer<EventType> action = (Consumer<EventType>) subscriber.handler();
action.accept((EventType) eventClass.cast(event));
}

View File

@@ -109,8 +109,8 @@ public class DisruptorEventBus implements EventBus {
@SuppressWarnings("unchecked")
private <T> void callListener(Subscriber<?, ?> subscriber, T event) {
Class<T> eventClass = (Class<T>) subscriber.getEvent();
Consumer<EventType> action = (Consumer<EventType>) subscriber.getAction();
Class<T> eventClass = (Class<T>) subscriber.event();
Consumer<EventType> action = (Consumer<EventType>) subscriber.handler();
action.accept((EventType) eventClass.cast(event));
}

View File

@@ -11,16 +11,16 @@ public class AsyncSubscriberStore implements SubscriberStore {
@Override
public void add(Subscriber<?, ?> sub) {
queues.computeIfAbsent(sub.getEvent(), _ -> new ConcurrentLinkedQueue<>()).add(sub);
rebuildSnapshot(sub.getEvent());
queues.computeIfAbsent(sub.event(), _ -> new ConcurrentLinkedQueue<>()).add(sub);
rebuildSnapshot(sub.event());
}
@Override
public void remove(Subscriber<?, ?> sub) {
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(sub.getEvent());
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(sub.event());
if (queue != null) {
queue.remove(sub);
rebuildSnapshot(sub.getEvent());
rebuildSnapshot(sub.event());
}
}

View File

@@ -4,7 +4,7 @@ import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.concurrent.ConcurrentHashMap;
public class AsyncArraySubscriberStore implements SubscriberStore {
public class DefaultSubscriberStore implements SubscriberStore {
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
@@ -13,7 +13,7 @@ public class AsyncArraySubscriberStore implements SubscriberStore {
@Override
public void add(Subscriber<?, ?> sub) {
listeners.compute(sub.getEvent(), (_, arr) -> {
listeners.compute(sub.event(), (_, arr) -> {
if (arr == null || arr.length == 0) {
return new Subscriber<?, ?>[]{sub};
}
@@ -28,7 +28,7 @@ public class AsyncArraySubscriberStore implements SubscriberStore {
@Override
public void remove(Subscriber<?, ?> sub) {
listeners.computeIfPresent(sub.getEvent(), (_, arr) -> {
listeners.computeIfPresent(sub.event(), (_, arr) -> {
int len = arr.length;
if (len == 1) {

View File

@@ -13,12 +13,12 @@ public class SyncSubscriberStore implements SubscriberStore {
@Override
public void add(Subscriber<?, ?> sub) {
LISTENERS.computeIfAbsent(sub.getEvent(), _ -> new ArrayList<>()).add(sub);
LISTENERS.computeIfAbsent(sub.event(), _ -> new ArrayList<>()).add(sub);
}
@Override
public void remove(Subscriber<?, ?> sub) {
LISTENERS.getOrDefault(sub.getEvent(), new ArrayList<>()).remove(sub);
LISTENERS.getOrDefault(sub.event(), new ArrayList<>()).remove(sub);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}

View File

@@ -2,29 +2,4 @@ package org.toop.framework.eventbus.subscriber;
import java.util.function.Consumer;
public class DefaultSubscriber<T, K> implements Subscriber<K, T> {
private final K id;
private final Class<T> event;
private final Consumer<T> action;
public DefaultSubscriber(K id, Class<T> eventClass, Consumer<T> action) {
this.id = id;
this.event = eventClass;
this.action = action;
}
@Override
public K getId() {
return id;
}
@Override
public Class<T> getEvent() {
return event;
}
@Override
public Consumer<T> getAction() {
return action;
}
}
public record DefaultSubscriber<K>(String id, Class<K> event, Consumer<K> handler) implements NamedSubscriber<K> {}

View File

@@ -0,0 +1,3 @@
package org.toop.framework.eventbus.subscriber;
public interface IdSubscriber<T> extends Subscriber<Long, T> {}

View File

@@ -0,0 +1,5 @@
package org.toop.framework.eventbus.subscriber;
import java.util.function.Consumer;
public record LongIdSubscriber<K>(Long id, Class<K> event, Consumer<K> handler) implements IdSubscriber<K> {}

View File

@@ -0,0 +1,3 @@
package org.toop.framework.eventbus.subscriber;
public interface NamedSubscriber<T> extends Subscriber<String, T> {}

View File

@@ -2,8 +2,8 @@ package org.toop.framework.eventbus.subscriber;
import java.util.function.Consumer;
public interface Subscriber<T, K> {
T getId();
Class<K> getEvent();
Consumer<K> getAction();
public interface Subscriber<ID, K> {
ID id();
Class<K> event();
Consumer<K> handler();
}