From a9c99df5d257ae00856ccb854c696faa1ace7984 Mon Sep 17 00:00:00 2001 From: Bas de Jong Date: Tue, 9 Dec 2025 21:07:30 +0100 Subject: [PATCH] Better limits to generic acceptance --- app/src/main/java/org/toop/app/App.java | 12 ++++------ .../toop/framework/eventbus/EventFlow.java | 15 ++++++------ .../framework/eventbus/GlobalEventBus.java | 7 +++--- .../eventbus/bus/DefaultEventBus.java | 8 +++---- .../eventbus/bus/DisruptorEventBus.java | 20 ++++++++-------- .../toop/framework/eventbus/bus/EventBus.java | 7 +++--- .../eventbus/store/AsyncSubscriberStore.java | 23 +++++++++--------- .../store/DefaultSubscriberStore.java | 24 ++++++++++--------- .../eventbus/store/SubscriberStore.java | 7 +++--- .../eventbus/store/SyncSubscriberStore.java | 13 +++++----- .../subscriber/DefaultNamedSubscriber.java | 8 +++++++ .../subscriber/DefaultSubscriber.java | 5 +++- .../framework/eventbus/subscriber/HasId.java | 5 ++++ .../eventbus/subscriber/IdSubscriber.java | 4 +++- .../eventbus/subscriber/LongIdSubscriber.java | 5 +++- .../eventbus/subscriber/NamedSubscriber.java | 4 +++- .../eventbus/subscriber/Subscriber.java | 5 ++-- 17 files changed, 101 insertions(+), 71 deletions(-) create mode 100644 framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultNamedSubscriber.java create mode 100644 framework/src/main/java/org/toop/framework/eventbus/subscriber/HasId.java diff --git a/app/src/main/java/org/toop/app/App.java b/app/src/main/java/org/toop/app/App.java index 3557848..5d99acd 100644 --- a/app/src/main/java/org/toop/app/App.java +++ b/app/src/main/java/org/toop/app/App.java @@ -3,7 +3,6 @@ package org.toop.app; import javafx.application.Platform; import javafx.scene.input.KeyCode; import javafx.scene.input.KeyCodeCombination; -import javafx.scene.input.KeyCombination; import javafx.scene.input.KeyEvent; import org.toop.app.widget.Primitive; @@ -112,20 +111,19 @@ public final class App extends Application { Platform.runLater(() -> stage.setOpacity(1.0)); } - Platform.runLater(() -> loading.setMaxAmount(e.isLoadingAmount())); - Platform.runLater(() -> { + loading.setMaxAmount(e.isLoadingAmount()); try { loading.setAmount(e.hasLoadedAmount()); } catch (Exception ex) { throw new RuntimeException(ex); } + if (e.hasLoadedAmount() >= e.isLoadingAmount()-1) { + Platform.runLater(loading::triggerSuccess); + loadingFlow.unsubscribe("init_loading"); + } }); - if (e.hasLoadedAmount() >= e.isLoadingAmount()) { - Platform.runLater(loading::triggerSuccess); - loadingFlow.unsubscribe("init_loading"); - } }, false, "init_loading"); diff --git a/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java b/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java index 404db01..a9ffd9d 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java +++ b/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java @@ -14,7 +14,8 @@ import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.events.ResponseToUniqueEvent; import org.toop.framework.eventbus.events.UniqueEvent; import org.toop.framework.eventbus.bus.EventBus; -import org.toop.framework.eventbus.subscriber.DefaultSubscriber; +import org.toop.framework.eventbus.subscriber.DefaultNamedSubscriber; +import org.toop.framework.eventbus.subscriber.NamedSubscriber; import org.toop.framework.eventbus.subscriber.Subscriber; /** @@ -43,7 +44,7 @@ public class EventFlow { private EventType event = null; /** The listener returned by GlobalEventBus subscription. Used for unsubscription. */ - private final List> listeners = new ArrayList<>(); + private final List> listeners = new ArrayList<>(); /** Holds the results returned from the subscribed event, if any. */ private Map result = null; @@ -161,7 +162,7 @@ public class EventFlow { this.result = eventClass.result(); }; - var subscriber = new DefaultSubscriber<>( + var subscriber = new DefaultNamedSubscriber<>( name, event, newAction @@ -248,7 +249,7 @@ public class EventFlow { } }; - var listener = new DefaultSubscriber<>( + var listener = new DefaultNamedSubscriber<>( name, (Class) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0], newAction @@ -295,7 +296,7 @@ public class EventFlow { if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id)); }; - var listener = new DefaultSubscriber<>( + var listener = new DefaultNamedSubscriber<>( name, event, newAction @@ -378,7 +379,7 @@ public class EventFlow { } }; - var listener = new DefaultSubscriber<>( + var listener = new DefaultNamedSubscriber<>( name, eventClass, newAction @@ -496,7 +497,7 @@ public class EventFlow { * * @return Copy of the list of listeners. */ - public Subscriber[] getListeners() { + public Subscriber[] getListeners() { return listeners.toArray(new Subscriber[0]); } diff --git a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java index bac36c7..f70f751 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java @@ -3,6 +3,7 @@ package org.toop.framework.eventbus; import org.apache.logging.log4j.LogManager; import org.toop.framework.eventbus.bus.DisruptorEventBus; import org.toop.framework.eventbus.bus.EventBus; +import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.store.DefaultSubscriberStore; import org.toop.framework.eventbus.subscriber.Subscriber; @@ -19,17 +20,17 @@ public class GlobalEventBus implements EventBus { } @Override - public void subscribe(Subscriber listener) { + public void subscribe(Subscriber listener) { INSTANCE.subscribe(listener); } @Override - public void unsubscribe(Subscriber listener) { + public void unsubscribe(Subscriber listener) { INSTANCE.unsubscribe(listener); } @Override - public void post(T event) { + public void post(T event) { INSTANCE.post(event); } diff --git a/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java index 7aed398..7b77ff3 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java @@ -17,22 +17,22 @@ public class DefaultEventBus implements EventBus { } @Override - public void subscribe(Subscriber subscriber) { + public void subscribe(Subscriber subscriber) { eventsHolder.add(subscriber); } @Override - public void unsubscribe(Subscriber subscriber) { + public void unsubscribe(Subscriber subscriber) { eventsHolder.remove(subscriber); } @Override @SuppressWarnings("unchecked") - public void post(T event) { + public void post(T event) { Class eventType = (Class) event.getClass(); var subs = eventsHolder.get(eventType); if (subs != null) { - for (Subscriber subscriber : subs) { + for (Subscriber subscriber : subs) { Class eventClass = (Class) subscriber.event(); Consumer action = (Consumer) subscriber.handler(); diff --git a/framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java index ec0e1bd..43efc5c 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java @@ -21,8 +21,8 @@ public class DisruptorEventBus implements EventBus { private final Logger logger; private final SubscriberStore eventsHolder; - private final Disruptor> disruptor; - private final RingBuffer> ringBuffer; + private final Disruptor> disruptor; + private final RingBuffer> ringBuffer; public DisruptorEventBus(Logger logger, SubscriberStore eventsHolder) { this.logger = logger; @@ -41,9 +41,9 @@ public class DisruptorEventBus implements EventBus { this.ringBuffer = disruptor.getRingBuffer(); } - private Disruptor> getEventHolderDisruptor(ThreadFactory threadFactory) { + private Disruptor> getEventHolderDisruptor(ThreadFactory threadFactory) { int RING_BUFFER_SIZE = 1024 * 64; - Disruptor> disruptor = new Disruptor<>( + Disruptor> disruptor = new Disruptor<>( EventHolder::new, RING_BUFFER_SIZE, threadFactory, @@ -61,17 +61,17 @@ public class DisruptorEventBus implements EventBus { } @Override - public void subscribe(Subscriber listener) { + public void subscribe(Subscriber listener) { eventsHolder.add(listener); } @Override - public void unsubscribe(Subscriber listener) { + public void unsubscribe(Subscriber listener) { eventsHolder.remove(listener); } @Override - public void post(T event) { + public void post(T event) { long seq = ringBuffer.next(); try { @SuppressWarnings("unchecked") @@ -93,10 +93,10 @@ public class DisruptorEventBus implements EventBus { eventsHolder.reset(); } - private void dispatchEvent(T event) { + private void dispatchEvent(T event) { var classListeners = eventsHolder.get(event.getClass()); if (classListeners != null) { - for (Subscriber listener : classListeners) { + for (Subscriber listener : classListeners) { try { callListener(listener, event); } catch (Throwable e) { @@ -108,7 +108,7 @@ public class DisruptorEventBus implements EventBus { @SuppressWarnings("unchecked") - private void callListener(Subscriber subscriber, T event) { + private void callListener(Subscriber subscriber, T event) { Class eventClass = (Class) subscriber.event(); Consumer action = (Consumer) subscriber.handler(); diff --git a/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java index f023a39..acb35f8 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/EventBus.java @@ -1,11 +1,12 @@ package org.toop.framework.eventbus.bus; +import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.subscriber.Subscriber; public interface EventBus { - void subscribe(Subscriber subscriber); - void unsubscribe(Subscriber subscriber); - void post(T event); + void subscribe(Subscriber subscriber); + void unsubscribe(Subscriber subscriber); + void post(T event); void shutdown(); void reset(); } diff --git a/framework/src/main/java/org/toop/framework/eventbus/store/AsyncSubscriberStore.java b/framework/src/main/java/org/toop/framework/eventbus/store/AsyncSubscriberStore.java index 234f285..157856c 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/store/AsyncSubscriberStore.java +++ b/framework/src/main/java/org/toop/framework/eventbus/store/AsyncSubscriberStore.java @@ -1,23 +1,24 @@ package org.toop.framework.eventbus.store; +import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.subscriber.Subscriber; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class AsyncSubscriberStore implements SubscriberStore { - private final ConcurrentHashMap, ConcurrentLinkedQueue>> queues = new ConcurrentHashMap<>(); - private final ConcurrentHashMap, Subscriber[]> snapshots = new ConcurrentHashMap<>(); + private final ConcurrentHashMap, ConcurrentLinkedQueue>> queues = new ConcurrentHashMap<>(); + private final ConcurrentHashMap, Subscriber[]> snapshots = new ConcurrentHashMap<>(); @Override - public void add(Subscriber sub) { + public void add(Subscriber sub) { queues.computeIfAbsent(sub.event(), _ -> new ConcurrentLinkedQueue<>()).add(sub); rebuildSnapshot(sub.event()); } @Override - public void remove(Subscriber sub) { - ConcurrentLinkedQueue> queue = queues.get(sub.event()); + public void remove(Subscriber sub) { + ConcurrentLinkedQueue> queue = queues.get(sub.event()); if (queue != null) { queue.remove(sub); rebuildSnapshot(sub.event()); @@ -25,8 +26,8 @@ public class AsyncSubscriberStore implements SubscriberStore { } @Override - public Subscriber[] get(Class event) { - return snapshots.getOrDefault(event, new Subscriber[0]); + public Subscriber[] get(Class event) { + return snapshots.getOrDefault(event, new Subscriber[0]); } @Override @@ -35,12 +36,12 @@ public class AsyncSubscriberStore implements SubscriberStore { snapshots.clear(); } - private void rebuildSnapshot(Class event) { - ConcurrentLinkedQueue> queue = queues.get(event); + private void rebuildSnapshot(Class event) { + ConcurrentLinkedQueue> queue = queues.get(event); if (queue != null) { - snapshots.put(event, queue.toArray(new Subscriber[0])); + snapshots.put(event, queue.toArray(new Subscriber[0])); } else { - snapshots.put(event, new Subscriber[0]); + snapshots.put(event, new Subscriber[0]); } } } \ No newline at end of file diff --git a/framework/src/main/java/org/toop/framework/eventbus/store/DefaultSubscriberStore.java b/framework/src/main/java/org/toop/framework/eventbus/store/DefaultSubscriberStore.java index 5f4a00b..3db573b 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/store/DefaultSubscriberStore.java +++ b/framework/src/main/java/org/toop/framework/eventbus/store/DefaultSubscriberStore.java @@ -1,25 +1,27 @@ package org.toop.framework.eventbus.store; +import org.toop.framework.eventbus.events.EventType; +import org.toop.framework.eventbus.subscriber.NamedSubscriber; import org.toop.framework.eventbus.subscriber.Subscriber; import java.util.concurrent.ConcurrentHashMap; public class DefaultSubscriberStore implements SubscriberStore { - private static final Subscriber[] EMPTY = new Subscriber[0]; + private static final Subscriber[] EMPTY = new Subscriber[0]; - private final ConcurrentHashMap, Subscriber[]> listeners = - new ConcurrentHashMap<>(); + private final ConcurrentHashMap, Subscriber[]> + listeners = new ConcurrentHashMap<>(); @Override - public void add(Subscriber sub) { + public void add(Subscriber sub) { listeners.compute(sub.event(), (_, arr) -> { if (arr == null || arr.length == 0) { - return new Subscriber[]{sub}; + return new Subscriber[]{sub}; } int len = arr.length; - Subscriber[] newArr = new Subscriber[len + 1]; + Subscriber[] newArr = new Subscriber[len + 1]; System.arraycopy(arr, 0, newArr, 0, len); newArr[len] = sub; return newArr; @@ -27,7 +29,7 @@ public class DefaultSubscriberStore implements SubscriberStore { } @Override - public void remove(Subscriber sub) { + public void remove(Subscriber sub) { listeners.computeIfPresent(sub.event(), (_, arr) -> { int len = arr.length; @@ -36,7 +38,7 @@ public class DefaultSubscriberStore implements SubscriberStore { } int keep = 0; - for (Subscriber s : arr) { + for (Subscriber s : arr) { if (!s.equals(sub)) keep++; } @@ -47,9 +49,9 @@ public class DefaultSubscriberStore implements SubscriberStore { return null; } - Subscriber[] newArr = new Subscriber[keep]; + Subscriber[] newArr = new Subscriber[keep]; int i = 0; - for (Subscriber s : arr) { + for (Subscriber s : arr) { if (!s.equals(sub)) { newArr[i++] = s; } @@ -60,7 +62,7 @@ public class DefaultSubscriberStore implements SubscriberStore { } @Override - public Subscriber[] get(Class event) { + public Subscriber[] get(Class event) { return listeners.getOrDefault(event, EMPTY); } diff --git a/framework/src/main/java/org/toop/framework/eventbus/store/SubscriberStore.java b/framework/src/main/java/org/toop/framework/eventbus/store/SubscriberStore.java index fc38721..0d2da15 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/store/SubscriberStore.java +++ b/framework/src/main/java/org/toop/framework/eventbus/store/SubscriberStore.java @@ -1,10 +1,11 @@ package org.toop.framework.eventbus.store; +import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.subscriber.Subscriber; public interface SubscriberStore { - void add(Subscriber subscriber); - void remove(Subscriber subscriber); - Subscriber[] get(Class event); + void add(Subscriber subscriber); + void remove(Subscriber subscriber); + Subscriber[] get(Class event); void reset(); } diff --git a/framework/src/main/java/org/toop/framework/eventbus/store/SyncSubscriberStore.java b/framework/src/main/java/org/toop/framework/eventbus/store/SyncSubscriberStore.java index 5de873c..69fcb4e 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/store/SyncSubscriberStore.java +++ b/framework/src/main/java/org/toop/framework/eventbus/store/SyncSubscriberStore.java @@ -1,5 +1,6 @@ package org.toop.framework.eventbus.store; +import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.subscriber.Subscriber; import java.util.ArrayList; @@ -8,23 +9,23 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class SyncSubscriberStore implements SubscriberStore { - private final Map, List>> LISTENERS = new ConcurrentHashMap<>(); - private static final Subscriber[] EMPTY = new Subscriber[0]; + private final Map, List>> LISTENERS = new ConcurrentHashMap<>(); + private static final Subscriber[] EMPTY = new Subscriber[0]; @Override - public void add(Subscriber sub) { + public void add(Subscriber sub) { LISTENERS.computeIfAbsent(sub.event(), _ -> new ArrayList<>()).add(sub); } @Override - public void remove(Subscriber sub) { + public void remove(Subscriber sub) { LISTENERS.getOrDefault(sub.event(), new ArrayList<>()).remove(sub); LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty()); } @Override - public Subscriber[] get(Class event) { - List> list = LISTENERS.get(event); + public Subscriber[] get(Class event) { + List> list = LISTENERS.get(event); if (list == null || list.isEmpty()) return EMPTY; return list.toArray(EMPTY); } diff --git a/framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultNamedSubscriber.java b/framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultNamedSubscriber.java new file mode 100644 index 0000000..0829221 --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultNamedSubscriber.java @@ -0,0 +1,8 @@ +package org.toop.framework.eventbus.subscriber; + +import org.toop.framework.eventbus.events.EventType; + +import java.util.function.Consumer; + +public record DefaultNamedSubscriber(String id, Class event, Consumer handler) + implements NamedSubscriber {} diff --git a/framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultSubscriber.java b/framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultSubscriber.java index 47f5646..5afa71e 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultSubscriber.java +++ b/framework/src/main/java/org/toop/framework/eventbus/subscriber/DefaultSubscriber.java @@ -1,5 +1,8 @@ package org.toop.framework.eventbus.subscriber; +import org.toop.framework.eventbus.events.EventType; + import java.util.function.Consumer; -public record DefaultSubscriber(String id, Class event, Consumer handler) implements NamedSubscriber {} +public record DefaultSubscriber(Class event, Consumer handler) implements Subscriber {} + diff --git a/framework/src/main/java/org/toop/framework/eventbus/subscriber/HasId.java b/framework/src/main/java/org/toop/framework/eventbus/subscriber/HasId.java new file mode 100644 index 0000000..089e206 --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/subscriber/HasId.java @@ -0,0 +1,5 @@ +package org.toop.framework.eventbus.subscriber; + +public interface HasId { + ID id(); +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/subscriber/IdSubscriber.java b/framework/src/main/java/org/toop/framework/eventbus/subscriber/IdSubscriber.java index 656974a..fc4c7ef 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/subscriber/IdSubscriber.java +++ b/framework/src/main/java/org/toop/framework/eventbus/subscriber/IdSubscriber.java @@ -1,3 +1,5 @@ package org.toop.framework.eventbus.subscriber; -public interface IdSubscriber extends Subscriber {} +import org.toop.framework.eventbus.events.EventType; + +public interface IdSubscriber extends Subscriber, HasId {} diff --git a/framework/src/main/java/org/toop/framework/eventbus/subscriber/LongIdSubscriber.java b/framework/src/main/java/org/toop/framework/eventbus/subscriber/LongIdSubscriber.java index 0b105d4..90ec1ef 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/subscriber/LongIdSubscriber.java +++ b/framework/src/main/java/org/toop/framework/eventbus/subscriber/LongIdSubscriber.java @@ -1,5 +1,8 @@ package org.toop.framework.eventbus.subscriber; +import org.toop.framework.eventbus.events.EventType; + import java.util.function.Consumer; -public record LongIdSubscriber(Long id, Class event, Consumer handler) implements IdSubscriber {} +public record LongIdSubscriber(Long id, Class event, Consumer handler) + implements IdSubscriber {} diff --git a/framework/src/main/java/org/toop/framework/eventbus/subscriber/NamedSubscriber.java b/framework/src/main/java/org/toop/framework/eventbus/subscriber/NamedSubscriber.java index 4b7ad8e..90542d2 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/subscriber/NamedSubscriber.java +++ b/framework/src/main/java/org/toop/framework/eventbus/subscriber/NamedSubscriber.java @@ -1,3 +1,5 @@ package org.toop.framework.eventbus.subscriber; -public interface NamedSubscriber extends Subscriber {} +import org.toop.framework.eventbus.events.EventType; + +public interface NamedSubscriber extends Subscriber, HasId {} diff --git a/framework/src/main/java/org/toop/framework/eventbus/subscriber/Subscriber.java b/framework/src/main/java/org/toop/framework/eventbus/subscriber/Subscriber.java index 30ad23e..2f62890 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/subscriber/Subscriber.java +++ b/framework/src/main/java/org/toop/framework/eventbus/subscriber/Subscriber.java @@ -1,9 +1,10 @@ package org.toop.framework.eventbus.subscriber; +import org.toop.framework.eventbus.events.EventType; + import java.util.function.Consumer; -public interface Subscriber { - ID id(); +public interface Subscriber { Class event(); Consumer handler(); }