GlobalEventBus is now async instead

This commit is contained in:
Bas de Jong
2026-01-09 23:34:29 +01:00
parent 5e5948d1fe
commit 6a395cc40b
5 changed files with 38 additions and 14 deletions

View File

@@ -415,11 +415,9 @@ public class EventFlow {
/** /**
* Posts the event added through {@link #addPostEvent} asynchronously. * Posts the event added through {@link #addPostEvent} asynchronously.
* *
* @deprecated use {@link #postEvent()} instead.
*/ */
@Deprecated
public EventFlow asyncPostEvent() { public EventFlow asyncPostEvent() {
eventBus.post(this.event); GlobalEventBus.get().post(this.event);
return this; return this;
} }

View File

@@ -1,15 +1,20 @@
package org.toop.framework.eventbus; package org.toop.framework.eventbus;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.toop.framework.eventbus.bus.AsyncEventBus;
import org.toop.framework.eventbus.bus.DefaultEventBus;
import org.toop.framework.eventbus.bus.DisruptorEventBus; import org.toop.framework.eventbus.bus.DisruptorEventBus;
import org.toop.framework.eventbus.bus.EventBus; import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.store.DefaultSubscriberStore; import org.toop.framework.eventbus.store.DefaultSubscriberStore;
import org.toop.framework.eventbus.subscriber.Subscriber; import org.toop.framework.eventbus.subscriber.Subscriber;
public class GlobalEventBus implements EventBus { import java.util.concurrent.ExecutorService;
private static final EventBus INSTANCE = new DisruptorEventBus( import java.util.concurrent.Executors;
LogManager.getLogger(DisruptorEventBus.class),
public class GlobalEventBus implements AsyncEventBus {
private static final AsyncEventBus INSTANCE = new DefaultEventBus(
LogManager.getLogger(DefaultEventBus.class),
new DefaultSubscriberStore() new DefaultSubscriberStore()
); );
@@ -34,6 +39,11 @@ public class GlobalEventBus implements EventBus {
INSTANCE.post(event); INSTANCE.post(event);
} }
@Override
public <T extends EventType> void asyncPost(T event) {
INSTANCE.asyncPost(event);
}
@Override @Override
public void shutdown() { public void shutdown() {
INSTANCE.shutdown(); INSTANCE.shutdown();
@@ -43,4 +53,5 @@ public class GlobalEventBus implements EventBus {
public void reset() { public void reset() {
INSTANCE.reset(); INSTANCE.reset();
} }
} }

View File

@@ -0,0 +1,7 @@
package org.toop.framework.eventbus.bus;
import org.toop.framework.eventbus.events.EventType;
public interface AsyncEventBus extends EventBus {
<T extends EventType> void asyncPost(T event);
}

View File

@@ -5,12 +5,16 @@ import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.store.SubscriberStore; import org.toop.framework.eventbus.store.SubscriberStore;
import org.toop.framework.eventbus.subscriber.Subscriber; import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer; import java.util.function.Consumer;
public class DefaultEventBus implements EventBus { public class DefaultEventBus implements AsyncEventBus {
private final Logger logger; private final Logger logger;
private final SubscriberStore eventsHolder; private final SubscriberStore eventsHolder;
private final ExecutorService asyncExecutor = Executors.newCachedThreadPool();
public DefaultEventBus(Logger logger, SubscriberStore eventsHolder) { public DefaultEventBus(Logger logger, SubscriberStore eventsHolder) {
this.logger = logger; this.logger = logger;
this.eventsHolder = eventsHolder; this.eventsHolder = eventsHolder;
@@ -36,11 +40,16 @@ public class DefaultEventBus implements EventBus {
Class<T> eventClass = (Class<T>) subscriber.event(); Class<T> eventClass = (Class<T>) subscriber.event();
Consumer<EventType> action = (Consumer<EventType>) subscriber.handler(); Consumer<EventType> action = (Consumer<EventType>) subscriber.handler();
action.accept((EventType) eventClass.cast(event)); action.accept(eventClass.cast(event));
} }
} }
} }
@Override
public <T extends EventType> void asyncPost(T event) {
asyncExecutor.submit(() -> post(event));
}
@Override @Override
public void shutdown() { public void shutdown() {
eventsHolder.reset(); eventsHolder.reset();
@@ -50,4 +59,5 @@ public class DefaultEventBus implements EventBus {
public void reset() { public void reset() {
eventsHolder.reset(); eventsHolder.reset();
} }
} }

View File

@@ -77,13 +77,11 @@ public class OnlineThreadBehaviour extends AbstractThreadBehaviour implements Su
} }
logger.info("Successfully collected current turn's player"); logger.info("Successfully collected current turn's player");
moveExecutor.submit(() -> {
long move = player.getMove(gameCopy); long move = player.getMove(gameCopy);
logger.info("Move set: {}", move); logger.info("Move set: {}", move);
logger.info("Completed onYourTurn"); logger.info("Completed onYourTurn");
sendMove(clientId, move); sendMove(clientId, move);
});
} }
/** /**