Better limits to generic acceptance

This commit is contained in:
Bas de Jong
2025-12-09 21:07:30 +01:00
parent 912d25c01f
commit a9c99df5d2
17 changed files with 101 additions and 71 deletions

View File

@@ -3,7 +3,6 @@ package org.toop.app;
import javafx.application.Platform;
import javafx.scene.input.KeyCode;
import javafx.scene.input.KeyCodeCombination;
import javafx.scene.input.KeyCombination;
import javafx.scene.input.KeyEvent;
import org.toop.app.widget.Primitive;
@@ -112,20 +111,19 @@ public final class App extends Application {
Platform.runLater(() -> stage.setOpacity(1.0));
}
Platform.runLater(() -> loading.setMaxAmount(e.isLoadingAmount()));
Platform.runLater(() -> {
loading.setMaxAmount(e.isLoadingAmount());
try {
loading.setAmount(e.hasLoadedAmount());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
if (e.hasLoadedAmount() >= e.isLoadingAmount()-1) {
Platform.runLater(loading::triggerSuccess);
loadingFlow.unsubscribe("init_loading");
}
});
if (e.hasLoadedAmount() >= e.isLoadingAmount()) {
Platform.runLater(loading::triggerSuccess);
loadingFlow.unsubscribe("init_loading");
}
}, false, "init_loading");

View File

