Can now test the event bus, created testable interfaces

This commit is contained in:
lieght
2025-12-06 17:26:00 +01:00
parent 816a0b953e
commit fd357fd27d
15 changed files with 394 additions and 232 deletions

View File

@@ -1,13 +1,6 @@
package org.toop; package org.toop;
import org.toop.app.App; import org.toop.app.App;
import org.toop.framework.audio.*;
import org.toop.framework.networking.NetworkingClientEventListener;
import org.toop.framework.networking.NetworkingClientManager;
import org.toop.framework.resource.ResourceLoader;
import org.toop.framework.resource.ResourceManager;
import org.toop.framework.resource.resources.MusicAsset;
import org.toop.framework.resource.resources.SoundEffectAsset;
public final class Main { public final class Main {
static void main(String[] args) { static void main(String[] args) {

View File

@@ -88,7 +88,7 @@ public final class Server {
Primitive.text("connecting"), 0, 0, reconnectAttempts, true, true Primitive.text("connecting"), 0, 0, reconnectAttempts, true, true
); );
WidgetContainer.getCurrentView().transitionNext(loading); WidgetContainer.getCurrentView().transitionNextCustom(loading, "disconnect", this::disconnect);
var a = new EventFlow() var a = new EventFlow()
.addPostEvent(NetworkEvents.StartClient.class, .addPostEvent(NetworkEvents.StartClient.class,
@@ -105,8 +105,9 @@ public final class Server {
); );
}); });
a.onResponse(NetworkEvents.StartClientResponse.class, e -> { a.onResponse(NetworkEvents.CreatedIdForClient.class, e -> clientId = e.clientId(), true);
a.onResponse(NetworkEvents.StartClientResponse.class, e -> {
if (!e.successful()) { if (!e.successful()) {
return; return;
} }
@@ -118,7 +119,6 @@ public final class Server {
a.unsubscribe("startclient"); a.unsubscribe("startclient");
this.user = user; this.user = user;
clientId = e.clientId();
new EventFlow().addPostEvent(new NetworkEvents.SendLogin(clientId, user)).postEvent(); new EventFlow().addPostEvent(new NetworkEvents.SendLogin(clientId, user)).postEvent();
@@ -129,21 +129,24 @@ public final class Server {
}, false, "startclient") }, false, "startclient")
.listen( .listen(
NetworkEvents.ConnectTry.class, NetworkEvents.ConnectTry.class,
e -> Platform.runLater( e -> {
() -> { if (clientId != e.clientId()) return;
try { Platform.runLater(
loading.setAmount(e.amount()); () -> {
if (e.amount() >= loading.getMaxAmount()) { try {
loading.triggerFailure(); loading.setAmount(e.amount());
} if (e.amount() >= loading.getMaxAmount()) {
} catch (Exception ex) { loading.triggerFailure();
throw new RuntimeException(ex); }
} } catch (Exception ex) {
} throw new RuntimeException(ex);
), }
false, "connecting" }
) );
},
false, "connecting"
)
.postEvent(); .postEvent();
a.listen(NetworkEvents.ChallengeResponse.class, this::handleReceivedChallenge, false, "challenge") a.listen(NetworkEvents.ChallengeResponse.class, this::handleReceivedChallenge, false, "challenge")

View File

@@ -49,11 +49,11 @@ public class SongDisplay extends VBox implements Widget {
previousButton.getStyleClass().setAll("previous-button"); previousButton.getStyleClass().setAll("previous-button");
skipButton.setOnAction( event -> { skipButton.setOnAction( event -> {
GlobalEventBus.post(new AudioEvents.SkipMusic()); GlobalEventBus.get().post(new AudioEvents.SkipMusic());
}); });
pauseButton.setOnAction(event -> { pauseButton.setOnAction(event -> {
GlobalEventBus.post(new AudioEvents.PauseMusic()); GlobalEventBus.get().post(new AudioEvents.PauseMusic());
if (pauseButton.getText().equals("")) { if (pauseButton.getText().equals("")) {
pauseButton.setText(""); pauseButton.setText("");
} }
@@ -63,7 +63,7 @@ public class SongDisplay extends VBox implements Widget {
}); });
previousButton.setOnAction( event -> { previousButton.setOnAction( event -> {
GlobalEventBus.post(new AudioEvents.PreviousMusic()); GlobalEventBus.get().post(new AudioEvents.PreviousMusic());
}); });
HBox control = new HBox(10, previousButton, pauseButton, skipButton); HBox control = new HBox(10, previousButton, pauseButton, skipButton);

View File

@@ -73,7 +73,7 @@ public class AudioEventListener<T extends AudioResource, K extends AudioResource
} }
private void handleGetVolume(AudioEvents.GetVolume event) { private void handleGetVolume(AudioEvents.GetVolume event) {
GlobalEventBus.postAsync(new AudioEvents.GetVolumeResponse( GlobalEventBus.get().post(new AudioEvents.GetVolumeResponse(
audioVolumeManager.getVolume(event.controlType()), audioVolumeManager.getVolume(event.controlType()),
event.identifier())); event.identifier()));
} }

View File

@@ -6,7 +6,6 @@ import org.toop.framework.audio.events.AudioEvents;
import org.toop.framework.dispatch.interfaces.Dispatcher; import org.toop.framework.dispatch.interfaces.Dispatcher;
import org.toop.framework.dispatch.JavaFXDispatcher; import org.toop.framework.dispatch.JavaFXDispatcher;
import org.toop.annotations.TestsOnly; import org.toop.annotations.TestsOnly;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.eventbus.GlobalEventBus; import org.toop.framework.eventbus.GlobalEventBus;
import org.toop.framework.resource.types.AudioResource; import org.toop.framework.resource.types.AudioResource;
@@ -124,7 +123,7 @@ public class MusicManager<T extends AudioResource> implements org.toop.framework
Runnable currentMusicTask = new Runnable() { Runnable currentMusicTask = new Runnable() {
@Override @Override
public void run() { public void run() {
GlobalEventBus.post(new AudioEvents.PlayingMusic(track.getName(), track.currentPosition(), track.duration())); GlobalEventBus.get().post(new AudioEvents.PlayingMusic(track.getName(), track.currentPosition(), track.duration()));
scheduler.schedule(this, 1, TimeUnit.SECONDS); scheduler.schedule(this, 1, TimeUnit.SECONDS);
} }
}; };

View File

@@ -0,0 +1,118 @@
package org.toop.framework.eventbus;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.toop.framework.eventbus.events.EventType;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
public class DisruptorEventBus<T extends EventType> implements EventBus<T> {
/** Wrapper used inside the ring buffer. */
private class EventHolder {
EventType event;
}
EventsHolder eventsHolder;
private final Disruptor<EventHolder> disruptor;
private final RingBuffer<EventHolder> ringBuffer;
public DisruptorEventBus(EventsHolder eventsHolder) {
this.eventsHolder = eventsHolder;
ThreadFactory threadFactory =
r -> {
Thread t = new Thread(r, "EventBus-Disruptor");
t.setDaemon(true);
return t;
};
disruptor = getEventHolderDisruptor(threadFactory);
disruptor.start();
this.ringBuffer = disruptor.getRingBuffer();
}
private Disruptor<EventHolder> getEventHolderDisruptor(ThreadFactory threadFactory) {
int RING_BUFFER_SIZE = 1024 * 64;
Disruptor<EventHolder> disruptor = new Disruptor<>(
EventHolder::new,
RING_BUFFER_SIZE,
threadFactory,
ProducerType.MULTI,
new BusySpinWaitStrategy());
disruptor.handleEventsWith(
(holder, _, _) -> {
if (holder.event != null) {
dispatchEvent(holder.event);
holder.event = null;
}
});
return disruptor;
}
@Override
public void subscribe(ListenerHandler<? extends EventType> listener) {
eventsHolder.add(listener);
}
@Override
public void unsubscribe(ListenerHandler<? extends EventType> listener) {
eventsHolder.remove(listener);
}
@Override
public void post(EventType event) {
long seq = ringBuffer.next();
try {
EventHolder holder = ringBuffer.get(seq);
holder.event = event;
} finally {
ringBuffer.publish(seq);
}
}
@Override
public void shutdown() {
disruptor.shutdown();
eventsHolder.reset();
}
@Override
public void reset() {
eventsHolder.reset();
}
private void dispatchEvent(EventType event) {
CopyOnWriteArrayList<ListenerHandler<?>> classListeners = (CopyOnWriteArrayList<ListenerHandler<?>>) eventsHolder.get(event.getClass());
if (classListeners != null) {
for (ListenerHandler<?> listener : classListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
// logger.warn("Exception while handling event: {}", event, e); TODO
}
}
}
CopyOnWriteArrayList<ListenerHandler<?>> genericListeners = (CopyOnWriteArrayList<ListenerHandler<?>>) eventsHolder.get(Object.class);
if (genericListeners != null) {
for (ListenerHandler<?> listener : genericListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
// logger.warn("Exception while handling event: {}", event, e); TODO
}
}
}
}
private static <T extends EventType> void callListener(ListenerHandler<T> handler, EventType event) {
handler.getListener().accept((T) event);
}
}

