Event bus now testable, improved UI (#284)

* turn updates

* smalle fixes aan turn updates

* better human/ai selector with bot selection and depth on TicTacToeAIR

* depth + thinktime back to AIs, along with a a specific TicTacToeAIRSleep

* fixed overlapping back and disconnect buttons

* Changed to debug instead of info

* changed the transitionNextCustom to be easier to use

* added getAllWidgets to WidgetContainer

* Correct back view

* added replacePrevious in ViewWidget

* added removeIndexFromPreviousChain

* fixed incorrect index counting

* Fixt wrong view order

* Removed todo

* Challenge popups "Fixed"

* Popups now remove themselves

* localize the ChallengePopup text

* made the game text a header instead

* fixed getAllWidgets

* Escape popup

* fixed redundant container

* Escape remove popup

* Working escape menu

* Added find functionality

* Tutorials moved to escape menu

* Escape can't be opened in mainview now

* Can now test the event bus, created testable interfaces

* Logging errors

* Made events and handlers more generic

* Suppress

* Managers now have changeable eventbus

* Tutorials fixed

* Removed import

* Single threaded eventbus

* Fixed wrong eventbus

* Removed get

* Removed old code

* Renaming

* Optimization

* Removed useless comment

* Removed unnecessary imports

* Rename

* Renaming, refactor and type safety

* Rename

* Removed import

---------

Co-authored-by: michiel301b <m.brands.3@st.hanze.nl>
Co-authored-by: ramollia <>
This commit is contained in:
Bas Antonius de Jong
2025-12-07 17:38:34 +01:00
committed by GitHub
parent f60df73b66
commit 38f50cc16d
68 changed files with 1100 additions and 538 deletions

View File

@@ -6,25 +6,29 @@ import org.toop.framework.audio.interfaces.SoundEffectManager;
import org.toop.framework.audio.interfaces.VolumeManager;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.eventbus.GlobalEventBus;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.resource.types.AudioResource;
public class AudioEventListener<T extends AudioResource, K extends AudioResource> {
private final EventBus eventBus;
private final MusicManager<T> musicManager;
private final SoundEffectManager<K> soundEffectManager;
private final VolumeManager audioVolumeManager;
public AudioEventListener(
EventBus eventBus,
MusicManager<T> musicManager,
SoundEffectManager<K> soundEffectManager,
VolumeManager audioVolumeManager
) {
this.eventBus = eventBus;
this.musicManager = musicManager;
this.soundEffectManager = soundEffectManager;
this.audioVolumeManager = audioVolumeManager;
}
public AudioEventListener<?, ?> initListeners(String buttonSoundToPlay) {
new EventFlow()
new EventFlow(eventBus)
.listen(AudioEvents.StopAudioManager.class, this::handleStopMusicManager, false)
.listen(AudioEvents.PlayEffect.class, this::handlePlaySound, false)
.listen(AudioEvents.SkipMusic.class, this::handleSkipSong, false)
@@ -73,7 +77,7 @@ public class AudioEventListener<T extends AudioResource, K extends AudioResource
}
private void handleGetVolume(AudioEvents.GetVolume event) {
GlobalEventBus.postAsync(new AudioEvents.GetVolumeResponse(
eventBus.post(new AudioEvents.GetVolumeResponse(
audioVolumeManager.getVolume(event.controlType()),
event.identifier()));
}

View File

@@ -6,8 +6,7 @@ import org.toop.framework.audio.events.AudioEvents;
import org.toop.framework.dispatch.interfaces.Dispatcher;
import org.toop.framework.dispatch.JavaFXDispatcher;
import org.toop.annotations.TestsOnly;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.eventbus.GlobalEventBus;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.resource.types.AudioResource;
import java.util.*;
@@ -18,6 +17,7 @@ import java.util.concurrent.TimeUnit;
public class MusicManager<T extends AudioResource> implements org.toop.framework.audio.interfaces.MusicManager<T> {
private static final Logger logger = LogManager.getLogger(MusicManager.class);
private final EventBus eventBus;
private final List<T> backgroundMusic = new ArrayList<>();
private final Dispatcher dispatcher;
private final List<T> resources;
@@ -27,7 +27,8 @@ public class MusicManager<T extends AudioResource> implements org.toop.framework
private ScheduledExecutorService scheduler;
public MusicManager(List<T> resources, boolean shuffleMusic) {
public MusicManager(EventBus eventbus, List<T> resources, boolean shuffleMusic) {
this.eventBus = eventbus;
this.dispatcher = new JavaFXDispatcher();
this.resources = resources;
// Shuffle if wanting to shuffle
@@ -40,7 +41,8 @@ public class MusicManager<T extends AudioResource> implements org.toop.framework
* {@code @TestsOnly} DO NOT USE
*/
@TestsOnly
public MusicManager(List<T> resources, Dispatcher dispatcher) {
public MusicManager(EventBus eventBus, List<T> resources, Dispatcher dispatcher) {
this.eventBus = eventBus;
this.dispatcher = dispatcher;
this.resources = new ArrayList<>(resources);
backgroundMusic.addAll(resources);
@@ -124,7 +126,7 @@ public class MusicManager<T extends AudioResource> implements org.toop.framework
Runnable currentMusicTask = new Runnable() {
@Override
public void run() {
GlobalEventBus.post(new AudioEvents.PlayingMusic(track.getName(), track.currentPosition(), track.duration()));
eventBus.post(new AudioEvents.PlayingMusic(track.getName(), track.currentPosition(), track.duration()));
scheduler.schedule(this, 1, TimeUnit.SECONDS);
}
};

View File

@@ -2,18 +2,10 @@ package org.toop.framework.audio;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.resource.ResourceManager;
import org.toop.framework.resource.ResourceMeta;
import org.toop.framework.resource.resources.BaseResource;
import org.toop.framework.resource.resources.MusicAsset;
import org.toop.framework.resource.resources.SoundEffectAsset;
import org.toop.framework.resource.types.AudioResource;
import javax.sound.sampled.Clip;
import javax.sound.sampled.LineEvent;
import javax.sound.sampled.LineUnavailableException;
import javax.sound.sampled.UnsupportedAudioFileException;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;

View File

@@ -13,10 +13,13 @@ import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.ResponseToUniqueEvent;
import org.toop.framework.eventbus.events.UniqueEvent;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.subscriber.DefaultSubscriber;
import org.toop.framework.eventbus.subscriber.Subscriber;
/**
* EventFlow is a utility class for creating, posting, and optionally subscribing to events in a
* type-safe and chainable manner. It is designed to work with the {@link GlobalEventBus}.
* type-safe and chainable manner. It is designed to work with the {@link EventBus}.
*
* <p>This class supports automatic UUID assignment for {@link UniqueEvent} events, and
* allows filtering subscribers so they only respond to events with a specific UUID. All
@@ -31,6 +34,8 @@ public class EventFlow {
/** Cache of constructor handles for event classes to avoid repeated reflection lookups. */
private static final Map<Class<?>, MethodHandle> CONSTRUCTOR_CACHE = new ConcurrentHashMap<>();
private final EventBus eventBus;
/** Automatically assigned UUID for {@link UniqueEvent} events. */
private long eventSnowflake = -1;
@@ -38,13 +43,19 @@ public class EventFlow {
private EventType event = null;
/** The listener returned by GlobalEventBus subscription. Used for unsubscription. */
private final List<ListenerHandler<?>> listeners = new ArrayList<>();
private final List<Subscriber<?, ?>> listeners = new ArrayList<>();
/** Holds the results returned from the subscribed event, if any. */
private Map<String, ?> result = null;
/** Empty constructor (event must be added via {@link #addPostEvent(Class, Object...)}). */
public EventFlow() {}
public EventFlow(EventBus eventBus) {
this.eventBus = eventBus;
}
public EventFlow() {
this.eventBus = GlobalEventBus.get();
}
/**
*
@@ -145,21 +156,19 @@ public class EventFlow {
action.accept(eventClass);
if (unsubscribeAfterSuccess) unsubscribe(id);
if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
this.result = eventClass.result();
};
// TODO Remove casts
var listener = new ListenerHandler<>(
id,
var subscriber = new DefaultSubscriber<>(
name,
(Class<ResponseToUniqueEvent>) event,
(Consumer<ResponseToUniqueEvent>) newAction
event,
newAction
);
GlobalEventBus.subscribe(listener);
this.listeners.add(listener);
eventBus.subscribe(subscriber);
this.listeners.add(subscriber);
return this;
}
@@ -227,7 +236,7 @@ public class EventFlow {
TT typedEvent = (TT) uuidEvent;
action.accept(typedEvent);
if (unsubscribeAfterSuccess) unsubscribe(id);
if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
this.result = typedEvent.result();
} catch (ClassCastException _) {
@@ -239,14 +248,13 @@ public class EventFlow {
}
};
var listener = new ListenerHandler<>(
id,
var listener = new DefaultSubscriber<>(
name,
(Class<TT>) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0],
newAction
);
GlobalEventBus.subscribe(listener);
eventBus.subscribe(listener);
this.listeners.add(listener);
return this;
}
@@ -284,17 +292,16 @@ public class EventFlow {
Consumer<TT> newAction = eventc -> {
action.accept(eventc);
if (unsubscribeAfterSuccess) unsubscribe(id);
if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
};
var listener = new ListenerHandler<>(
id,
var listener = new DefaultSubscriber<>(
name,
event,
newAction
);
GlobalEventBus.subscribe(listener);
eventBus.subscribe(listener);
this.listeners.add(listener);
return this;
}
@@ -362,7 +369,7 @@ public class EventFlow {
try {
TT typedEvent = (TT) nonUuidEvent;
action.accept(typedEvent);
if (unsubscribeAfterSuccess) unsubscribe(id);
if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
} catch (ClassCastException _) {
throw new ClassCastException(
"Cannot cast "
@@ -371,14 +378,13 @@ public class EventFlow {
}
};
var listener = new ListenerHandler<>(
id,
var listener = new DefaultSubscriber<>(
name,
eventClass,
newAction
);
GlobalEventBus.subscribe(listener);
eventBus.subscribe(listener);
this.listeners.add(listener);
return this;
}
@@ -401,15 +407,18 @@ public class EventFlow {
* Posts the event added through {@link #addPostEvent}.
*/
public EventFlow postEvent() {
GlobalEventBus.post(this.event);
eventBus.post(this.event);
return this;
}
/**
* Posts the event added through {@link #addPostEvent} asynchronously.
*
* @deprecated use {@link #postEvent()} instead.
*/
@Deprecated
public EventFlow asyncPostEvent() {
GlobalEventBus.postAsync(this.event);
eventBus.post(this.event);
return this;
}
@@ -417,28 +426,12 @@ public class EventFlow {
*
* Unsubscribe from an event.
*
* @param listenerObject The listener object to remove and unsubscribe.
* @param action The listener object to remove and unsubscribe.
*/
public void unsubscribe(Object listenerObject) {
public void unsubscribe(Consumer<?> action) {
this.listeners.removeIf(handler -> {
if (handler.getListener() == listenerObject) {
GlobalEventBus.unsubscribe(handler);
return true;
}
return false;
});
}
/**
*
* Unsubscribe from an event.
*
* @param listenerId The id given to the {@link ListenerHandler}.
*/
public void unsubscribe(long listenerId) {
this.listeners.removeIf(handler -> {
if (handler.getId() == listenerId) {
GlobalEventBus.unsubscribe(handler);
if (handler.handler().equals(action)) {
eventBus.unsubscribe(handler);
return true;
}
return false;
@@ -452,8 +445,8 @@ public class EventFlow {
*/
public void unsubscribe(String name) {
this.listeners.removeIf(handler -> {
if (handler.getName().equals(name)) {
GlobalEventBus.unsubscribe(handler);
if (handler.id().equals(name)) {
eventBus.unsubscribe(handler);
return true;
}
return false;
@@ -465,7 +458,7 @@ public class EventFlow {
*/
public void unsubscribeAll() {
listeners.removeIf(handler -> {
GlobalEventBus.unsubscribe(handler);
eventBus.unsubscribe(handler);
return true;
});
}
@@ -503,8 +496,8 @@ public class EventFlow {
*
* @return Copy of the list of listeners.
*/
public ListenerHandler[] getListeners() {
return listeners.toArray(new ListenerHandler[0]);
public Subscriber<?, ?>[] getListeners() {
return listeners.toArray(new Subscriber[0]);
}
/**

View File

@@ -1,193 +1,45 @@
package org.toop.framework.eventbus;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.UniqueEvent;
import org.toop.framework.eventbus.bus.DisruptorEventBus;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.store.DefaultSubscriberStore;
import org.toop.framework.eventbus.subscriber.Subscriber;
/**
* GlobalEventBus backed by the LMAX Disruptor for ultra-low latency, high-throughput event
* publishing.
*/
public final class GlobalEventBus {
private static final Logger logger = LogManager.getLogger(GlobalEventBus.class);
public class GlobalEventBus implements EventBus {
private static final EventBus INSTANCE = new DisruptorEventBus(
LogManager.getLogger(DisruptorEventBus.class),
new DefaultSubscriberStore()
);
/** Map of event class to type-specific listeners. */
private static final Map<Class<?>, CopyOnWriteArrayList<ListenerHandler<?>>>
LISTENERS = new ConcurrentHashMap<>();
/** Map of event class to Snowflake-ID-specific listeners. */
private static final Map<
Class<?>, ConcurrentHashMap<Long, Consumer<? extends UniqueEvent>>>
UUID_LISTENERS = new ConcurrentHashMap<>();
/** Disruptor ring buffer size (must be power of two). */
private static final int RING_BUFFER_SIZE = 1024 * 64;
/** Disruptor instance. */
private static final Disruptor<EventHolder> DISRUPTOR;
/** Ring buffer used for publishing events. */
private static final RingBuffer<EventHolder> RING_BUFFER;
static {
ThreadFactory threadFactory =
r -> {
Thread t = new Thread(r, "EventBus-Disruptor");
t.setDaemon(true);
return t;
};
DISRUPTOR =
new Disruptor<>(
EventHolder::new,
RING_BUFFER_SIZE,
threadFactory,
ProducerType.MULTI,
new BusySpinWaitStrategy());
DISRUPTOR.handleEventsWith(
(holder, seq, endOfBatch) -> {
if (holder.event != null) {
dispatchEvent(holder.event);
holder.event = null;
}
});
DISRUPTOR.start();
RING_BUFFER = DISRUPTOR.getRingBuffer();
}
/** Prevent instantiation. */
private GlobalEventBus() {}
/** Wrapper used inside the ring buffer. */
private static class EventHolder {
EventType event;
public static EventBus get() {
return INSTANCE;
}
// ------------------------------------------------------------------------
// Subscription
// ------------------------------------------------------------------------
public static <T extends EventType> void subscribe(ListenerHandler<T> listener) {
logger.debug("Subscribing to {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName());
LISTENERS.computeIfAbsent(listener.getListenerClass(), _ -> new CopyOnWriteArrayList<>()).add(listener);
@Override
public void subscribe(Subscriber<?, ?> listener) {
INSTANCE.subscribe(listener);
}
// TODO
public static <T extends UniqueEvent> void subscribeById(
Class<T> eventClass, long eventId, Consumer<T> listener) {
UUID_LISTENERS
.computeIfAbsent(eventClass, _ -> new ConcurrentHashMap<>())
.put(eventId, listener);
@Override
public void unsubscribe(Subscriber<?, ?> listener) {
INSTANCE.unsubscribe(listener);
}
public static void unsubscribe(ListenerHandler<?> listener) {
logger.debug("Unsubscribing from {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName());
LISTENERS.getOrDefault(listener.getListenerClass(), new CopyOnWriteArrayList<>())
.remove(listener);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
@Override
public <T> void post(T event) {
INSTANCE.post(event);
}
// TODO
public static <T extends UniqueEvent> void unsubscribeById(
Class<T> eventClass, long eventId) {
Map<Long, Consumer<? extends UniqueEvent>> map = UUID_LISTENERS.get(eventClass);
if (map != null) map.remove(eventId);
@Override
public void shutdown() {
INSTANCE.shutdown();
}
// ------------------------------------------------------------------------
// Posting
// ------------------------------------------------------------------------
public static <T extends EventType> void post(T event) {
dispatchEvent(event); // synchronous
}
public static <T extends EventType> void postAsync(T event) {
long seq = RING_BUFFER.next();
try {
EventHolder holder = RING_BUFFER.get(seq);
holder.event = event;
} finally {
RING_BUFFER.publish(seq);
}
}
@SuppressWarnings("unchecked")
private static <T extends EventType> void callListener(ListenerHandler<?> raw, EventType event) {
ListenerHandler<T> handler = (ListenerHandler<T>) raw;
Consumer<T> listener = handler.getListener();
T casted = (T) event;
listener.accept(casted);
}
@SuppressWarnings("unchecked")
private static void dispatchEvent(EventType event) {
Class<?> clazz = event.getClass();
logger.debug("Triggered event: {}", event.getClass().getSimpleName());
CopyOnWriteArrayList<ListenerHandler<?>> classListeners = LISTENERS.get(clazz);
if (classListeners != null) {
for (ListenerHandler<?> listener : classListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
e.printStackTrace();
}
}
}
CopyOnWriteArrayList<ListenerHandler<?>> genericListeners = LISTENERS.get(Object.class);
if (genericListeners != null) {
for (ListenerHandler<?> listener : genericListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
e.printStackTrace();
}
}
}
if (event instanceof UniqueEvent snowflakeEvent) {
Map<Long, Consumer<? extends UniqueEvent>> map = UUID_LISTENERS.get(clazz);
if (map != null) {
Consumer<UniqueEvent> listener =
(Consumer<UniqueEvent>) map.remove(snowflakeEvent.getIdentifier());
if (listener != null) {
try {
listener.accept(snowflakeEvent);
} catch (Throwable ignored) {
}
}
}
}
}
// ------------------------------------------------------------------------
// Lifecycle
// ------------------------------------------------------------------------
public static void shutdown() {
DISRUPTOR.shutdown();
LISTENERS.clear();
UUID_LISTENERS.clear();
}
public static void reset() {
LISTENERS.clear();
UUID_LISTENERS.clear();
}
public static Map<Class<?>, CopyOnWriteArrayList<ListenerHandler<?>>> getAllListeners() {
return LISTENERS;
@Override
public void reset() {
INSTANCE.reset();
}
}

View File

@@ -1,48 +0,0 @@
package org.toop.framework.eventbus;
import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.events.EventType;
import java.util.function.Consumer;
public class ListenerHandler<T extends EventType> {
private final long id;
private final String name;
private final Class<T> clazz;
private final Consumer<T> listener;
public ListenerHandler(long id, String name, Class<T> clazz, Consumer<T> listener) {
this.id = id;
this.name = name;
this.clazz = clazz;
this.listener = listener;
}
public ListenerHandler(String name, Class<T> clazz, Consumer<T> listener) {
this(SnowflakeGenerator.nextId(), name, clazz, listener);
}
public ListenerHandler(long id, Class<T> clazz, Consumer<T> listener) {
this(id, String.valueOf(id), clazz, listener);
}
public ListenerHandler(Class<T> clazz, Consumer<T> listener) {
this(SnowflakeGenerator.nextId(), clazz, listener);
}
public long getId() {
return id;
}
public Consumer<T> getListener() {
return listener;
}
public Class<T> getListenerClass() {
return clazz;
}
public String getName() {
return name;
}
}

View File

@@ -0,0 +1,53 @@
package org.toop.framework.eventbus.bus;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.store.SubscriberStore;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.function.Consumer;
public class DefaultEventBus implements EventBus {
private final Logger logger;
private final SubscriberStore eventsHolder;
public DefaultEventBus(Logger logger, SubscriberStore eventsHolder) {
this.logger = logger;
this.eventsHolder = eventsHolder;
}
@Override
public void subscribe(Subscriber<?, ?> subscriber) {
eventsHolder.add(subscriber);
}
@Override
public void unsubscribe(Subscriber<?, ?> subscriber) {
eventsHolder.remove(subscriber);
}
@Override
@SuppressWarnings("unchecked")
public <T> void post(T event) {
Class<T> eventType = (Class<T>) event.getClass();
var subs = eventsHolder.get(eventType);
if (subs != null) {
for (Subscriber<?, ?> subscriber : subs) {
Class<T> eventClass = (Class<T>) subscriber.event();
Consumer<EventType> action = (Consumer<EventType>) subscriber.handler();
action.accept((EventType) eventClass.cast(event));
}
}
}
@Override
public void shutdown() {
eventsHolder.reset();
}
@Override
public void reset() {
eventsHolder.reset();
}
}

View File

@@ -0,0 +1,117 @@
package org.toop.framework.eventbus.bus;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.subscriber.Subscriber;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.store.SubscriberStore;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
public class DisruptorEventBus implements EventBus {
/** Wrapper used inside the ring buffer. */
private static class EventHolder<T> {
T event;
}
private final Logger logger;
private final SubscriberStore eventsHolder;
private final Disruptor<EventHolder<?>> disruptor;
private final RingBuffer<EventHolder<?>> ringBuffer;
public DisruptorEventBus(Logger logger, SubscriberStore eventsHolder) {
this.logger = logger;
this.eventsHolder = eventsHolder;
ThreadFactory threadFactory =
r -> {
Thread t = new Thread(r, "EventBus-Disruptor");
t.setDaemon(true);
return t;
};
disruptor = getEventHolderDisruptor(threadFactory);
disruptor.start();
this.ringBuffer = disruptor.getRingBuffer();
}
private Disruptor<EventHolder<?>> getEventHolderDisruptor(ThreadFactory threadFactory) {
int RING_BUFFER_SIZE = 1024 * 64;
Disruptor<EventHolder<?>> disruptor = new Disruptor<>(
EventHolder::new,
RING_BUFFER_SIZE,
threadFactory,
ProducerType.MULTI,
new BusySpinWaitStrategy());
disruptor.handleEventsWith(
(holder, _, _) -> {
if (holder.event != null) {
dispatchEvent(holder.event);
holder.event = null;
}
});
return disruptor;
}
@Override
public void subscribe(Subscriber<?, ?> listener) {
eventsHolder.add(listener);
}
@Override
public void unsubscribe(Subscriber<?, ?> listener) {
eventsHolder.remove(listener);
}
@Override
public <T> void post(T event) {
long seq = ringBuffer.next();
try {
@SuppressWarnings("unchecked")
EventHolder<T> holder = (EventHolder<T>) ringBuffer.get(seq);
holder.event = event;
} finally {
ringBuffer.publish(seq);
}
}
@Override
public void shutdown() {
disruptor.shutdown();
eventsHolder.reset();
}
@Override
public void reset() {
eventsHolder.reset();
}
private <T> void dispatchEvent(T event) {
var classListeners = eventsHolder.get(event.getClass());
if (classListeners != null) {
for (Subscriber<?, ?> listener : classListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
logger.warn("Exception while handling event: {}", event, e);
}
}
}
}
@SuppressWarnings("unchecked")
private <T> void callListener(Subscriber<?, ?> subscriber, T event) {
Class<T> eventClass = (Class<T>) subscriber.event();
Consumer<EventType> action = (Consumer<EventType>) subscriber.handler();
action.accept((EventType) eventClass.cast(event));
}
}

View File

@@ -0,0 +1,11 @@
package org.toop.framework.eventbus.bus;
import org.toop.framework.eventbus.subscriber.Subscriber;
public interface EventBus {
void subscribe(Subscriber<?, ?> subscriber);
void unsubscribe(Subscriber<?, ?> subscriber);
<T> void post(T event);
void shutdown();
void reset();
}

View File

@@ -0,0 +1,46 @@
package org.toop.framework.eventbus.store;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class AsyncSubscriberStore implements SubscriberStore {
private final ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Subscriber<?, ?>>> queues = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Class<?>, Subscriber<?, ?>[]> snapshots = new ConcurrentHashMap<>();
@Override
public void add(Subscriber<?, ?> sub) {
queues.computeIfAbsent(sub.event(), _ -> new ConcurrentLinkedQueue<>()).add(sub);
rebuildSnapshot(sub.event());
}
@Override
public void remove(Subscriber<?, ?> sub) {
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(sub.event());
if (queue != null) {
queue.remove(sub);
rebuildSnapshot(sub.event());
}
}
@Override
public Subscriber<?, ?>[] get(Class<?> event) {
return snapshots.getOrDefault(event, new Subscriber[0]);
}
@Override
public void reset() {
queues.clear();
snapshots.clear();
}
private void rebuildSnapshot(Class<?> event) {
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(event);
if (queue != null) {
snapshots.put(event, queue.toArray(new Subscriber[0]));
} else {
snapshots.put(event, new Subscriber[0]);
}
}
}

View File

@@ -0,0 +1,71 @@
package org.toop.framework.eventbus.store;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.concurrent.ConcurrentHashMap;
public class DefaultSubscriberStore implements SubscriberStore {
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
private final ConcurrentHashMap<Class<?>, Subscriber<?, ?>[]> listeners =
new ConcurrentHashMap<>();
@Override
public void add(Subscriber<?, ?> sub) {
listeners.compute(sub.event(), (_, arr) -> {
if (arr == null || arr.length == 0) {
return new Subscriber<?, ?>[]{sub};
}
int len = arr.length;
Subscriber<?, ?>[] newArr = new Subscriber[len + 1];
System.arraycopy(arr, 0, newArr, 0, len);
newArr[len] = sub;
return newArr;
});
}
@Override
public void remove(Subscriber<?, ?> sub) {
listeners.computeIfPresent(sub.event(), (_, arr) -> {
int len = arr.length;
if (len == 1) {
return arr[0].equals(sub) ? null : arr;
}
int keep = 0;
for (Subscriber<?, ?> s : arr) {
if (!s.equals(sub)) keep++;
}
if (keep == len) {
return arr;
}
if (keep == 0) {
return null;
}
Subscriber<?, ?>[] newArr = new Subscriber[keep];
int i = 0;
for (Subscriber<?, ?> s : arr) {
if (!s.equals(sub)) {
newArr[i++] = s;
}
}
return newArr;
});
}
@Override
public Subscriber<?, ?>[] get(Class<?> event) {
return listeners.getOrDefault(event, EMPTY);
}
@Override
public void reset() {
listeners.clear();
}
}

View File

@@ -0,0 +1,10 @@
package org.toop.framework.eventbus.store;
import org.toop.framework.eventbus.subscriber.Subscriber;
public interface SubscriberStore {
void add(Subscriber<?, ?> subscriber);
void remove(Subscriber<?, ?> subscriber);
Subscriber<?, ?>[] get(Class<?> event);
void reset();
}

View File

@@ -0,0 +1,36 @@
package org.toop.framework.eventbus.store;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SyncSubscriberStore implements SubscriberStore {
private final Map<Class<?>, List<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>();
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
@Override
public void add(Subscriber<?, ?> sub) {
LISTENERS.computeIfAbsent(sub.event(), _ -> new ArrayList<>()).add(sub);
}
@Override
public void remove(Subscriber<?, ?> sub) {
LISTENERS.getOrDefault(sub.event(), new ArrayList<>()).remove(sub);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
@Override
public Subscriber<?, ?>[] get(Class<?> event) {
List<Subscriber<?, ?>> list = LISTENERS.get(event);
if (list == null || list.isEmpty()) return EMPTY;
return list.toArray(EMPTY);
}
@Override
public void reset() {
LISTENERS.clear();
}
}

View File

@@ -0,0 +1,5 @@
package org.toop.framework.eventbus.subscriber;
import java.util.function.Consumer;
public record DefaultSubscriber<K>(String id, Class<K> event, Consumer<K> handler) implements NamedSubscriber<K> {}

View File

@@ -0,0 +1,3 @@
package org.toop.framework.eventbus.subscriber;
public interface IdSubscriber<T> extends Subscriber<Long, T> {}

View File

@@ -0,0 +1,5 @@
package org.toop.framework.eventbus.subscriber;
import java.util.function.Consumer;
public record LongIdSubscriber<K>(Long id, Class<K> event, Consumer<K> handler) implements IdSubscriber<K> {}

View File

@@ -0,0 +1,3 @@
package org.toop.framework.eventbus.subscriber;
public interface NamedSubscriber<T> extends Subscriber<String, T> {}

View File

@@ -0,0 +1,9 @@
package org.toop.framework.eventbus.subscriber;
import java.util.function.Consumer;
public interface Subscriber<ID, K> {
ID id();
Class<K> event();
Consumer<K> handler();
}

View File

@@ -4,19 +4,20 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.networking.events.NetworkEvents;
import org.toop.framework.networking.exceptions.ClientNotFoundException;
import org.toop.framework.networking.interfaces.NetworkingClientManager;
public class NetworkingClientEventListener {
private static final Logger logger = LogManager.getLogger(NetworkingClientEventListener.class);
private final NetworkingClientManager clientManager;
/** Starts a connection manager, to manage, connections. */
public NetworkingClientEventListener(NetworkingClientManager clientManager) {
public NetworkingClientEventListener(EventBus eventBus, NetworkingClientManager clientManager) {
this.clientManager = clientManager;
new EventFlow()
new EventFlow(eventBus)
.listen(NetworkEvents.StartClient.class, this::handleStartClient, false)
.listen(NetworkEvents.SendCommand.class, this::handleCommand, false)
.listen(NetworkEvents.SendLogin.class, this::handleSendLogin, false)
@@ -40,6 +41,7 @@ public class NetworkingClientEventListener {
void handleStartClient(NetworkEvents.StartClient event) {
long clientId = SnowflakeGenerator.nextId();
new EventFlow().addPostEvent(new NetworkEvents.CreatedIdForClient(clientId, event.identifier())).postEvent();
clientManager.startClient(
clientId,
event.networkingClient(),

View File

@@ -8,7 +8,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.GlobalEventBus;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.networking.events.NetworkEvents;
import org.toop.framework.networking.exceptions.ClientNotFoundException;
import org.toop.framework.networking.exceptions.CouldNotConnectException;
@@ -17,9 +18,13 @@ import org.toop.framework.networking.types.NetworkingConnector;
public class NetworkingClientManager implements org.toop.framework.networking.interfaces.NetworkingClientManager {
private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class);
private final EventBus eventBus;
private final Map<Long, NetworkingClient> networkClients = new ConcurrentHashMap<>();
public NetworkingClientManager() {}
public NetworkingClientManager(EventBus eventBus) {
this.eventBus = eventBus;
}
private void connectHelper(
long id,
@@ -28,8 +33,16 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
Runnable onSuccess,
Runnable onFailure
) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
EventFlow closeEvent = new EventFlow()
.listen(
NetworkEvents.CloseClient.class,
e -> {
if (e.clientId() == id) scheduler.shutdownNow();
}, "close");
Runnable connectTask = new Runnable() {
int attempts = 0;
@@ -46,7 +59,7 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
nClient.connect(id, nConnector.host(), nConnector.port());
networkClients.put(id, nClient);
logger.info("New client started successfully for {}:{}", nConnector.host(), nConnector.port());
GlobalEventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), true));
eventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), true));
onSuccess.run();
scheduler.shutdown();
} catch (CouldNotConnectException e) {
@@ -54,17 +67,17 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
if (attempts < nConnector.reconnectAttempts()) {
logger.warn("Could not connect to {}:{}. Retrying in {} {}",
nConnector.host(), nConnector.port(), nConnector.timeout(), nConnector.timeUnit());
GlobalEventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), false));
eventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), false));
scheduler.schedule(this, nConnector.timeout(), nConnector.timeUnit());
} else {
logger.error("Failed to start client for {}:{} after {} attempts", nConnector.host(), nConnector.port(), attempts);
GlobalEventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false));
eventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false));
onFailure.run();
scheduler.shutdown();
}
} catch (Exception e) {
logger.error("Unexpected exception during startClient", e);
GlobalEventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false));
eventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false));
onFailure.run();
scheduler.shutdown();
}
@@ -72,6 +85,8 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
};
scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS);
//
// closeEvent.unsubscribe("close");
}
@Override

View File

@@ -11,6 +11,7 @@ import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.networking.exceptions.CouldNotConnectException;
import org.toop.framework.networking.handlers.NetworkingGameClientHandler;
import org.toop.framework.networking.interfaces.NetworkingClient;
@@ -19,9 +20,13 @@ import java.net.InetSocketAddress;
public class TournamentNetworkingClient implements NetworkingClient {
private static final Logger logger = LogManager.getLogger(TournamentNetworkingClient.class);
private final EventBus eventBus;
private Channel channel;
public TournamentNetworkingClient() {}
public TournamentNetworkingClient(EventBus eventBus) {
this.eventBus = eventBus;
}
@Override
public InetSocketAddress getAddress() {
@@ -40,7 +45,7 @@ public class TournamentNetworkingClient implements NetworkingClient {
new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
NetworkingGameClientHandler handler = new NetworkingGameClientHandler(clientId);
NetworkingGameClientHandler handler = new NetworkingGameClientHandler(eventBus, clientId);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(1024)); // split at \n

View File

@@ -4,6 +4,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import org.toop.annotations.AutoResponseResult;
import org.toop.framework.eventbus.GlobalEventBus;
import org.toop.framework.eventbus.events.*;
import org.toop.framework.networking.interfaces.NetworkingClient;
import org.toop.framework.networking.types.NetworkingConnector;
@@ -11,7 +12,7 @@ import org.toop.framework.networking.types.NetworkingConnector;
/**
* Defines all event types related to the networking subsystem.
* <p>
* These events are used in conjunction with the {@link org.toop.framework.eventbus.GlobalEventBus}
* These events are used in conjunction with the {@link GlobalEventBus}
* and {@link org.toop.framework.eventbus.EventFlow} to communicate between components
* such as networking clients, managers, and listeners.
* </p>
@@ -166,6 +167,10 @@ public class NetworkEvents extends EventsBase {
long identifier)
implements UniqueEvent {}
public record CreatedIdForClient(long clientId, long identifier) implements ResponseToUniqueEvent {}
public record ConnectTry(long clientId, int amount, int maxAmount, boolean success) implements GenericEvent {}
/**
* Response confirming that a client has been successfully started.
* <p>
@@ -181,8 +186,6 @@ public class NetworkEvents extends EventsBase {
public record StartClientResponse(long clientId, boolean successful, long identifier)
implements ResponseToUniqueEvent {}
public record ConnectTry(long clientId, int amount, int maxAmount, boolean success) implements GenericEvent {}
/**
* Requests reconnection of an existing client using its previous configuration.
* <p>

View File

@@ -8,15 +8,17 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.networking.events.NetworkEvents;
public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(NetworkingGameClientHandler.class);
private final EventBus eventBus;
private final long connectionId;
public NetworkingGameClientHandler(long connectionId) {
public NetworkingGameClientHandler(EventBus eventBus, long connectionId) {
this.eventBus = eventBus;
this.connectionId = connectionId;
}
@@ -40,9 +42,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
"Received SVR message from server-{}, data: {}",
ctx.channel().remoteAddress(),
msg);
new EventFlow()
.addPostEvent(new NetworkEvents.ServerResponse(this.connectionId))
.asyncPostEvent();
eventBus.post(new NetworkEvents.ServerResponse(this.connectionId));
parseServerReturn(rec);
return;
}
@@ -113,11 +113,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
.map(m -> m.group(1).trim())
.toArray(String[]::new);
new EventFlow()
.addPostEvent(
new NetworkEvents.GameMoveResponse(
this.connectionId, msg[0], msg[1], msg[2]))
.asyncPostEvent();
eventBus.post(new NetworkEvents.GameMoveResponse(this.connectionId, msg[0], msg[1], msg[2]));
}
private void gameWinConditionHandler(String rec) {
@@ -128,9 +124,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
.findFirst()
.orElse("");
new EventFlow()
.addPostEvent(new NetworkEvents.GameResultResponse(this.connectionId, condition))
.asyncPostEvent();
eventBus.post(new NetworkEvents.GameResultResponse(this.connectionId, condition));
}
private void gameChallengeHandler(String rec) {
@@ -145,17 +139,9 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
.toArray(String[]::new);
if (isCancelled)
new EventFlow()
.addPostEvent(
new NetworkEvents.ChallengeCancelledResponse(
this.connectionId, msg[0]))
.asyncPostEvent();
eventBus.post(new NetworkEvents.GameResultResponse(this.connectionId, msg[0]));
else
new EventFlow()
.addPostEvent(
new NetworkEvents.ChallengeResponse(
this.connectionId, msg[0], msg[1], msg[2]))
.asyncPostEvent();
eventBus.post(new NetworkEvents.ChallengeResponse(this.connectionId, msg[0], msg[1], msg[2]));
} catch (ArrayIndexOutOfBoundsException e) {
logger.error("Array out of bounds for: {}", rec, e);
}
@@ -171,11 +157,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
.toArray(String[]::new);
// [0] playerToMove, [1] gameType, [2] opponent
new EventFlow()
.addPostEvent(
new NetworkEvents.GameMatchResponse(
this.connectionId, msg[0], msg[1], msg[2]))
.asyncPostEvent();
eventBus.post(new NetworkEvents.GameMatchResponse(this.connectionId, msg[0], msg[1], msg[2]));
} catch (ArrayIndexOutOfBoundsException e) {
logger.error("Array out of bounds for: {}", rec, e);
}
@@ -190,9 +172,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
.toString()
.trim();
new EventFlow()
.addPostEvent(new NetworkEvents.YourTurnResponse(this.connectionId, msg))
.asyncPostEvent();
eventBus.post(new NetworkEvents.YourTurnResponse(this.connectionId, msg));
}
private void playerlistHandler(String rec) {
@@ -203,9 +183,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
.map(m -> m.group(1).trim())
.toArray(String[]::new);
new EventFlow()
.addPostEvent(new NetworkEvents.PlayerlistResponse(this.connectionId, players))
.asyncPostEvent();
eventBus.post(new NetworkEvents.PlayerlistResponse(this.connectionId, players));
}
private void gamelistHandler(String rec) {
@@ -216,9 +194,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
.map(m -> m.group(1).trim())
.toArray(String[]::new);
new EventFlow()
.addPostEvent(new NetworkEvents.GamelistResponse(this.connectionId, gameTypes))
.asyncPostEvent();
eventBus.post(new NetworkEvents.GamelistResponse(this.connectionId, gameTypes));
}
private void helpHandler(String rec) {

View File

@@ -3,6 +3,7 @@ package org.toop.framework.audio;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.toop.framework.dispatch.interfaces.Dispatcher;
import org.toop.framework.eventbus.GlobalEventBus;
import org.toop.framework.resource.resources.BaseResource;
import org.toop.framework.resource.types.AudioResource;
@@ -94,7 +95,7 @@ public class MusicManagerTest {
List<MockAudioResource> resources = List.of(track1, track2, track3);
manager = new MusicManager<>(resources, dispatcher);
manager = new MusicManager<>(GlobalEventBus.get(), resources, dispatcher);
}
@Test
@@ -188,7 +189,7 @@ public class MusicManagerTest {
manyTracks.add(new MockAudioResource("track" + i));
}
MusicManager<MockAudioResource> multiManager = new MusicManager<>(manyTracks, dispatcher);
MusicManager<MockAudioResource> multiManager = new MusicManager<>(GlobalEventBus.get(), manyTracks, dispatcher);
for (int i = 0; i < manyTracks.size() - 1; i++) {
multiManager.play();