@@ -14,7 +14,8 @@ import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.ResponseToUniqueEvent;
import org.toop.framework.eventbus.events.UniqueEvent;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.subscriber.DefaultSubscriber;
import org.toop.framework.eventbus.subscriber.DefaultNamedSubscriber;
import org.toop.framework.eventbus.subscriber.NamedSubscriber;
import org.toop.framework.eventbus.subscriber.Subscriber;
/**
@@ -43,7 +44,7 @@ public class EventFlow {
private EventType event = null;
/** The listener returned by GlobalEventBus subscription. Used for unsubscription. */
private final List<Subscriber<?, ?>> listeners = new ArrayList<>();
private final List<NamedSubscriber<?>> listeners = new ArrayList<>();
/** Holds the results returned from the subscribed event, if any. */
private Map<String, ?> result = null;
@@ -161,7 +162,7 @@ public class EventFlow {
this.result = eventClass.result();
};
var subscriber = new DefaultSubscriber<>(
var subscriber = new DefaultNamedSubscriber<>(
name,
event,
newAction
@@ -248,7 +249,7 @@ public class EventFlow {
}
};
var listener = new DefaultSubscriber<>(
var listener = new DefaultNamedSubscriber<>(
name,
(Class<TT>) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0],
newAction
@@ -295,7 +296,7 @@ public class EventFlow {
if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
};
var listener = new DefaultSubscriber<>(
var listener = new DefaultNamedSubscriber<>(
name,
event,
newAction
@@ -378,7 +379,7 @@ public class EventFlow {
}
};
var listener = new DefaultSubscriber<>(
var listener = new DefaultNamedSubscriber<>(
name,
eventClass,
newAction
@@ -496,7 +497,7 @@ public class EventFlow {
*
* @return Copy of the list of listeners.
*/
public Subscriber<?, ?>[] getListeners() {
public Subscriber<?>[] getListeners() {
return listeners.toArray(new Subscriber[0]);
}

View File

@@ -3,6 +3,7 @@ package org.toop.framework.eventbus;
import org.apache.logging.log4j.LogManager;
import org.toop.framework.eventbus.bus.DisruptorEventBus;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.store.DefaultSubscriberStore;
import org.toop.framework.eventbus.subscriber.Subscriber;
@@ -19,17 +20,17 @@ public class GlobalEventBus implements EventBus {
}
@Override
public void subscribe(Subscriber<?, ?> listener) {
public void subscribe(Subscriber<? extends EventType> listener) {
INSTANCE.subscribe(listener);
}
@Override
public void unsubscribe(Subscriber<?, ?> listener) {
public void unsubscribe(Subscriber<? extends EventType> listener) {
INSTANCE.unsubscribe(listener);
}
@Override
public <T> void post(T event) {
public <T extends EventType> void post(T event) {
INSTANCE.post(event);
}

View File

@@ -17,22 +17,22 @@ public class DefaultEventBus implements EventBus {
}
@Override
public void subscribe(Subscriber<?, ?> subscriber) {
public void subscribe(Subscriber<? extends EventType> subscriber) {
eventsHolder.add(subscriber);
}
@Override
public void unsubscribe(Subscriber<?, ?> subscriber) {
public void unsubscribe(Subscriber<? extends EventType> subscriber) {
eventsHolder.remove(subscriber);
}
@Override
@SuppressWarnings("unchecked")
public <T> void post(T event) {
public <T extends EventType> void post(T event) {
Class<T> eventType = (Class<T>) event.getClass();
var subs = eventsHolder.get(eventType);
if (subs != null) {
for (Subscriber<?, ?> subscriber : subs) {
for (Subscriber<?> subscriber : subs) {
Class<T> eventClass = (Class<T>) subscriber.event();
Consumer<EventType> action = (Consumer<EventType>) subscriber.handler();

View File

@@ -21,8 +21,8 @@ public class DisruptorEventBus implements EventBus {
private final Logger logger;
private final SubscriberStore eventsHolder;
private final Disruptor<EventHolder<?>> disruptor;
private final RingBuffer<EventHolder<?>> ringBuffer;
private final Disruptor<EventHolder<? extends EventType>> disruptor;
private final RingBuffer<EventHolder<? extends EventType>> ringBuffer;
public DisruptorEventBus(Logger logger, SubscriberStore eventsHolder) {
this.logger = logger;
@@ -41,9 +41,9 @@ public class DisruptorEventBus implements EventBus {
this.ringBuffer = disruptor.getRingBuffer();
}
private Disruptor<EventHolder<?>> getEventHolderDisruptor(ThreadFactory threadFactory) {
private Disruptor<EventHolder<? extends EventType>> getEventHolderDisruptor(ThreadFactory threadFactory) {
int RING_BUFFER_SIZE = 1024 * 64;
Disruptor<EventHolder<?>> disruptor = new Disruptor<>(
Disruptor<EventHolder<? extends EventType>> disruptor = new Disruptor<>(
EventHolder::new,
RING_BUFFER_SIZE,
threadFactory,
@@ -61,17 +61,17 @@ public class DisruptorEventBus implements EventBus {
}
@Override
public void subscribe(Subscriber<?, ?> listener) {
public void subscribe(Subscriber<? extends EventType> listener) {
eventsHolder.add(listener);
}
@Override
public void unsubscribe(Subscriber<?, ?> listener) {
public void unsubscribe(Subscriber<? extends EventType> listener) {
eventsHolder.remove(listener);
}
@Override
public <T> void post(T event) {
public <T extends EventType> void post(T event) {
long seq = ringBuffer.next();
try {
@SuppressWarnings("unchecked")
@@ -93,10 +93,10 @@ public class DisruptorEventBus implements EventBus {
eventsHolder.reset();
}
private <T> void dispatchEvent(T event) {
private <T extends EventType> void dispatchEvent(T event) {
var classListeners = eventsHolder.get(event.getClass());
if (classListeners != null) {
for (Subscriber<?, ?> listener : classListeners) {
for (Subscriber<?> listener : classListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
@@ -108,7 +108,7 @@ public class DisruptorEventBus implements EventBus {
@SuppressWarnings("unchecked")
private <T> void callListener(Subscriber<?, ?> subscriber, T event) {
private <T> void callListener(Subscriber<?> subscriber, T event) {
Class<T> eventClass = (Class<T>) subscriber.event();
Consumer<EventType> action = (Consumer<EventType>) subscriber.handler();

View File

@@ -1,11 +1,12 @@
package org.toop.framework.eventbus.bus;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.subscriber.Subscriber;
public interface EventBus {
void subscribe(Subscriber<?, ?> subscriber);
void unsubscribe(Subscriber<?, ?> subscriber);
<T> void post(T event);
void subscribe(Subscriber<? extends EventType> subscriber);
void unsubscribe(Subscriber<? extends EventType> subscriber);
<T extends EventType> void post(T event);
void shutdown();
void reset();
}

View File

@@ -1,23 +1,24 @@
package org.toop.framework.eventbus.store;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class AsyncSubscriberStore implements SubscriberStore {
private final ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Subscriber<?, ?>>> queues = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Class<?>, Subscriber<?, ?>[]> snapshots = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Class<? extends EventType>, ConcurrentLinkedQueue<Subscriber<? extends EventType>>> queues = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Class<? extends EventType>, Subscriber<? extends EventType>[]> snapshots = new ConcurrentHashMap<>();
@Override
public void add(Subscriber<?, ?> sub) {
public void add(Subscriber<? extends EventType> sub) {
queues.computeIfAbsent(sub.event(), _ -> new ConcurrentLinkedQueue<>()).add(sub);
rebuildSnapshot(sub.event());
}
@Override
public void remove(Subscriber<?, ?> sub) {
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(sub.event());
public void remove(Subscriber<? extends EventType> sub) {
ConcurrentLinkedQueue<Subscriber<?>> queue = queues.get(sub.event());
if (queue != null) {
queue.remove(sub);
rebuildSnapshot(sub.event());
@@ -25,8 +26,8 @@ public class AsyncSubscriberStore implements SubscriberStore {
}
@Override
public Subscriber<?, ?>[] get(Class<?> event) {
return snapshots.getOrDefault(event, new Subscriber[0]);
public Subscriber<? extends EventType>[] get(Class<? extends EventType> event) {
return snapshots.getOrDefault(event, new Subscriber<?>[0]);
}
@Override
@@ -35,12 +36,12 @@ public class AsyncSubscriberStore implements SubscriberStore {
snapshots.clear();
}
private void rebuildSnapshot(Class<?> event) {
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(event);
private void rebuildSnapshot(Class<? extends EventType> event) {
ConcurrentLinkedQueue<Subscriber<?>> queue = queues.get(event);
if (queue != null) {
snapshots.put(event, queue.toArray(new Subscriber[0]));
snapshots.put(event, queue.toArray(new Subscriber<?>[0]));
} else {
snapshots.put(event, new Subscriber[0]);
snapshots.put(event, new Subscriber<?>[0]);
}
}
}

View File

@@ -1,25 +1,27 @@
package org.toop.framework.eventbus.store;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.subscriber.NamedSubscriber;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.concurrent.ConcurrentHashMap;
public class DefaultSubscriberStore implements SubscriberStore {
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
private static final Subscriber<? extends EventType>[] EMPTY = new Subscriber<?>[0];
private final ConcurrentHashMap<Class<?>, Subscriber<?, ?>[]> listeners =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Class<? extends EventType>, Subscriber<? extends EventType>[]>
listeners = new ConcurrentHashMap<>();
@Override
public void add(Subscriber<?, ?> sub) {
public void add(Subscriber<? extends EventType> sub) {
listeners.compute(sub.event(), (_, arr) -> {
if (arr == null || arr.length == 0) {
return new Subscriber<?, ?>[]{sub};
return new Subscriber<?>[]{sub};
}
int len = arr.length;
Subscriber<?, ?>[] newArr = new Subscriber[len + 1];
Subscriber<?>[] newArr = new Subscriber[len + 1];
System.arraycopy(arr, 0, newArr, 0, len);
newArr[len] = sub;
return newArr;
@@ -27,7 +29,7 @@ public class DefaultSubscriberStore implements SubscriberStore {
}
@Override
public void remove(Subscriber<?, ?> sub) {
public void remove(Subscriber<? extends EventType> sub) {
listeners.computeIfPresent(sub.event(), (_, arr) -> {
int len = arr.length;
@@ -36,7 +38,7 @@ public class DefaultSubscriberStore implements SubscriberStore {
}
int keep = 0;
for (Subscriber<?, ?> s : arr) {
for (Subscriber<?> s : arr) {
if (!s.equals(sub)) keep++;
}
@@ -47,9 +49,9 @@ public class DefaultSubscriberStore implements SubscriberStore {
return null;
}
Subscriber<?, ?>[] newArr = new Subscriber[keep];
Subscriber<?>[] newArr = new Subscriber[keep];
int i = 0;
for (Subscriber<?, ?> s : arr) {
for (Subscriber<?> s : arr) {
if (!s.equals(sub)) {
newArr[i++] = s;
}
@@ -60,7 +62,7 @@ public class DefaultSubscriberStore implements SubscriberStore {
}
@Override
public Subscriber<?, ?>[] get(Class<?> event) {
public Subscriber<? extends EventType>[] get(Class<? extends EventType> event) {
return listeners.getOrDefault(event, EMPTY);
}

View File

@@ -1,10 +1,11 @@
package org.toop.framework.eventbus.store;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.subscriber.Subscriber;
public interface SubscriberStore {
void add(Subscriber<?, ?> subscriber);
void remove(Subscriber<?, ?> subscriber);
Subscriber<?, ?>[] get(Class<?> event);
void add(Subscriber<? extends EventType> subscriber);
void remove(Subscriber<? extends EventType> subscriber);
Subscriber<? extends EventType>[] get(Class<? extends EventType> event);
void reset();
}

View File

@@ -1,5 +1,6 @@
package org.toop.framework.eventbus.store;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.ArrayList;
@@ -8,23 +9,23 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SyncSubscriberStore implements SubscriberStore {
private final Map<Class<?>, List<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>();
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
private final Map<Class<? extends EventType>, List<Subscriber<? extends EventType>>> LISTENERS = new ConcurrentHashMap<>();
private static final Subscriber<? extends EventType>[] EMPTY = new Subscriber<?>[0];
@Override
public void add(Subscriber<?, ?> sub) {
public void add(Subscriber<? extends EventType> sub) {
LISTENERS.computeIfAbsent(sub.event(), _ -> new ArrayList<>()).add(sub);
}
@Override
public void remove(Subscriber<?, ?> sub) {
public void remove(Subscriber<? extends EventType> sub) {
LISTENERS.getOrDefault(sub.event(), new ArrayList<>()).remove(sub);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
@Override
public Subscriber<?, ?>[] get(Class<?> event) {
List<Subscriber<?, ?>> list = LISTENERS.get(event);
public Subscriber<? extends EventType>[] get(Class<? extends EventType> event) {
List<Subscriber<? extends EventType>> list = LISTENERS.get(event);
if (list == null || list.isEmpty()) return EMPTY;
return list.toArray(EMPTY);
}

View File

@@ -0,0 +1,8 @@
package org.toop.framework.eventbus.subscriber;
import org.toop.framework.eventbus.events.EventType;
import java.util.function.Consumer;
public record DefaultNamedSubscriber<K extends EventType>(String id, Class<K> event, Consumer<K> handler)
implements NamedSubscriber<K> {}

View File

@@ -1,5 +1,8 @@
package org.toop.framework.eventbus.subscriber;
import org.toop.framework.eventbus.events.EventType;
import java.util.function.Consumer;
public record DefaultSubscriber<K>(String id, Class<K> event, Consumer<K> handler) implements NamedSubscriber<K> {}
public record DefaultSubscriber<K extends EventType>(Class<K> event, Consumer<K> handler) implements Subscriber<K> {}

View File

@@ -0,0 +1,5 @@
package org.toop.framework.eventbus.subscriber;
public interface HasId<ID> {
ID id();
}

View File

@@ -1,3 +1,5 @@
package org.toop.framework.eventbus.subscriber;
public interface IdSubscriber<T> extends Subscriber<Long, T> {}
import org.toop.framework.eventbus.events.EventType;
public interface IdSubscriber<K extends EventType> extends Subscriber<K>, HasId<Long> {}

View File

@@ -1,5 +1,8 @@
package org.toop.framework.eventbus.subscriber;
import org.toop.framework.eventbus.events.EventType;
import java.util.function.Consumer;
public record LongIdSubscriber<K>(Long id, Class<K> event, Consumer<K> handler) implements IdSubscriber<K> {}
public record LongIdSubscriber<K extends EventType>(Long id, Class<K> event, Consumer<K> handler)
implements IdSubscriber<K> {}

View File

@@ -1,3 +1,5 @@
package org.toop.framework.eventbus.subscriber;
public interface NamedSubscriber<T> extends Subscriber<String, T> {}
import org.toop.framework.eventbus.events.EventType;
public interface NamedSubscriber<K extends EventType> extends Subscriber<K>, HasId<String> {}

View File

@@ -1,9 +1,10 @@
package org.toop.framework.eventbus.subscriber;
import org.toop.framework.eventbus.events.EventType;
import java.util.function.Consumer;
public interface Subscriber<ID, K> {
ID id();
public interface Subscriber<K extends EventType> {
Class<K> event();
Consumer<K> handler();
}