View File

@@ -0,0 +1,31 @@
package org.toop.framework.eventbus;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
public class DisruptorEventsHolder implements EventsHolder {
private final Map<Class<?>, CopyOnWriteArrayList<ListenerHandler<?>>> LISTENERS = new ConcurrentHashMap<>();
@Override
public void add(ListenerHandler<?> listener) {
LISTENERS.computeIfAbsent(listener.getListenerClass(), _ -> new CopyOnWriteArrayList<>()).add(listener);
}
@Override
public void remove(ListenerHandler<?> listener) {
LISTENERS.getOrDefault(listener.getListenerClass(), new CopyOnWriteArrayList<>()).remove(listener);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
@Override
public List<ListenerHandler<?>> get(Class<?> listenerClass) {
return LISTENERS.get(listenerClass);
}
@Override
public void reset() {
LISTENERS.clear();
}
}

View File

@@ -0,0 +1,11 @@
package org.toop.framework.eventbus;
import org.toop.framework.eventbus.events.EventType;
public interface EventBus<E> {
void subscribe(ListenerHandler<? extends EventType> listener);
void unsubscribe(ListenerHandler<? extends EventType> listener);
void post(EventType event);
void shutdown();
void reset();
}

View File

@@ -158,7 +158,7 @@ public class EventFlow {
(Consumer<ResponseToUniqueEvent>) newAction (Consumer<ResponseToUniqueEvent>) newAction
); );
GlobalEventBus.subscribe(listener); GlobalEventBus.get().subscribe(listener);
this.listeners.add(listener); this.listeners.add(listener);
return this; return this;
} }
@@ -246,7 +246,7 @@ public class EventFlow {
newAction newAction
); );
GlobalEventBus.subscribe(listener); GlobalEventBus.get().subscribe(listener);
this.listeners.add(listener); this.listeners.add(listener);
return this; return this;
} }
@@ -294,7 +294,7 @@ public class EventFlow {
newAction newAction
); );
GlobalEventBus.subscribe(listener); GlobalEventBus.get().subscribe(listener);
this.listeners.add(listener); this.listeners.add(listener);
return this; return this;
} }
@@ -378,7 +378,7 @@ public class EventFlow {
newAction newAction
); );
GlobalEventBus.subscribe(listener); GlobalEventBus.get().subscribe(listener);
this.listeners.add(listener); this.listeners.add(listener);
return this; return this;
} }
@@ -401,15 +401,18 @@ public class EventFlow {
* Posts the event added through {@link #addPostEvent}. * Posts the event added through {@link #addPostEvent}.
*/ */
public EventFlow postEvent() { public EventFlow postEvent() {
GlobalEventBus.post(this.event); GlobalEventBus.get().post(this.event);
return this; return this;
} }
/** /**
* Posts the event added through {@link #addPostEvent} asynchronously. * Posts the event added through {@link #addPostEvent} asynchronously.
*
* @deprecated use {@link #postEvent()} instead.
*/ */
@Deprecated
public EventFlow asyncPostEvent() { public EventFlow asyncPostEvent() {
GlobalEventBus.postAsync(this.event); GlobalEventBus.get().post(this.event);
return this; return this;
} }
@@ -422,7 +425,7 @@ public class EventFlow {
public void unsubscribe(Object listenerObject) { public void unsubscribe(Object listenerObject) {
this.listeners.removeIf(handler -> { this.listeners.removeIf(handler -> {
if (handler.getListener() == listenerObject) { if (handler.getListener() == listenerObject) {
GlobalEventBus.unsubscribe(handler); GlobalEventBus.get().unsubscribe(handler);
return true; return true;
} }
return false; return false;
@@ -438,7 +441,7 @@ public class EventFlow {
public void unsubscribe(long listenerId) { public void unsubscribe(long listenerId) {
this.listeners.removeIf(handler -> { this.listeners.removeIf(handler -> {
if (handler.getId() == listenerId) { if (handler.getId() == listenerId) {
GlobalEventBus.unsubscribe(handler); GlobalEventBus.get().unsubscribe(handler);
return true; return true;
} }
return false; return false;
@@ -453,7 +456,7 @@ public class EventFlow {
public void unsubscribe(String name) { public void unsubscribe(String name) {
this.listeners.removeIf(handler -> { this.listeners.removeIf(handler -> {
if (handler.getName().equals(name)) { if (handler.getName().equals(name)) {
GlobalEventBus.unsubscribe(handler); GlobalEventBus.get().unsubscribe(handler);
return true; return true;
} }
return false; return false;
@@ -465,7 +468,7 @@ public class EventFlow {
*/ */
public void unsubscribeAll() { public void unsubscribeAll() {
listeners.removeIf(handler -> { listeners.removeIf(handler -> {
GlobalEventBus.unsubscribe(handler); GlobalEventBus.get().unsubscribe(handler);
return true; return true;
}); });
} }

View File

@@ -0,0 +1,10 @@
package org.toop.framework.eventbus;
import java.util.List;
public interface EventsHolder {
void add(ListenerHandler<?> listener);
void remove(ListenerHandler<?> listener);
List<ListenerHandler<?>> get(Class<?> listenerClass);
void reset();
}

View File

@@ -1,193 +1,13 @@
package org.toop.framework.eventbus; package org.toop.framework.eventbus;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.UniqueEvent;
/** public class GlobalEventBus {
* GlobalEventBus backed by the LMAX Disruptor for ultra-low latency, high-throughput event private static final EventBus<EventType> INSTANCE = new DisruptorEventBus<>(new DisruptorEventsHolder());
* publishing.
*/
public final class GlobalEventBus {
private static final Logger logger = LogManager.getLogger(GlobalEventBus.class);
/** Map of event class to type-specific listeners. */
private static final Map<Class<?>, CopyOnWriteArrayList<ListenerHandler<?>>>
LISTENERS = new ConcurrentHashMap<>();
/** Map of event class to Snowflake-ID-specific listeners. */
private static final Map<
Class<?>, ConcurrentHashMap<Long, Consumer<? extends UniqueEvent>>>
UUID_LISTENERS = new ConcurrentHashMap<>();
/** Disruptor ring buffer size (must be power of two). */
private static final int RING_BUFFER_SIZE = 1024 * 64;
/** Disruptor instance. */
private static final Disruptor<EventHolder> DISRUPTOR;
/** Ring buffer used for publishing events. */
private static final RingBuffer<EventHolder> RING_BUFFER;
static {
ThreadFactory threadFactory =
r -> {
Thread t = new Thread(r, "EventBus-Disruptor");
t.setDaemon(true);
return t;
};
DISRUPTOR =
new Disruptor<>(
EventHolder::new,
RING_BUFFER_SIZE,
threadFactory,
ProducerType.MULTI,
new BusySpinWaitStrategy());
DISRUPTOR.handleEventsWith(
(holder, seq, endOfBatch) -> {
if (holder.event != null) {
dispatchEvent(holder.event);
holder.event = null;
}
});
DISRUPTOR.start();
RING_BUFFER = DISRUPTOR.getRingBuffer();
}
/** Prevent instantiation. */
private GlobalEventBus() {} private GlobalEventBus() {}
/** Wrapper used inside the ring buffer. */ public static EventBus<EventType> get() {
private static class EventHolder { return INSTANCE;
EventType event;
}
// ------------------------------------------------------------------------
// Subscription
// ------------------------------------------------------------------------
public static <T extends EventType> void subscribe(ListenerHandler<T> listener) {
logger.debug("Subscribing to {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName());
LISTENERS.computeIfAbsent(listener.getListenerClass(), _ -> new CopyOnWriteArrayList<>()).add(listener);
}
// TODO
public static <T extends UniqueEvent> void subscribeById(
Class<T> eventClass, long eventId, Consumer<T> listener) {
UUID_LISTENERS
.computeIfAbsent(eventClass, _ -> new ConcurrentHashMap<>())
.put(eventId, listener);
}
public static void unsubscribe(ListenerHandler<?> listener) {
logger.debug("Unsubscribing from {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName());
LISTENERS.getOrDefault(listener.getListenerClass(), new CopyOnWriteArrayList<>())
.remove(listener);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
// TODO
public static <T extends UniqueEvent> void unsubscribeById(
Class<T> eventClass, long eventId) {
Map<Long, Consumer<? extends UniqueEvent>> map = UUID_LISTENERS.get(eventClass);
if (map != null) map.remove(eventId);
}
// ------------------------------------------------------------------------
// Posting
// ------------------------------------------------------------------------
public static <T extends EventType> void post(T event) {
dispatchEvent(event); // synchronous
}
public static <T extends EventType> void postAsync(T event) {
long seq = RING_BUFFER.next();
try {
EventHolder holder = RING_BUFFER.get(seq);
holder.event = event;
} finally {
RING_BUFFER.publish(seq);
}
}
@SuppressWarnings("unchecked")
private static <T extends EventType> void callListener(ListenerHandler<?> raw, EventType event) {
ListenerHandler<T> handler = (ListenerHandler<T>) raw;
Consumer<T> listener = handler.getListener();
T casted = (T) event;
listener.accept(casted);
}
@SuppressWarnings("unchecked")
private static void dispatchEvent(EventType event) {
Class<?> clazz = event.getClass();
logger.debug("Triggered event: {}", event.getClass().getSimpleName());
CopyOnWriteArrayList<ListenerHandler<?>> classListeners = LISTENERS.get(clazz);
if (classListeners != null) {
for (ListenerHandler<?> listener : classListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
logger.warn("Exception while handling event: {}", event, e);
}
}
}
CopyOnWriteArrayList<ListenerHandler<?>> genericListeners = LISTENERS.get(Object.class);
if (genericListeners != null) {
for (ListenerHandler<?> listener : genericListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
logger.warn("Exception while handling event: {}", event, e);
}
}
}
if (event instanceof UniqueEvent snowflakeEvent) {
Map<Long, Consumer<? extends UniqueEvent>> map = UUID_LISTENERS.get(clazz);
if (map != null) {
Consumer<UniqueEvent> listener =
(Consumer<UniqueEvent>) map.remove(snowflakeEvent.getIdentifier());
if (listener != null) {
try {
listener.accept(snowflakeEvent);
} catch (Throwable ignored) {
}
}
}
}
}
// ------------------------------------------------------------------------
// Lifecycle
// ------------------------------------------------------------------------
public static void shutdown() {
DISRUPTOR.shutdown();
LISTENERS.clear();
UUID_LISTENERS.clear();
}
public static void reset() {
LISTENERS.clear();
UUID_LISTENERS.clear();
}
public static Map<Class<?>, CopyOnWriteArrayList<ListenerHandler<?>>> getAllListeners() {
return LISTENERS;
} }
} }

View File

@@ -0,0 +1,159 @@
package org.toop.framework.eventbus;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.*;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.events.EventType;
/**
* GlobalEventBus backed by the LMAX Disruptor for ultra-low latency, high-throughput event
* publishing.
*/
public final class OldGlobalEventBus {
private static final Logger logger = LogManager.getLogger(OldGlobalEventBus.class);
/** Disruptor ring buffer size (must be power of two). */
private static final int RING_BUFFER_SIZE = 1024 * 64;
/** Disruptor instance. */
private static final Disruptor<EventHolder> DISRUPTOR;
/** Ring buffer used for publishing events. */
private static final RingBuffer<EventHolder> RING_BUFFER;
static {
ThreadFactory threadFactory =
r -> {
Thread t = new Thread(r, "EventBus-Disruptor");
t.setDaemon(true);
return t;
};
DISRUPTOR =
new Disruptor<>(
EventHolder::new,
RING_BUFFER_SIZE,
threadFactory,
ProducerType.MULTI,
new BusySpinWaitStrategy());
DISRUPTOR.handleEventsWith(
(holder, seq, endOfBatch) -> {
if (holder.event != null) {
dispatchEvent(holder.event);
holder.event = null;
}
});
DISRUPTOR.start();
RING_BUFFER = DISRUPTOR.getRingBuffer();
}
/** Prevent instantiation. */
private OldGlobalEventBus() {}
/** Wrapper used inside the ring buffer. */
private static class EventHolder {
EventType event;
}
/** Map of event class to type-specific listeners. */
private static EventsHolder eventsHolder;
public static void setEventsHolder(EventsHolder eventsHolder) {
if (OldGlobalEventBus.eventsHolder != null) return;
OldGlobalEventBus.eventsHolder = eventsHolder;
}
// ------------------------------------------------------------------------
// Subscription
// ------------------------------------------------------------------------
public static <T extends EventType> void subscribe(ListenerHandler<T> listener) {
logger.debug("Subscribing to {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName());
eventsHolder.add(listener);
}
public static void unsubscribe(ListenerHandler<?> listener) {
logger.debug("Unsubscribing from {}: {}", listener.getListenerClass().getSimpleName(), listener.getListener().getClass().getSimpleName());
eventsHolder.remove(listener);
}
// ------------------------------------------------------------------------
// Posting
// ------------------------------------------------------------------------
public static <T extends EventType> void post(T event) {
dispatchEvent(event); // synchronous
}
public static <T extends EventType> void postAsync(T event) {
long seq = RING_BUFFER.next();
try {
EventHolder holder = RING_BUFFER.get(seq);
holder.event = event;
} finally {
RING_BUFFER.publish(seq);
}
}
@SuppressWarnings("unchecked")
private static <T extends EventType> void callListener(ListenerHandler<?> raw, EventType event) {
ListenerHandler<T> handler = (ListenerHandler<T>) raw;
Consumer<T> listener = handler.getListener();
T casted = (T) event;
listener.accept(casted);
}
@SuppressWarnings("unchecked")
private static void dispatchEvent(EventType event) {
Class<?> clazz = event.getClass();
logger.debug("Triggered event: {}", event.getClass().getSimpleName());
CopyOnWriteArrayList<ListenerHandler<?>> classListeners = (CopyOnWriteArrayList<ListenerHandler<?>>) eventsHolder.get(clazz);
if (classListeners != null) {
for (ListenerHandler<?> listener : classListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
logger.warn("Exception while handling event: {}", event, e);
}
}
}
CopyOnWriteArrayList<ListenerHandler<?>> genericListeners = (CopyOnWriteArrayList<ListenerHandler<?>>) eventsHolder.get(Object.class);
if (genericListeners != null) {
for (ListenerHandler<?> listener : genericListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
logger.warn("Exception while handling event: {}", event, e);
}
}
}
}
// ------------------------------------------------------------------------
// Lifecycle
// ------------------------------------------------------------------------
public static void shutdown() {
DISRUPTOR.shutdown();
eventsHolder.reset();
}
public static void reset() {
eventsHolder.reset();
}
// public static Map<Class<?>, CopyOnWriteArrayList<ListenerHandler<?>>> getAllListeners() {
//// return LISTENERS;
// }
}

View File

@@ -40,6 +40,7 @@ public class NetworkingClientEventListener {
void handleStartClient(NetworkEvents.StartClient event) { void handleStartClient(NetworkEvents.StartClient event) {
long clientId = SnowflakeGenerator.nextId(); long clientId = SnowflakeGenerator.nextId();
new EventFlow().addPostEvent(new NetworkEvents.CreatedIdForClient(clientId, event.identifier())).postEvent();
clientManager.startClient( clientManager.startClient(
clientId, clientId,
event.networkingClient(), event.networkingClient(),

View File

@@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.eventbus.GlobalEventBus; import org.toop.framework.eventbus.GlobalEventBus;
import org.toop.framework.networking.events.NetworkEvents; import org.toop.framework.networking.events.NetworkEvents;
import org.toop.framework.networking.exceptions.ClientNotFoundException; import org.toop.framework.networking.exceptions.ClientNotFoundException;
@@ -28,8 +29,16 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
Runnable onSuccess, Runnable onSuccess,
Runnable onFailure Runnable onFailure
) { ) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
EventFlow closeEvent = new EventFlow()
.listen(
NetworkEvents.CloseClient.class,
e -> {
if (e.clientId() == id) scheduler.shutdownNow();
}, "close");
Runnable connectTask = new Runnable() { Runnable connectTask = new Runnable() {
int attempts = 0; int attempts = 0;
@@ -46,7 +55,7 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
nClient.connect(id, nConnector.host(), nConnector.port()); nClient.connect(id, nConnector.host(), nConnector.port());
networkClients.put(id, nClient); networkClients.put(id, nClient);
logger.info("New client started successfully for {}:{}", nConnector.host(), nConnector.port()); logger.info("New client started successfully for {}:{}", nConnector.host(), nConnector.port());
GlobalEventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), true)); GlobalEventBus.get().post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), true));
onSuccess.run(); onSuccess.run();
scheduler.shutdown(); scheduler.shutdown();
} catch (CouldNotConnectException e) { } catch (CouldNotConnectException e) {
@@ -54,17 +63,17 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
if (attempts < nConnector.reconnectAttempts()) { if (attempts < nConnector.reconnectAttempts()) {
logger.warn("Could not connect to {}:{}. Retrying in {} {}", logger.warn("Could not connect to {}:{}. Retrying in {} {}",
nConnector.host(), nConnector.port(), nConnector.timeout(), nConnector.timeUnit()); nConnector.host(), nConnector.port(), nConnector.timeout(), nConnector.timeUnit());
GlobalEventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), false)); GlobalEventBus.get().post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), false));
scheduler.schedule(this, nConnector.timeout(), nConnector.timeUnit()); scheduler.schedule(this, nConnector.timeout(), nConnector.timeUnit());
} else { } else {
logger.error("Failed to start client for {}:{} after {} attempts", nConnector.host(), nConnector.port(), attempts); logger.error("Failed to start client for {}:{} after {} attempts", nConnector.host(), nConnector.port(), attempts);
GlobalEventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false)); GlobalEventBus.get().post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false));
onFailure.run(); onFailure.run();
scheduler.shutdown(); scheduler.shutdown();
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Unexpected exception during startClient", e); logger.error("Unexpected exception during startClient", e);
GlobalEventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false)); GlobalEventBus.get().post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false));
onFailure.run(); onFailure.run();
scheduler.shutdown(); scheduler.shutdown();
} }
@@ -72,6 +81,8 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
}; };
scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS); scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS);
//
// closeEvent.unsubscribe("close");
} }
@Override @Override

View File

@@ -4,6 +4,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.toop.annotations.AutoResponseResult; import org.toop.annotations.AutoResponseResult;
import org.toop.framework.eventbus.GlobalEventBus;
import org.toop.framework.eventbus.events.*; import org.toop.framework.eventbus.events.*;
import org.toop.framework.networking.interfaces.NetworkingClient; import org.toop.framework.networking.interfaces.NetworkingClient;
import org.toop.framework.networking.types.NetworkingConnector; import org.toop.framework.networking.types.NetworkingConnector;
@@ -11,7 +12,7 @@ import org.toop.framework.networking.types.NetworkingConnector;
/** /**
* Defines all event types related to the networking subsystem. * Defines all event types related to the networking subsystem.
* <p> * <p>
* These events are used in conjunction with the {@link org.toop.framework.eventbus.GlobalEventBus} * These events are used in conjunction with the {@link GlobalEventBus}
* and {@link org.toop.framework.eventbus.EventFlow} to communicate between components * and {@link org.toop.framework.eventbus.EventFlow} to communicate between components
* such as networking clients, managers, and listeners. * such as networking clients, managers, and listeners.
* </p> * </p>
@@ -166,6 +167,10 @@ public class NetworkEvents extends EventsBase {
long identifier) long identifier)
implements UniqueEvent {} implements UniqueEvent {}
public record CreatedIdForClient(long clientId, long identifier) implements ResponseToUniqueEvent {}
public record ConnectTry(long clientId, int amount, int maxAmount, boolean success) implements GenericEvent {}
/** /**
* Response confirming that a client has been successfully started. * Response confirming that a client has been successfully started.
* <p> * <p>
@@ -181,8 +186,6 @@ public class NetworkEvents extends EventsBase {
public record StartClientResponse(long clientId, boolean successful, long identifier) public record StartClientResponse(long clientId, boolean successful, long identifier)
implements ResponseToUniqueEvent {} implements ResponseToUniqueEvent {}
public record ConnectTry(long clientId, int amount, int maxAmount, boolean success) implements GenericEvent {}
/** /**
* Requests reconnection of an existing client using its previous configuration. * Requests reconnection of an existing client using its previous configuration.
* <p> * <p>