From 484cf1dcd36bc013e0f401392ada93d97eb9aaca Mon Sep 17 00:00:00 2001 From: lieght <49651652+BAFGdeJong@users.noreply.github.com> Date: Sun, 7 Dec 2025 03:10:22 +0100 Subject: [PATCH] Optimization --- .../framework/eventbus/GlobalEventBus.java | 4 +- .../eventbus/bus/DefaultEventBus.java | 4 +- .../eventbus/bus/DisruptorEventBus.java | 2 +- .../holder/AsyncArraySubscriberStore.java | 72 +++++++++++++++++++ .../eventbus/holder/AsyncSubscriberStore.java | 35 ++++++--- .../eventbus/holder/SubscriberStore.java | 4 +- .../eventbus/holder/SyncSubscriberStore.java | 7 +- .../framework/audio/MusicManagerTest.java | 5 +- 8 files changed, 109 insertions(+), 24 deletions(-) create mode 100644 framework/src/main/java/org/toop/framework/eventbus/holder/AsyncArraySubscriberStore.java diff --git a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java index c2abf22..df7ef0a 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/GlobalEventBus.java @@ -3,13 +3,13 @@ package org.toop.framework.eventbus; import org.apache.logging.log4j.LogManager; import org.toop.framework.eventbus.bus.DisruptorEventBus; import org.toop.framework.eventbus.bus.EventBus; -import org.toop.framework.eventbus.holder.AsyncSubscriberStore; +import org.toop.framework.eventbus.holder.AsyncArraySubscriberStore; import org.toop.framework.eventbus.subscriber.Subscriber; public class GlobalEventBus implements EventBus { private static final EventBus INSTANCE = new DisruptorEventBus( LogManager.getLogger(DisruptorEventBus.class), - new AsyncSubscriberStore() + new AsyncArraySubscriberStore() ); private GlobalEventBus() {} diff --git a/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java index b7c0322..5da6616 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/DefaultEventBus.java @@ -34,9 +34,7 @@ public class DefaultEventBus implements EventBus { Class eventType = (Class) event.getClass(); var subs = eventsHolder.get(eventType); if (subs != null) { - List> snapshot = new ArrayList<>(subs); - - for (Subscriber subscriber : snapshot) { + for (Subscriber subscriber : subs) { Class eventClass = (Class) subscriber.getEvent(); Consumer action = (Consumer) subscriber.getAction(); diff --git a/framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java b/framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java index 61dec65..f6aefcb 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java +++ b/framework/src/main/java/org/toop/framework/eventbus/bus/DisruptorEventBus.java @@ -95,7 +95,7 @@ public class DisruptorEventBus implements EventBus { } private void dispatchEvent(T event) { - CopyOnWriteArrayList> classListeners = (CopyOnWriteArrayList>) eventsHolder.get(event.getClass()); + var classListeners = eventsHolder.get(event.getClass()); if (classListeners != null) { for (Subscriber listener : classListeners) { try { diff --git a/framework/src/main/java/org/toop/framework/eventbus/holder/AsyncArraySubscriberStore.java b/framework/src/main/java/org/toop/framework/eventbus/holder/AsyncArraySubscriberStore.java new file mode 100644 index 0000000..2bc4c76 --- /dev/null +++ b/framework/src/main/java/org/toop/framework/eventbus/holder/AsyncArraySubscriberStore.java @@ -0,0 +1,72 @@ +package org.toop.framework.eventbus.holder; + +import org.toop.framework.eventbus.subscriber.Subscriber; + +import java.util.concurrent.ConcurrentHashMap; + +public class AsyncArraySubscriberStore implements SubscriberStore { + + private static final Subscriber[] EMPTY = new Subscriber[0]; + + private final ConcurrentHashMap, Subscriber[]> listeners = + new ConcurrentHashMap<>(); + + @Override + public void add(Subscriber sub) { + listeners.compute(sub.getEvent(), (_, arr) -> { + if (arr == null || arr.length == 0) { + return new Subscriber[]{sub}; + } + + // FAST copy append + int len = arr.length; + Subscriber[] newArr = new Subscriber[len + 1]; + System.arraycopy(arr, 0, newArr, 0, len); + newArr[len] = sub; + return newArr; + }); + } + + @Override + public void remove(Subscriber sub) { + listeners.computeIfPresent(sub.getEvent(), (_, arr) -> { + int len = arr.length; + + if (len == 1) { + return arr[0].equals(sub) ? null : arr; + } + + int keep = 0; + for (Subscriber s : arr) { + if (!s.equals(sub)) keep++; + } + + if (keep == len) { + return arr; + } + if (keep == 0) { + return null; + } + + Subscriber[] newArr = new Subscriber[keep]; + int i = 0; + for (Subscriber s : arr) { + if (!s.equals(sub)) { + newArr[i++] = s; + } + } + + return newArr; + }); + } + + @Override + public Subscriber[] get(Class event) { + return listeners.getOrDefault(event, EMPTY); + } + + @Override + public void reset() { + listeners.clear(); + } +} \ No newline at end of file diff --git a/framework/src/main/java/org/toop/framework/eventbus/holder/AsyncSubscriberStore.java b/framework/src/main/java/org/toop/framework/eventbus/holder/AsyncSubscriberStore.java index 333ec6b..1cebff7 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/holder/AsyncSubscriberStore.java +++ b/framework/src/main/java/org/toop/framework/eventbus/holder/AsyncSubscriberStore.java @@ -2,32 +2,45 @@ package org.toop.framework.eventbus.holder; import org.toop.framework.eventbus.subscriber.Subscriber; -import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentLinkedQueue; public class AsyncSubscriberStore implements SubscriberStore { - private final Map, CopyOnWriteArrayList>> LISTENERS = new ConcurrentHashMap<>(); + private final ConcurrentHashMap, ConcurrentLinkedQueue>> queues = new ConcurrentHashMap<>(); + private final ConcurrentHashMap, Subscriber[]> snapshots = new ConcurrentHashMap<>(); @Override public void add(Subscriber sub) { - LISTENERS.computeIfAbsent(sub.getEvent(), _ -> new CopyOnWriteArrayList<>()).add(sub); + queues.computeIfAbsent(sub.getEvent(), _ -> new ConcurrentLinkedQueue<>()).add(sub); + rebuildSnapshot(sub.getEvent()); } @Override public void remove(Subscriber sub) { - LISTENERS.getOrDefault(sub.getEvent(), new CopyOnWriteArrayList<>()).remove(sub); - LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty()); + ConcurrentLinkedQueue> queue = queues.get(sub.getEvent()); + if (queue != null) { + queue.remove(sub); + rebuildSnapshot(sub.getEvent()); + } } @Override - public List> get(Class event) { - return LISTENERS.get(event); + public Subscriber[] get(Class event) { + return snapshots.getOrDefault(event, new Subscriber[0]); } @Override public void reset() { - LISTENERS.clear(); + queues.clear(); + snapshots.clear(); } -} + + private void rebuildSnapshot(Class event) { + ConcurrentLinkedQueue> queue = queues.get(event); + if (queue != null) { + snapshots.put(event, queue.toArray(new Subscriber[0])); + } else { + snapshots.put(event, new Subscriber[0]); + } + } +} \ No newline at end of file diff --git a/framework/src/main/java/org/toop/framework/eventbus/holder/SubscriberStore.java b/framework/src/main/java/org/toop/framework/eventbus/holder/SubscriberStore.java index babc1f9..57e8669 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/holder/SubscriberStore.java +++ b/framework/src/main/java/org/toop/framework/eventbus/holder/SubscriberStore.java @@ -2,11 +2,9 @@ package org.toop.framework.eventbus.holder; import org.toop.framework.eventbus.subscriber.Subscriber; -import java.util.List; - public interface SubscriberStore { void add(Subscriber subscriber); void remove(Subscriber subscriber); - List> get(Class event); + Subscriber[] get(Class event); void reset(); } diff --git a/framework/src/main/java/org/toop/framework/eventbus/holder/SyncSubscriberStore.java b/framework/src/main/java/org/toop/framework/eventbus/holder/SyncSubscriberStore.java index fd21221..cb8397d 100644 --- a/framework/src/main/java/org/toop/framework/eventbus/holder/SyncSubscriberStore.java +++ b/framework/src/main/java/org/toop/framework/eventbus/holder/SyncSubscriberStore.java @@ -9,6 +9,7 @@ import java.util.concurrent.ConcurrentHashMap; public class SyncSubscriberStore implements SubscriberStore { private final Map, List>> LISTENERS = new ConcurrentHashMap<>(); + private static final Subscriber[] EMPTY = new Subscriber[0]; @Override public void add(Subscriber sub) { @@ -22,8 +23,10 @@ public class SyncSubscriberStore implements SubscriberStore { } @Override - public List> get(Class event) { - return LISTENERS.get(event); + public Subscriber[] get(Class event) { + List> list = LISTENERS.get(event); + if (list == null || list.isEmpty()) return EMPTY; + return list.toArray(EMPTY); } @Override diff --git a/framework/src/test/java/org/toop/framework/audio/MusicManagerTest.java b/framework/src/test/java/org/toop/framework/audio/MusicManagerTest.java index f602c72..d009d7e 100644 --- a/framework/src/test/java/org/toop/framework/audio/MusicManagerTest.java +++ b/framework/src/test/java/org/toop/framework/audio/MusicManagerTest.java @@ -3,6 +3,7 @@ package org.toop.framework.audio; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.toop.framework.dispatch.interfaces.Dispatcher; +import org.toop.framework.eventbus.GlobalEventBus; import org.toop.framework.resource.resources.BaseResource; import org.toop.framework.resource.types.AudioResource; @@ -94,7 +95,7 @@ public class MusicManagerTest { List resources = List.of(track1, track2, track3); - manager = new MusicManager<>(resources, dispatcher); + manager = new MusicManager<>(GlobalEventBus.get(), resources, dispatcher); } @Test @@ -188,7 +189,7 @@ public class MusicManagerTest { manyTracks.add(new MockAudioResource("track" + i)); } - MusicManager multiManager = new MusicManager<>(manyTracks, dispatcher); + MusicManager multiManager = new MusicManager<>(GlobalEventBus.get(), manyTracks, dispatcher); for (int i = 0; i < manyTracks.size() - 1; i++) { multiManager.play();