7 Commits

Author SHA1 Message Date
lieght
b2e4e7d810 Removed unnecessary imports 2025-12-07 03:36:26 +01:00
lieght
4b99cfe6af Removed useless comment 2025-12-07 03:10:53 +01:00
lieght
484cf1dcd3 Optimization 2025-12-07 03:10:22 +01:00
lieght
5ee2f2187e Renaming 2025-12-07 02:47:15 +01:00
lieght
854744cefa Removed old code 2025-12-07 02:41:14 +01:00
lieght
f3e2de2c8f Removed get 2025-12-07 02:38:02 +01:00
lieght
b6f1b046b1 Fixed wrong eventbus 2025-12-07 02:20:12 +01:00
10 changed files with 145 additions and 78 deletions

View File

@@ -77,7 +77,7 @@ public class AudioEventListener<T extends AudioResource, K extends AudioResource
}
private void handleGetVolume(AudioEvents.GetVolume event) {
GlobalEventBus.get().post(new AudioEvents.GetVolumeResponse(
eventBus.post(new AudioEvents.GetVolumeResponse(
audioVolumeManager.getVolume(event.controlType()),
event.identifier()));
}

View File

@@ -1,16 +1,15 @@
package org.toop.framework.eventbus;
import org.apache.logging.log4j.LogManager;
import org.toop.framework.eventbus.bus.DefaultEventBus;
import org.toop.framework.eventbus.bus.DisruptorEventBus;
import org.toop.framework.eventbus.bus.EventBus;
import org.toop.framework.eventbus.holder.SyncEventsHolder;
import org.toop.framework.eventbus.holder.AsyncArraySubscriberStore;
import org.toop.framework.eventbus.subscriber.Subscriber;
public class GlobalEventBus implements EventBus {
private static final EventBus INSTANCE = new DefaultEventBus(
private static final EventBus INSTANCE = new DisruptorEventBus(
LogManager.getLogger(DisruptorEventBus.class),
new SyncEventsHolder()
new AsyncArraySubscriberStore()
);
private GlobalEventBus() {}
@@ -21,26 +20,26 @@ public class GlobalEventBus implements EventBus {
@Override
public void subscribe(Subscriber<?, ?> listener) {
get().subscribe(listener);
INSTANCE.subscribe(listener);
}
@Override
public void unsubscribe(Subscriber<?, ?> listener) {
get().unsubscribe(listener);
INSTANCE.unsubscribe(listener);
}
@Override
public <T> void post(T event) {
get().post(event);
INSTANCE.post(event);
}
@Override
public void shutdown() {
get().shutdown();
INSTANCE.shutdown();
}
@Override
public void reset() {
get().reset();
INSTANCE.reset();
}
}

View File

@@ -1,20 +1,17 @@
package org.toop.framework.eventbus.bus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.holder.EventsHolder;
import org.toop.framework.eventbus.holder.SubscriberStore;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
public class DefaultEventBus implements EventBus {
private final Logger logger;
private final EventsHolder eventsHolder;
private final SubscriberStore eventsHolder;
public DefaultEventBus(Logger logger, EventsHolder eventsHolder) {
public DefaultEventBus(Logger logger, SubscriberStore eventsHolder) {
this.logger = logger;
this.eventsHolder = eventsHolder;
}
@@ -35,9 +32,7 @@ public class DefaultEventBus implements EventBus {
Class<T> eventType = (Class<T>) event.getClass();
var subs = eventsHolder.get(eventType);
if (subs != null) {
List<Subscriber<?, ?>> snapshot = new ArrayList<>(subs);
for (Subscriber<?, ?> subscriber : snapshot) {
for (Subscriber<?, ?> subscriber : subs) {
Class<T> eventClass = (Class<T>) subscriber.getEvent();
Consumer<EventType> action = (Consumer<EventType>) subscriber.getAction();

View File

@@ -7,9 +7,8 @@ import com.lmax.disruptor.dsl.ProducerType;
import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.subscriber.Subscriber;
import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.holder.EventsHolder;
import org.toop.framework.eventbus.holder.SubscriberStore;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
@@ -20,12 +19,12 @@ public class DisruptorEventBus implements EventBus {
}
private final Logger logger;
private final EventsHolder eventsHolder;
private final SubscriberStore eventsHolder;
private final Disruptor<EventHolder<?>> disruptor;
private final RingBuffer<EventHolder<?>> ringBuffer;
public DisruptorEventBus(Logger logger, EventsHolder eventsHolder) {
public DisruptorEventBus(Logger logger, SubscriberStore eventsHolder) {
this.logger = logger;
this.eventsHolder = eventsHolder;
@@ -95,7 +94,7 @@ public class DisruptorEventBus implements EventBus {
}
private <T> void dispatchEvent(T event) {
CopyOnWriteArrayList<Subscriber<?, ?>> classListeners = (CopyOnWriteArrayList<Subscriber<?, ?>>) eventsHolder.get(event.getClass());
var classListeners = eventsHolder.get(event.getClass());
if (classListeners != null) {
for (Subscriber<?, ?> listener : classListeners) {
try {
@@ -105,18 +104,6 @@ public class DisruptorEventBus implements EventBus {
}
}
}
// TODO, Still needed?
CopyOnWriteArrayList<Subscriber<?, ?>> genericListeners = (CopyOnWriteArrayList<Subscriber<?, ?>>) eventsHolder.get(Object.class);
if (genericListeners != null) {
for (Subscriber<?, ?> listener : genericListeners) {
try {
callListener(listener, event);
} catch (Throwable e) {
logger.warn("Exception while handling event: {}", event, e);
}
}
}
}

View File

@@ -0,0 +1,71 @@
package org.toop.framework.eventbus.holder;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.concurrent.ConcurrentHashMap;
public class AsyncArraySubscriberStore implements SubscriberStore {
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
private final ConcurrentHashMap<Class<?>, Subscriber<?, ?>[]> listeners =
new ConcurrentHashMap<>();
@Override
public void add(Subscriber<?, ?> sub) {
listeners.compute(sub.getEvent(), (_, arr) -> {
if (arr == null || arr.length == 0) {
return new Subscriber<?, ?>[]{sub};
}
int len = arr.length;
Subscriber<?, ?>[] newArr = new Subscriber[len + 1];
System.arraycopy(arr, 0, newArr, 0, len);
newArr[len] = sub;
return newArr;
});
}
@Override
public void remove(Subscriber<?, ?> sub) {
listeners.computeIfPresent(sub.getEvent(), (_, arr) -> {
int len = arr.length;
if (len == 1) {
return arr[0].equals(sub) ? null : arr;
}
int keep = 0;
for (Subscriber<?, ?> s : arr) {
if (!s.equals(sub)) keep++;
}
if (keep == len) {
return arr;
}
if (keep == 0) {
return null;
}
Subscriber<?, ?>[] newArr = new Subscriber[keep];
int i = 0;
for (Subscriber<?, ?> s : arr) {
if (!s.equals(sub)) {
newArr[i++] = s;
}
}
return newArr;
});
}
@Override
public Subscriber<?, ?>[] get(Class<?> event) {
return listeners.getOrDefault(event, EMPTY);
}
@Override
public void reset() {
listeners.clear();
}
}

View File

@@ -1,33 +0,0 @@
package org.toop.framework.eventbus.holder;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
public class AsyncEventsHolder implements EventsHolder {
private final Map<Class<?>, CopyOnWriteArrayList<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>();
@Override
public void add(Subscriber<?, ?> sub) {
LISTENERS.computeIfAbsent(sub.getEvent(), _ -> new CopyOnWriteArrayList<>()).add(sub);
}
@Override
public void remove(Subscriber<?, ?> sub) {
LISTENERS.getOrDefault(sub.getEvent(), new CopyOnWriteArrayList<>()).remove(sub);
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
@Override
public List<Subscriber<?, ?>> get(Class<?> event) {
return LISTENERS.get(event);
}
@Override
public void reset() {
LISTENERS.clear();
}
}

View File

@@ -0,0 +1,46 @@
package org.toop.framework.eventbus.holder;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class AsyncSubscriberStore implements SubscriberStore {
private final ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Subscriber<?, ?>>> queues = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Class<?>, Subscriber<?, ?>[]> snapshots = new ConcurrentHashMap<>();
@Override
public void add(Subscriber<?, ?> sub) {
queues.computeIfAbsent(sub.getEvent(), _ -> new ConcurrentLinkedQueue<>()).add(sub);
rebuildSnapshot(sub.getEvent());
}
@Override
public void remove(Subscriber<?, ?> sub) {
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(sub.getEvent());
if (queue != null) {
queue.remove(sub);
rebuildSnapshot(sub.getEvent());
}
}
@Override
public Subscriber<?, ?>[] get(Class<?> event) {
return snapshots.getOrDefault(event, new Subscriber[0]);
}
@Override
public void reset() {
queues.clear();
snapshots.clear();
}
private void rebuildSnapshot(Class<?> event) {
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(event);
if (queue != null) {
snapshots.put(event, queue.toArray(new Subscriber[0]));
} else {
snapshots.put(event, new Subscriber[0]);
}
}
}

View File

@@ -2,11 +2,9 @@ package org.toop.framework.eventbus.holder;
import org.toop.framework.eventbus.subscriber.Subscriber;
import java.util.List;
public interface EventsHolder {
public interface SubscriberStore {
void add(Subscriber<?, ?> subscriber);
void remove(Subscriber<?, ?> subscriber);
List<Subscriber<?, ?>> get(Class<?> event);
Subscriber<?, ?>[] get(Class<?> event);
void reset();
}

View File

@@ -7,8 +7,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SyncEventsHolder implements EventsHolder {
public class SyncSubscriberStore implements SubscriberStore {
private final Map<Class<?>, List<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>();
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
@Override
public void add(Subscriber<?, ?> sub) {
@@ -22,8 +23,10 @@ public class SyncEventsHolder implements EventsHolder {
}
@Override
public List<Subscriber<?, ?>> get(Class<?> event) {
return LISTENERS.get(event);
public Subscriber<?, ?>[] get(Class<?> event) {
List<Subscriber<?, ?>> list = LISTENERS.get(event);
if (list == null || list.isEmpty()) return EMPTY;
return list.toArray(EMPTY);
}
@Override

View File

@@ -3,6 +3,7 @@ package org.toop.framework.audio;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.toop.framework.dispatch.interfaces.Dispatcher;
import org.toop.framework.eventbus.GlobalEventBus;
import org.toop.framework.resource.resources.BaseResource;
import org.toop.framework.resource.types.AudioResource;
@@ -94,7 +95,7 @@ public class MusicManagerTest {
List<MockAudioResource> resources = List.of(track1, track2, track3);
manager = new MusicManager<>(resources, dispatcher);
manager = new MusicManager<>(GlobalEventBus.get(), resources, dispatcher);
}
@Test
@@ -188,7 +189,7 @@ public class MusicManagerTest {
manyTracks.add(new MockAudioResource("track" + i));
}
MusicManager<MockAudioResource> multiManager = new MusicManager<>(manyTracks, dispatcher);
MusicManager<MockAudioResource> multiManager = new MusicManager<>(GlobalEventBus.get(), manyTracks, dispatcher);
for (int i = 0; i < manyTracks.size() - 1; i++) {
multiManager.play();