Disabled error prone for now. Improved eventflow speed

This commit is contained in:
lieght
2025-09-24 22:04:00 +02:00
parent e6e11a3604
commit 7431d1b03f
21 changed files with 564 additions and 393 deletions

View File

@@ -1,13 +1,13 @@
package org.toop.framework.eventbus;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.EventWithUuid;
import org.toop.framework.eventbus.events.EventWithSnowflake;
import org.toop.framework.eventbus.SnowflakeGenerator;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@@ -15,7 +15,7 @@ import java.util.function.Consumer;
* EventFlow is a utility class for creating, posting, and optionally subscribing to events
* in a type-safe and chainable manner. It is designed to work with the {@link GlobalEventBus}.
*
* <p>This class supports automatic UUID assignment for {@link EventWithUuid} events,
* <p>This class supports automatic UUID assignment for {@link EventWithSnowflake} events,
* and allows filtering subscribers so they only respond to events with a specific UUID.
* All subscription methods are chainable, and you can configure automatic unsubscription
* after an event has been successfully handled.</p>
@@ -28,8 +28,8 @@ public class EventFlow {
/** Cache of constructor handles for event classes to avoid repeated reflection lookups. */
private static final Map<Class<?>, MethodHandle> CONSTRUCTOR_CACHE = new ConcurrentHashMap<>();
/** Automatically assigned UUID for {@link EventWithUuid} events. */
private String eventId = null;
/** Automatically assigned UUID for {@link EventWithSnowflake} events. */
private long eventSnowflake = -1;
/** The event instance created by this publisher. */
private EventType event = null;
@@ -51,7 +51,7 @@ public class EventFlow {
*/
public <T extends EventType> EventFlow addPostEvent(Class<T> eventClass, Object... args) {
try {
boolean isUuidEvent = EventWithUuid.class.isAssignableFrom(eventClass);
boolean isUuidEvent = EventWithSnowflake.class.isAssignableFrom(eventClass);
MethodHandle ctorHandle = CONSTRUCTOR_CACHE.computeIfAbsent(eventClass, cls -> {
try {
@@ -67,12 +67,12 @@ public class EventFlow {
int expectedParamCount = ctorHandle.type().parameterCount();
if (isUuidEvent && args.length < expectedParamCount) {
this.eventId = UUID.randomUUID().toString();
this.eventSnowflake = new SnowflakeGenerator(1).nextId();
finalArgs = new Object[args.length + 1];
System.arraycopy(args, 0, finalArgs, 0, args.length);
finalArgs[args.length] = this.eventId;
finalArgs[args.length] = this.eventSnowflake;
} else if (isUuidEvent) {
this.eventId = (String) args[args.length - 1];
this.eventSnowflake = (Long) args[args.length - 1];
finalArgs = args;
} else {
finalArgs = args;
@@ -117,9 +117,9 @@ public class EventFlow {
/**
* Subscribe by ID: only fires if UUID matches this publisher's eventId.
*/
public <TT extends EventWithUuid> EventFlow onResponse(Class<TT> eventClass, Consumer<TT> action) {
public <TT extends EventWithSnowflake> EventFlow onResponse(Class<TT> eventClass, Consumer<TT> action) {
this.listener = GlobalEventBus.subscribe(eventClass, event -> {
if (event.eventId().equals(this.eventId)) {
if (event.eventSnowflake() == this.eventSnowflake) {
action.accept(event);
if (unsubscribeAfterSuccess && listener != null) {
GlobalEventBus.unsubscribe(listener);
@@ -134,10 +134,10 @@ public class EventFlow {
* Subscribe by ID without explicit class.
*/
@SuppressWarnings("unchecked")
public <TT extends EventWithUuid> EventFlow onResponse(Consumer<TT> action) {
public <TT extends EventWithSnowflake> EventFlow onResponse(Consumer<TT> action) {
this.listener = GlobalEventBus.subscribe(event -> {
if (event instanceof EventWithUuid uuidEvent) {
if (uuidEvent.eventId().equals(this.eventId)) {
if (event instanceof EventWithSnowflake uuidEvent) {
if (uuidEvent.eventSnowflake() == this.eventSnowflake) {
try {
TT typedEvent = (TT) uuidEvent;
action.accept(typedEvent);
@@ -215,7 +215,7 @@ public class EventFlow {
return event;
}
public String getEventId() {
return eventId;
public long getEventId() {
return eventSnowflake;
}
}

View File

@@ -1,80 +1,76 @@
package org.toop.framework.eventbus;
import org.toop.framework.eventbus.events.EventWithUuid;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.EventWithSnowflake;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
* GlobalEventBus is a high-throughput, thread-safe event bus for publishing and subscribing
* to events within the application.
*
* <p>It supports:</p>
* <ul>
* <li>Type-specific subscriptions via {@link #subscribe(Class, Consumer)}</li>
* <li>UUID-specific subscriptions via {@link #subscribeById(Class, String, Consumer)}</li>
* <li>Asynchronous posting of events with automatic queueing and fallback</li>
* </ul>
*
* <p><b>Performance note:</b> Directly using {@link GlobalEventBus} is possible,
* but for safer type handling, automatic UUID management, and easier unsubscription,
* it is recommended to use {@link EventFlow} whenever possible.</p>
*
* <p>The bus maintains a fixed pool of worker threads that continuously process queued events.</p>
* GlobalEventBus backed by the LMAX Disruptor for ultra-low latency,
* high-throughput event publishing.
*/
public final class GlobalEventBus {
/** Number of worker threads, set to the number of available CPU cores. */
private static final int WORKERS = Runtime.getRuntime().availableProcessors();
/** Queue for asynchronous event processing. */
private static final BlockingQueue<EventType> EVENT_QUEUE = new LinkedBlockingQueue<>(WORKERS * 1024);
/** Map of event class to type-specific listeners. */
private static final Map<Class<?>, CopyOnWriteArrayList<Consumer<? super EventType>>> LISTENERS = new ConcurrentHashMap<>();
private static final Map<Class<?>, CopyOnWriteArrayList<Consumer<? super EventType>>> LISTENERS =
new ConcurrentHashMap<>();
/** Map of event class to UUID-specific listeners. */
private static final Map<Class<?>, ConcurrentHashMap<String, Consumer<? extends EventWithUuid>>> UUID_LISTENERS = new ConcurrentHashMap<>();
/** Map of event class to Snowflake-ID-specific listeners. */
private static final Map<Class<?>, ConcurrentHashMap<Long, Consumer<? extends EventWithSnowflake>>> UUID_LISTENERS =
new ConcurrentHashMap<>();
/** Thread pool for worker threads processing queued events. */
private static final ExecutorService WORKER_POOL = Executors.newFixedThreadPool(WORKERS, r -> {
Thread t = new Thread(r, "EventBus-Worker-" + r.hashCode());
t.setDaemon(true);
return t;
});
/** Disruptor ring buffer size (must be power of two). */
private static final int RING_BUFFER_SIZE = 1024 * 64;
/** Disruptor instance. */
private static final Disruptor<EventHolder> DISRUPTOR;
/** Ring buffer used for publishing events. */
private static final RingBuffer<EventHolder> RING_BUFFER;
// Initialize worker threads
static {
for (int i = 0; i < WORKERS; i++) {
WORKER_POOL.submit(GlobalEventBus::workerLoop);
}
ThreadFactory threadFactory = r -> {
Thread t = new Thread(r, "EventBus-Disruptor");
t.setDaemon(true);
return t;
};
DISRUPTOR = new Disruptor<>(
EventHolder::new,
RING_BUFFER_SIZE,
threadFactory,
ProducerType.MULTI,
new BusySpinWaitStrategy()
);
// Single consumer that dispatches to subscribers
DISRUPTOR.handleEventsWith((holder, seq, endOfBatch) -> {
if (holder.event != null) {
dispatchEvent(holder.event);
holder.event = null;
}
});
DISRUPTOR.start();
RING_BUFFER = DISRUPTOR.getRingBuffer();
}
/** Private constructor to prevent instantiation. */
/** Prevent instantiation. */
private GlobalEventBus() {}
/** Continuously processes events from the queue and dispatches them to listeners. */
private static void workerLoop() {
try {
while (true) {
EventType event = EVENT_QUEUE.take();
dispatchEvent(event);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
/** Wrapper used inside the ring buffer. */
private static class EventHolder {
EventType event;
}
/**
* Subscribes a type-specific listener for all events of a given class.
*
* @param eventClass the class of events to subscribe to
* @param listener the action to execute when the event is posted
* @param <T> the event type
* @return the provided listener for possible unsubscription
*/
// ------------------------------------------------------------------------
// Subscription
// ------------------------------------------------------------------------
public static <T extends EventType> Consumer<T> subscribe(Class<T> eventClass, Consumer<T> listener) {
CopyOnWriteArrayList<Consumer<? super EventType>> list =
LISTENERS.computeIfAbsent(eventClass, k -> new CopyOnWriteArrayList<>());
@@ -82,81 +78,50 @@ public final class GlobalEventBus {
return listener;
}
/**
* Subscribes a generic listener for all events (no type filtering).
*
* @param listener the action to execute on any event
* @return the provided listener for possible unsubscription
*/
public static Consumer<Object> subscribe(Consumer<Object> listener) {
LISTENERS.computeIfAbsent(Object.class, _ -> new CopyOnWriteArrayList<>())
.add(listener);
return listener;
}
/**
* Subscribes a listener for a specific {@link EventWithUuid} identified by its UUID.
*
* @param eventClass the class of the UUID event
* @param eventId the UUID of the event to listen for
* @param listener the action to execute when the event with the matching UUID is posted
* @param <T> the event type extending EventWithUuid
*/
public static <T extends EventWithUuid> void subscribeById(Class<T> eventClass, String eventId, Consumer<T> listener) {
public static <T extends EventWithSnowflake> void subscribeById(
Class<T> eventClass, long eventId, Consumer<T> listener) {
UUID_LISTENERS
.computeIfAbsent(eventClass, _ -> new ConcurrentHashMap<>())
.put(eventId, listener);
}
/**
* Unsubscribes a previously registered listener.
*
* @param listener the listener to remove
*/
public static void unsubscribe(Object listener) {
LISTENERS.values().forEach(list -> list.remove(listener));
}
/**
* Unsubscribes a UUID-specific listener.
*
* @param eventClass the class of the UUID event
* @param eventId the UUID of the listener to remove
* @param <T> the event type extending EventWithUuid
*/
public static <T extends EventWithUuid> void unsubscribeById(Class<T> eventClass, String eventId) {
Map<String, Consumer<? extends EventWithUuid>> map = UUID_LISTENERS.get(eventClass);
public static <T extends EventWithSnowflake> void unsubscribeById(Class<T> eventClass, long eventId) {
Map<Long, Consumer<? extends EventWithSnowflake>> map = UUID_LISTENERS.get(eventClass);
if (map != null) map.remove(eventId);
}
/**
* Posts an event synchronously to all subscribed listeners.
*
* @param event the event instance to post
* @param <T> the event type
*/
// ------------------------------------------------------------------------
// Posting
// ------------------------------------------------------------------------
public static <T extends EventType> void post(T event) {
dispatchEvent(event);
dispatchEvent(event); // synchronous
}
/**
* Posts an event asynchronously by adding it to the internal queue.
* If the queue is full, the event is dispatched synchronously.
*
* @param event the event instance to post
* @param <T> the event type
*/
public static <T extends EventType> void postAsync(T event) {
if (!EVENT_QUEUE.offer(event)) {
dispatchEvent(event);
long seq = RING_BUFFER.next();
try {
EventHolder holder = RING_BUFFER.get(seq);
holder.event = event;
} finally {
RING_BUFFER.publish(seq);
}
}
/** Dispatches an event to all type-specific, generic, and UUID-specific listeners. */
@SuppressWarnings("unchecked")
private static void dispatchEvent(EventType event) {
Class<?> clazz = event.getClass();
// class-specific listeners
CopyOnWriteArrayList<Consumer<? super EventType>> classListeners = LISTENERS.get(clazz);
if (classListeners != null) {
for (Consumer<? super EventType> listener : classListeners) {
@@ -164,6 +129,7 @@ public final class GlobalEventBus {
}
}
// generic listeners
CopyOnWriteArrayList<Consumer<? super EventType>> genericListeners = LISTENERS.get(Object.class);
if (genericListeners != null) {
for (Consumer<? super EventType> listener : genericListeners) {
@@ -171,31 +137,28 @@ public final class GlobalEventBus {
}
}
if (event instanceof EventWithUuid uuidEvent) {
Map<String, Consumer<? extends EventWithUuid>> map = UUID_LISTENERS.get(clazz);
// snowflake listeners
if (event instanceof EventWithSnowflake snowflakeEvent) {
Map<Long, Consumer<? extends EventWithSnowflake>> map = UUID_LISTENERS.get(clazz);
if (map != null) {
Consumer<EventWithUuid> listener = (Consumer<EventWithUuid>) map.remove(uuidEvent.eventId());
Consumer<EventWithSnowflake> listener =
(Consumer<EventWithSnowflake>) map.remove(snowflakeEvent.eventSnowflake());
if (listener != null) {
try { listener.accept(uuidEvent); } catch (Throwable ignored) {}
try { listener.accept(snowflakeEvent); } catch (Throwable ignored) {}
}
}
}
}
/**
* Shuts down the bus immediately, clearing all listeners and queued events.
* Worker threads are stopped.
*/
// ------------------------------------------------------------------------
// Lifecycle
// ------------------------------------------------------------------------
public static void shutdown() {
WORKER_POOL.shutdownNow();
DISRUPTOR.shutdown();
LISTENERS.clear();
UUID_LISTENERS.clear();
EVENT_QUEUE.clear();
}
/**
* Clears all listeners and UUID-specific subscriptions without stopping worker threads.
*/
public static void reset() {
LISTENERS.clear();
UUID_LISTENERS.clear();

View File

@@ -0,0 +1,68 @@
package org.toop.framework.eventbus;
import java.util.concurrent.atomic.AtomicLong;
public class SnowflakeGenerator {
// Epoch start (choose your custom epoch to reduce bits wasted on old time)
private static final long EPOCH = 1700000000000L; // ~2023-11-15
// Bit allocations
private static final long TIMESTAMP_BITS = 41;
private static final long MACHINE_BITS = 10;
private static final long SEQUENCE_BITS = 12;
// Max values
private static final long MAX_MACHINE_ID = (1L << MACHINE_BITS) - 1;
private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;
// Bit shifts
private static final long MACHINE_SHIFT = SEQUENCE_BITS;
private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + MACHINE_BITS;
private final long machineId;
private final AtomicLong lastTimestamp = new AtomicLong(-1L);
private long sequence = 0L;
public SnowflakeGenerator(long machineId) {
if (machineId < 0 || machineId > MAX_MACHINE_ID) {
throw new IllegalArgumentException("Machine ID must be between 0 and " + MAX_MACHINE_ID);
}
this.machineId = machineId;
}
public synchronized long nextId() {
long currentTimestamp = timestamp();
if (currentTimestamp < lastTimestamp.get()) {
throw new IllegalStateException("Clock moved backwards. Refusing to generate id.");
}
if (currentTimestamp == lastTimestamp.get()) {
sequence = (sequence + 1) & MAX_SEQUENCE;
if (sequence == 0) {
// Sequence overflow, wait for next millisecond
currentTimestamp = waitNextMillis(currentTimestamp);
}
} else {
sequence = 0L;
}
lastTimestamp.set(currentTimestamp);
return ((currentTimestamp - EPOCH) << TIMESTAMP_SHIFT)
| (machineId << MACHINE_SHIFT)
| sequence;
}
private long waitNextMillis(long lastTimestamp) {
long ts = timestamp();
while (ts <= lastTimestamp) {
ts = timestamp();
}
return ts;
}
private long timestamp() {
return System.currentTimeMillis();
}
}

View File

@@ -2,7 +2,7 @@ package org.toop.framework.eventbus.events;
import java.util.Map;
public interface EventWithUuid extends EventType {
public interface EventWithSnowflake extends EventType {
Map<String, Object> result();
String eventId();
long eventSnowflake();
}

View File

@@ -0,0 +1,3 @@
package org.toop.framework.eventbus.events;
public interface EventWithoutSnowflake extends EventType {}

View File

@@ -1,3 +0,0 @@
package org.toop.framework.eventbus.events;
public interface EventWithoutUuid extends EventType {}

View File

@@ -2,10 +2,9 @@ package org.toop.framework.eventbus.events;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
/** Events that are used in the GlobalEventBus class. */
public class Events {
public class EventsBase {
/**
* WIP, DO NOT USE!

View File

@@ -7,7 +7,7 @@ import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.eventbus.events.NetworkEvents;
import org.toop.framework.networking.events.NetworkEvents;
public class NetworkingClientManager {
@@ -60,7 +60,7 @@ public class NetworkingClientManager {
private void handleStartClient(NetworkEvents.StartClient event) {
String uuid = this.startClientRequest(event.handlerFactory(), event.ip(), event.port());
new EventFlow().addPostEvent(NetworkEvents.StartClientSuccess.class,
uuid, event.eventId()
uuid, event.eventSnowflake()
).asyncPostEvent();
}

View File

@@ -1,5 +1,8 @@
package org.toop.framework.eventbus.events;
package org.toop.framework.networking.events;
import org.toop.framework.eventbus.events.EventWithSnowflake;
import org.toop.framework.eventbus.events.EventWithoutSnowflake;
import org.toop.framework.eventbus.events.EventsBase;
import org.toop.framework.networking.NetworkingGameClientHandler;
import java.lang.reflect.RecordComponent;
@@ -9,7 +12,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class NetworkEvents extends Events {
public class NetworkEvents extends EventsBase {
/**
* BLOCKING Requests all active connections. The result is returned via the provided
@@ -17,14 +20,14 @@ public class NetworkEvents extends Events {
*
* @param future List of all connections in string form.
*/
public record RequestsAllClients(CompletableFuture<String> future) implements EventWithoutUuid {}
public record RequestsAllClients(CompletableFuture<String> future) implements EventWithoutSnowflake {}
/** Forces closing all active connections immediately. */
public record ForceCloseAllClients() implements EventWithoutUuid {}
public record ForceCloseAllClients() implements EventWithoutSnowflake {}
public record CloseClientRequest(CompletableFuture<String> future) implements EventWithoutUuid {}
public record CloseClientRequest(CompletableFuture<String> future) implements EventWithoutSnowflake {}
public record CloseClient(String connectionId) implements EventWithoutUuid {}
public record CloseClient(String connectionId) implements EventWithoutSnowflake {}
/**
* Event to start a new client connection to a server.
@@ -40,7 +43,7 @@ public class NetworkEvents extends Events {
* </p>
*
* <p>
* The {@link #eventId()} allows callers to correlate the {@code StartClient} event
* The {@link #eventSnowflake()} allows callers to correlate the {@code StartClient} event
* with subsequent success/failure events. For example, a {@code StartClientSuccess}
* or {@code StartClientFailure} event may carry the same {@code eventId}.
* </p>
@@ -48,15 +51,15 @@ public class NetworkEvents extends Events {
* @param handlerFactory Factory for constructing a {@link NetworkingGameClientHandler}.
* @param ip The IP address of the server to connect to.
* @param port The port number of the server to connect to.
* @param eventId A unique identifier for this event, typically injected
* @param eventSnowflake A unique identifier for this event, typically injected
* automatically by the {@link org.toop.framework.eventbus.EventFlow}.
*/
public record StartClient(
Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String ip,
int port,
String eventId
) implements EventWithUuid {
long eventSnowflake
) implements EventWithSnowflake {
/**
* Returns a map representation of this event, where keys are record component names
@@ -86,8 +89,8 @@ public class NetworkEvents extends Events {
* @return the event ID string
*/
@Override
public String eventId() {
return this.eventId;
public long eventSnowflake() {
return this.eventSnowflake;
}
}
@@ -101,15 +104,15 @@ public class NetworkEvents extends Events {
*/
public record StartClientRequest(
Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String ip, int port, CompletableFuture<String> future) implements EventWithoutUuid {}
String ip, int port, CompletableFuture<String> future) implements EventWithoutSnowflake {}
/**
*
* @param clientId The ID of the client to be used in requests.
* @param eventId The eventID used in checking if event is for you.
* @param eventSnowflake The eventID used in checking if event is for you.
*/
public record StartClientSuccess(String clientId, String eventId)
implements EventWithUuid {
public record StartClientSuccess(String clientId, long eventSnowflake)
implements EventWithSnowflake {
@Override
public Map<String, Object> result() {
return Stream.of(this.getClass().getRecordComponents())
@@ -126,8 +129,8 @@ public class NetworkEvents extends Events {
}
@Override
public String eventId() {
return this.eventId;
public long eventSnowflake() {
return this.eventSnowflake;
}
}
@@ -137,13 +140,13 @@ public class NetworkEvents extends Events {
* @param connectionId The UUID of the connection to send the command on.
* @param args The command arguments.
*/
public record SendCommand(String connectionId, String... args) implements EventWithoutUuid {}
public record SendCommand(String connectionId, String... args) implements EventWithoutSnowflake {}
/**
* Triggers reconnecting to a previous address.
*
* @param connectionId The identifier of the connection being reconnected.
*/
public record Reconnect(Object connectionId) implements EventWithoutUuid {}
public record Reconnect(Object connectionId) implements EventWithoutSnowflake {}
/**
@@ -152,7 +155,7 @@ public class NetworkEvents extends Events {
* @param ConnectionUuid The UUID of the connection that received the message.
* @param message The message received.
*/
public record ReceivedMessage(String ConnectionUuid, String message) implements EventWithoutUuid {}
public record ReceivedMessage(String ConnectionUuid, String message) implements EventWithoutSnowflake {}
/**
* Triggers changing connection to a new address.
@@ -161,7 +164,7 @@ public class NetworkEvents extends Events {
* @param ip The new IP address.
* @param port The new port.
*/
public record ChangeClient(Object connectionId, String ip, int port) implements EventWithoutUuid {}
public record ChangeClient(Object connectionId, String ip, int port) implements EventWithoutSnowflake {}
/**
@@ -169,9 +172,9 @@ public class NetworkEvents extends Events {
*
* @param connectionId The identifier of the connection that failed.
*/
public record CouldNotConnect(Object connectionId) implements EventWithoutUuid {}
public record CouldNotConnect(Object connectionId) implements EventWithoutSnowflake {}
/** WIP Triggers when a connection closes. */
public record ClosedConnection() implements EventWithoutUuid {}
public record ClosedConnection() implements EventWithoutSnowflake {}
}