Optimization

This commit is contained in:
lieght
2025-12-07 03:10:22 +01:00
parent 5ee2f2187e
commit 484cf1dcd3
8 changed files with 109 additions and 24 deletions

View File

@@ -3,13 +3,13 @@ package org.toop.framework.eventbus;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.toop.framework.eventbus.bus.DisruptorEventBus; import org.toop.framework.eventbus.bus.DisruptorEventBus;
import org.toop.framework.eventbus.bus.EventBus; import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.holder.AsyncSubscriberStore; import org.toop.framework.eventbus.holder.AsyncArraySubscriberStore;
import org.toop.framework.eventbus.subscriber.Subscriber; import org.toop.framework.eventbus.subscriber.Subscriber;
public class GlobalEventBus implements EventBus { public class GlobalEventBus implements EventBus {
private static final EventBus INSTANCE = new DisruptorEventBus( private static final EventBus INSTANCE = new DisruptorEventBus(
LogManager.getLogger(DisruptorEventBus.class), LogManager.getLogger(DisruptorEventBus.class),
new AsyncSubscriberStore() new AsyncArraySubscriberStore()
); );
private GlobalEventBus() {} private GlobalEventBus() {}

View File

@@ -34,9 +34,7 @@ public class DefaultEventBus implements EventBus {
Class<T> eventType = (Class<T>) event.getClass(); Class<T> eventType = (Class<T>) event.getClass();
var subs = eventsHolder.get(eventType); var subs = eventsHolder.get(eventType);
if (subs != null) { if (subs != null) {
List<Subscriber<?, ?>> snapshot = new ArrayList<>(subs); for (Subscriber<?, ?> subscriber : subs) {
for (Subscriber<?, ?> subscriber : snapshot) {
Class<T> eventClass = (Class<T>) subscriber.getEvent(); Class<T> eventClass = (Class<T>) subscriber.getEvent();
Consumer<EventType> action = (Consumer<EventType>) subscriber.getAction(); Consumer<EventType> action = (Consumer<EventType>) subscriber.getAction();

View File

@@ -95,7 +95,7 @@ public class DisruptorEventBus implements EventBus {
} }
private <T> void dispatchEvent(T event) { private <T> void dispatchEvent(T event) {
CopyOnWriteArrayList<Subscriber<?, ?>> classListeners = (CopyOnWriteArrayList<Subscriber<?, ?>>) eventsHolder.get(event.getClass()); var classListeners = eventsHolder.get(event.getClass());
if (classListeners != null) { if (classListeners != null) {
for (Subscriber<?, ?> listener : classListeners) { for (Subscriber<?, ?> listener : classListeners) {
try { try {

View File

@@ -0,0 +1,72 @@
package org.toop.framework.eventbus.holder;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.concurrent.ConcurrentHashMap;
public class AsyncArraySubscriberStore implements SubscriberStore {
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
private final ConcurrentHashMap<Class<?>, Subscriber<?, ?>[]> listeners =
new ConcurrentHashMap<>();
@Override
public void add(Subscriber<?, ?> sub) {
listeners.compute(sub.getEvent(), (_, arr) -> {
if (arr == null || arr.length == 0) {
return new Subscriber<?, ?>[]{sub};
}
// FAST copy append
int len = arr.length;
Subscriber<?, ?>[] newArr = new Subscriber[len + 1];
System.arraycopy(arr, 0, newArr, 0, len);
newArr[len] = sub;
return newArr;
});
}
@Override
public void remove(Subscriber<?, ?> sub) {
listeners.computeIfPresent(sub.getEvent(), (_, arr) -> {
int len = arr.length;
if (len == 1) {
return arr[0].equals(sub) ? null : arr;
}
int keep = 0;
for (Subscriber<?, ?> s : arr) {
if (!s.equals(sub)) keep++;
}
if (keep == len) {
return arr;
}
if (keep == 0) {
return null;
}
Subscriber<?, ?>[] newArr = new Subscriber[keep];
int i = 0;
for (Subscriber<?, ?> s : arr) {
if (!s.equals(sub)) {
newArr[i++] = s;
}
}
return newArr;
});
}
@Override
public Subscriber<?, ?>[] get(Class<?> event) {
return listeners.getOrDefault(event, EMPTY);
}
@Override
public void reset() {
listeners.clear();
}
}

View File

@@ -2,32 +2,45 @@ package org.toop.framework.eventbus.holder;
import org.toop.framework.eventbus.subscriber.Subscriber; import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ConcurrentLinkedQueue;
public class AsyncSubscriberStore implements SubscriberStore { public class AsyncSubscriberStore implements SubscriberStore {
private final Map<Class<?>, CopyOnWriteArrayList<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Subscriber<?, ?>>> queues = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Class<?>, Subscriber<?, ?>[]> snapshots = new ConcurrentHashMap<>();
@Override @Override
public void add(Subscriber<?, ?> sub) { public void add(Subscriber<?, ?> sub) {
LISTENERS.computeIfAbsent(sub.getEvent(), _ -> new CopyOnWriteArrayList<>()).add(sub); queues.computeIfAbsent(sub.getEvent(), _ -> new ConcurrentLinkedQueue<>()).add(sub);
rebuildSnapshot(sub.getEvent());
} }
@Override @Override
public void remove(Subscriber<?, ?> sub) { public void remove(Subscriber<?, ?> sub) {
LISTENERS.getOrDefault(sub.getEvent(), new CopyOnWriteArrayList<>()).remove(sub); ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(sub.getEvent());
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty()); if (queue != null) {
queue.remove(sub);
rebuildSnapshot(sub.getEvent());
}
} }
@Override @Override
public List<Subscriber<?, ?>> get(Class<?> event) { public Subscriber<?, ?>[] get(Class<?> event) {
return LISTENERS.get(event); return snapshots.getOrDefault(event, new Subscriber[0]);
} }
@Override @Override
public void reset() { public void reset() {
LISTENERS.clear(); queues.clear();
snapshots.clear();
}
private void rebuildSnapshot(Class<?> event) {
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(event);
if (queue != null) {
snapshots.put(event, queue.toArray(new Subscriber[0]));
} else {
snapshots.put(event, new Subscriber[0]);
}
} }
} }

View File

@@ -2,11 +2,9 @@ package org.toop.framework.eventbus.holder;
import org.toop.framework.eventbus.subscriber.Subscriber; import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.List;
public interface SubscriberStore { public interface SubscriberStore {
void add(Subscriber<?, ?> subscriber); void add(Subscriber<?, ?> subscriber);
void remove(Subscriber<?, ?> subscriber); void remove(Subscriber<?, ?> subscriber);
List<Subscriber<?, ?>> get(Class<?> event); Subscriber<?, ?>[] get(Class<?> event);
void reset(); void reset();
} }

View File

@@ -9,6 +9,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class SyncSubscriberStore implements SubscriberStore { public class SyncSubscriberStore implements SubscriberStore {
private final Map<Class<?>, List<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>(); private final Map<Class<?>, List<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>();
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
@Override @Override
public void add(Subscriber<?, ?> sub) { public void add(Subscriber<?, ?> sub) {
@@ -22,8 +23,10 @@ public class SyncSubscriberStore implements SubscriberStore {
} }
@Override @Override
public List<Subscriber<?, ?>> get(Class<?> event) { public Subscriber<?, ?>[] get(Class<?> event) {
return LISTENERS.get(event); List<Subscriber<?, ?>> list = LISTENERS.get(event);
if (list == null || list.isEmpty()) return EMPTY;
return list.toArray(EMPTY);
} }
@Override @Override

View File

@@ -3,6 +3,7 @@ package org.toop.framework.audio;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.toop.framework.dispatch.interfaces.Dispatcher; import org.toop.framework.dispatch.interfaces.Dispatcher;
import org.toop.framework.eventbus.GlobalEventBus;
import org.toop.framework.resource.resources.BaseResource; import org.toop.framework.resource.resources.BaseResource;
import org.toop.framework.resource.types.AudioResource; import org.toop.framework.resource.types.AudioResource;
@@ -94,7 +95,7 @@ public class MusicManagerTest {
List<MockAudioResource> resources = List.of(track1, track2, track3); List<MockAudioResource> resources = List.of(track1, track2, track3);
manager = new MusicManager<>(resources, dispatcher); manager = new MusicManager<>(GlobalEventBus.get(), resources, dispatcher);
} }
@Test @Test
@@ -188,7 +189,7 @@ public class MusicManagerTest {
manyTracks.add(new MockAudioResource("track" + i)); manyTracks.add(new MockAudioResource("track" + i));
} }
MusicManager<MockAudioResource> multiManager = new MusicManager<>(manyTracks, dispatcher); MusicManager<MockAudioResource> multiManager = new MusicManager<>(GlobalEventBus.get(), manyTracks, dispatcher);
for (int i = 0; i < manyTracks.size() - 1; i++) { for (int i = 0; i < manyTracks.size() - 1; i++) {
multiManager.play(); multiManager.play();