From 6a395cc40b7f441c83f0e9713d6c9a524cdf4927 Mon Sep 17 00:00:00 2001 From: Bas de Jong Date: Fri, 9 Jan 2026 23:34:29 +0100 Subject: [PATCH] GlobalEventBus is now async instead --- .../org/toop/framework/eventbus/EventFlow.java | 4 +--- .../toop/framework/eventbus/GlobalEventBus.java | 17 ++++++++++++++--- .../framework/eventbus/bus/AsyncEventBus.java | 7 +++++++ .../framework/eventbus/bus/DefaultEventBus.java | 14 ++++++++++++-- .../game/gameThreads/OnlineThreadBehaviour.java | 10 ++++------ 5 files changed, 38 insertions(+), 14 deletions(-) create mode 100644 framework/src/main/java/org/toop/framework/eventbus/bus/AsyncEventBus.java diff --git a/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java b/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java index a9ffd9d..b9631b2 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java +++ b/framework/src/main/java/org/toop/framework/eventbus/EventFlow.java @@ -415,11 +415,9 @@ public class EventFlow { /** * Posts the event added through {@link #addPostEvent} asynchronously. * - * @deprecated use {@link #postEvent()} instead. */ - @Deprecated public EventFlow asyncPostEvent() { - eventBus.post(this.event); + GlobalEventBus.get().post(this.event); return this; } diff --git a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java index f70f751..96c3c2d 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java @@ -1,15 +1,20 @@ package org.toop.framework.eventbus; import org.apache.logging.log4j.LogManager; +import org.toop.framework.eventbus.bus.AsyncEventBus; +import org.toop.framework.eventbus.bus.DefaultEventBus; import org.toop.framework.eventbus.bus.DisruptorEventBus; import org.toop.framework.eventbus.bus.EventBus; import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.store.DefaultSubscriberStore; import org.toop.framework.eventbus.subscriber.Subscriber; -public class GlobalEventBus implements EventBus { - private static final EventBus INSTANCE = new DisruptorEventBus( - LogManager.getLogger(DisruptorEventBus.class), +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class GlobalEventBus implements AsyncEventBus { + private static final AsyncEventBus INSTANCE = new DefaultEventBus( + LogManager.getLogger(DefaultEventBus.class), new DefaultSubscriberStore() ); @@ -34,6 +39,11 @@ public class GlobalEventBus implements EventBus { INSTANCE.post(event); } + @Override + public void asyncPost(T event) { + INSTANCE.asyncPost(event); + } + @Override public void shutdown() { INSTANCE.shutdown(); @@ -43,4 +53,5 @@ public class GlobalEventBus implements EventBus { public void reset() { INSTANCE.reset(); } + } diff --git a/framework/src/main/java/org/toop/framework/eventbus/bus/AsyncEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/AsyncEventBus.java new file mode 100644 index 0000000..1bcdd5e --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/AsyncEventBus.java @@ -0,0 +1,7 @@ +package org.toop.framework.eventbus.bus; + +import org.toop.framework.eventbus.events.EventType; + +public interface AsyncEventBus extends EventBus { + void asyncPost(T event); +} diff --git a/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java index 7b77ff3..7afc3a2 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java @@ -5,12 +5,16 @@ import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.store.SubscriberStore; import org.toop.framework.eventbus.subscriber.Subscriber; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Consumer; -public class DefaultEventBus implements EventBus { +public class DefaultEventBus implements AsyncEventBus { private final Logger logger; private final SubscriberStore eventsHolder; + private final ExecutorService asyncExecutor = Executors.newCachedThreadPool(); + public DefaultEventBus(Logger logger, SubscriberStore eventsHolder) { this.logger = logger; this.eventsHolder = eventsHolder; @@ -36,11 +40,16 @@ public class DefaultEventBus implements EventBus { Class eventClass = (Class) subscriber.event(); Consumer action = (Consumer) subscriber.handler(); - action.accept((EventType) eventClass.cast(event)); + action.accept(eventClass.cast(event)); } } } + @Override + public void asyncPost(T event) { + asyncExecutor.submit(() -> post(event)); + } + @Override public void shutdown() { eventsHolder.reset(); @@ -50,4 +59,5 @@ public class DefaultEventBus implements EventBus { public void reset() { eventsHolder.reset(); } + } diff --git a/framework/src/main/java/org/toop/framework/game/gameThreads/OnlineThreadBehaviour.java b/framework/src/main/java/org/toop/framework/game/gameThreads/OnlineThreadBehaviour.java index 6a7c5d2..b4d1cac 100644 --- a/framework/src/main/java/org/toop/framework/game/gameThreads/OnlineThreadBehaviour.java +++ b/framework/src/main/java/org/toop/framework/game/gameThreads/OnlineThreadBehaviour.java @@ -77,13 +77,11 @@ public class OnlineThreadBehaviour extends AbstractThreadBehaviour implements Su } logger.info("Successfully collected current turn's player"); - moveExecutor.submit(() -> { - long move = player.getMove(gameCopy); - logger.info("Move set: {}", move); - logger.info("Completed onYourTurn"); + long move = player.getMove(gameCopy); + logger.info("Move set: {}", move); + logger.info("Completed onYourTurn"); - sendMove(clientId, move); - }); + sendMove(clientId, move); } /**