mirror of
https://github.com/2OOP/pism.git
synced 2026-02-04 10:54:51 +00:00
Merge branch 'Development' into ReversiML
# Conflicts: # app/src/main/java/org/toop/Main.java # app/src/main/java/org/toop/app/game/ReversiGame.java # game/pom.xml # game/src/main/java/org/toop/game/reversi/Reversi.java # game/src/main/java/org/toop/game/reversi/ReversiAI.java # game/src/test/java/org/toop/game/tictactoe/ReversiTest.java
This commit is contained in:
@@ -5,37 +5,39 @@ import org.toop.framework.audio.interfaces.MusicManager;
|
||||
import org.toop.framework.audio.interfaces.SoundEffectManager;
|
||||
import org.toop.framework.audio.interfaces.VolumeManager;
|
||||
import org.toop.framework.eventbus.EventFlow;
|
||||
import org.toop.framework.eventbus.GlobalEventBus;
|
||||
import org.toop.framework.eventbus.bus.EventBus;
|
||||
import org.toop.framework.resource.types.AudioResource;
|
||||
|
||||
public class AudioEventListener<T extends AudioResource, K extends AudioResource> {
|
||||
private final EventBus eventBus;
|
||||
private final MusicManager<T> musicManager;
|
||||
private final SoundEffectManager<K> soundEffectManager;
|
||||
private final VolumeManager audioVolumeManager;
|
||||
|
||||
public AudioEventListener(
|
||||
EventBus eventBus,
|
||||
MusicManager<T> musicManager,
|
||||
SoundEffectManager<K> soundEffectManager,
|
||||
VolumeManager audioVolumeManager
|
||||
) {
|
||||
this.eventBus = eventBus;
|
||||
this.musicManager = musicManager;
|
||||
this.soundEffectManager = soundEffectManager;
|
||||
this.audioVolumeManager = audioVolumeManager;
|
||||
}
|
||||
|
||||
public AudioEventListener<?, ?> initListeners(String buttonSoundToPlay) {
|
||||
new EventFlow()
|
||||
.listen(this::handleStopMusicManager)
|
||||
.listen(this::handlePlaySound)
|
||||
.listen(this::handleSkipSong)
|
||||
.listen(this::handlePauseSong)
|
||||
.listen(this::handlePreviousSong)
|
||||
.listen(this::handleStopSound)
|
||||
.listen(this::handleMusicStart)
|
||||
.listen(this::handleVolumeChange)
|
||||
.listen(this::handleGetVolume)
|
||||
.listen(AudioEvents.ClickButton.class, _ ->
|
||||
soundEffectManager.play(buttonSoundToPlay, false));
|
||||
new EventFlow(eventBus)
|
||||
.listen(AudioEvents.StopAudioManager.class, this::handleStopMusicManager, false)
|
||||
.listen(AudioEvents.PlayEffect.class, this::handlePlaySound, false)
|
||||
.listen(AudioEvents.SkipMusic.class, this::handleSkipSong, false)
|
||||
.listen(AudioEvents.PauseMusic.class, this::handlePauseSong, false)
|
||||
.listen(AudioEvents.PreviousMusic.class, this::handlePreviousSong, false)
|
||||
.listen(AudioEvents.StopEffect.class, this::handleStopSound, false)
|
||||
.listen(AudioEvents.StartBackgroundMusic.class, this::handleMusicStart, false)
|
||||
.listen(AudioEvents.ChangeVolume.class, this::handleVolumeChange, false)
|
||||
.listen(AudioEvents.GetVolume.class, this::handleGetVolume,false)
|
||||
.listen(AudioEvents.ClickButton.class, _ -> soundEffectManager.play(buttonSoundToPlay, false), false);
|
||||
|
||||
return this;
|
||||
}
|
||||
@@ -74,7 +76,7 @@ public class AudioEventListener<T extends AudioResource, K extends AudioResource
|
||||
}
|
||||
|
||||
private void handleGetVolume(AudioEvents.GetVolume event) {
|
||||
GlobalEventBus.postAsync(new AudioEvents.GetVolumeResponse(
|
||||
eventBus.post(new AudioEvents.GetVolumeResponse(
|
||||
audioVolumeManager.getVolume(event.controlType()),
|
||||
event.identifier()));
|
||||
}
|
||||
|
||||
@@ -6,8 +6,7 @@ import org.toop.framework.audio.events.AudioEvents;
|
||||
import org.toop.framework.dispatch.interfaces.Dispatcher;
|
||||
import org.toop.framework.dispatch.JavaFXDispatcher;
|
||||
import org.toop.annotations.TestsOnly;
|
||||
import org.toop.framework.eventbus.EventFlow;
|
||||
import org.toop.framework.eventbus.GlobalEventBus;
|
||||
import org.toop.framework.eventbus.bus.EventBus;
|
||||
import org.toop.framework.resource.types.AudioResource;
|
||||
|
||||
import java.util.*;
|
||||
@@ -18,6 +17,7 @@ import java.util.concurrent.TimeUnit;
|
||||
public class MusicManager<T extends AudioResource> implements org.toop.framework.audio.interfaces.MusicManager<T> {
|
||||
private static final Logger logger = LogManager.getLogger(MusicManager.class);
|
||||
|
||||
private final EventBus eventBus;
|
||||
private final List<T> backgroundMusic = new ArrayList<>();
|
||||
private final Dispatcher dispatcher;
|
||||
private final List<T> resources;
|
||||
@@ -27,7 +27,8 @@ public class MusicManager<T extends AudioResource> implements org.toop.framework
|
||||
private ScheduledExecutorService scheduler;
|
||||
|
||||
|
||||
public MusicManager(List<T> resources, boolean shuffleMusic) {
|
||||
public MusicManager(EventBus eventbus, List<T> resources, boolean shuffleMusic) {
|
||||
this.eventBus = eventbus;
|
||||
this.dispatcher = new JavaFXDispatcher();
|
||||
this.resources = resources;
|
||||
// Shuffle if wanting to shuffle
|
||||
@@ -40,7 +41,8 @@ public class MusicManager<T extends AudioResource> implements org.toop.framework
|
||||
* {@code @TestsOnly} DO NOT USE
|
||||
*/
|
||||
@TestsOnly
|
||||
public MusicManager(List<T> resources, Dispatcher dispatcher) {
|
||||
public MusicManager(EventBus eventBus, List<T> resources, Dispatcher dispatcher) {
|
||||
this.eventBus = eventBus;
|
||||
this.dispatcher = dispatcher;
|
||||
this.resources = new ArrayList<>(resources);
|
||||
backgroundMusic.addAll(resources);
|
||||
@@ -124,7 +126,7 @@ public class MusicManager<T extends AudioResource> implements org.toop.framework
|
||||
Runnable currentMusicTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
GlobalEventBus.post(new AudioEvents.PlayingMusic(track.getName(), track.currentPosition(), track.duration()));
|
||||
eventBus.post(new AudioEvents.PlayingMusic(track.getName(), track.currentPosition(), track.duration()));
|
||||
scheduler.schedule(this, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -2,18 +2,10 @@ package org.toop.framework.audio;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.toop.framework.resource.ResourceManager;
|
||||
import org.toop.framework.resource.ResourceMeta;
|
||||
import org.toop.framework.resource.resources.BaseResource;
|
||||
import org.toop.framework.resource.resources.MusicAsset;
|
||||
import org.toop.framework.resource.resources.SoundEffectAsset;
|
||||
import org.toop.framework.resource.types.AudioResource;
|
||||
|
||||
import javax.sound.sampled.Clip;
|
||||
import javax.sound.sampled.LineEvent;
|
||||
import javax.sound.sampled.LineUnavailableException;
|
||||
import javax.sound.sampled.UnsupportedAudioFileException;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@@ -2,8 +2,6 @@ package org.toop.framework.audio.events;
|
||||
|
||||
import org.toop.framework.audio.VolumeControl;
|
||||
import org.toop.framework.eventbus.events.*;
|
||||
import org.toop.framework.eventbus.events.ResponseToUniqueEvent;
|
||||
import org.toop.framework.eventbus.events.UniqueEvent;
|
||||
|
||||
public class AudioEvents extends EventsBase {
|
||||
/** Stops the audio manager. */
|
||||
|
||||
@@ -1,12 +1,7 @@
|
||||
package org.toop.framework.audio.interfaces;
|
||||
|
||||
import org.toop.framework.resource.resources.SoundEffectAsset;
|
||||
import org.toop.framework.resource.types.AudioResource;
|
||||
|
||||
import javax.sound.sampled.LineUnavailableException;
|
||||
import javax.sound.sampled.UnsupportedAudioFileException;
|
||||
import java.io.IOException;
|
||||
|
||||
public interface SoundEffectManager<T extends AudioResource> extends AudioManager<T> {
|
||||
void play(String name, boolean loop);
|
||||
void stop(String name);
|
||||
|
||||
@@ -13,10 +13,13 @@ import org.toop.framework.SnowflakeGenerator;
|
||||
import org.toop.framework.eventbus.events.EventType;
|
||||
import org.toop.framework.eventbus.events.ResponseToUniqueEvent;
|
||||
import org.toop.framework.eventbus.events.UniqueEvent;
|
||||
import org.toop.framework.eventbus.bus.EventBus;
|
||||
import org.toop.framework.eventbus.subscriber.DefaultSubscriber;
|
||||
import org.toop.framework.eventbus.subscriber.Subscriber;
|
||||
|
||||
/**
|
||||
* EventFlow is a utility class for creating, posting, and optionally subscribing to events in a
|
||||
* type-safe and chainable manner. It is designed to work with the {@link GlobalEventBus}.
|
||||
* type-safe and chainable manner. It is designed to work with the {@link EventBus}.
|
||||
*
|
||||
* <p>This class supports automatic UUID assignment for {@link UniqueEvent} events, and
|
||||
* allows filtering subscribers so they only respond to events with a specific UUID. All
|
||||
@@ -31,6 +34,8 @@ public class EventFlow {
|
||||
/** Cache of constructor handles for event classes to avoid repeated reflection lookups. */
|
||||
private static final Map<Class<?>, MethodHandle> CONSTRUCTOR_CACHE = new ConcurrentHashMap<>();
|
||||
|
||||
private final EventBus eventBus;
|
||||
|
||||
/** Automatically assigned UUID for {@link UniqueEvent} events. */
|
||||
private long eventSnowflake = -1;
|
||||
|
||||
@@ -38,24 +43,29 @@ public class EventFlow {
|
||||
private EventType event = null;
|
||||
|
||||
/** The listener returned by GlobalEventBus subscription. Used for unsubscription. */
|
||||
private final List<ListenerHandler> listeners = new ArrayList<>();
|
||||
private final List<Subscriber<?, ?>> listeners = new ArrayList<>();
|
||||
|
||||
/** Holds the results returned from the subscribed event, if any. */
|
||||
private Map<String, ?> result = null;
|
||||
|
||||
/** Empty constructor (event must be added via {@link #addPostEvent(Class, Object...)}). */
|
||||
public EventFlow() {}
|
||||
|
||||
public EventFlow addPostEvent(EventType event) {
|
||||
this.event = event;
|
||||
return this;
|
||||
public EventFlow(EventBus eventBus) {
|
||||
this.eventBus = eventBus;
|
||||
}
|
||||
|
||||
public EventFlow addPostEvent(Supplier<? extends EventType> eventSupplier) {
|
||||
this.event = eventSupplier.get();
|
||||
return this;
|
||||
public EventFlow() {
|
||||
this.eventBus = GlobalEventBus.get();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Add an event that will be triggered when {@link #postEvent()} or {@link #asyncPostEvent()} is called.
|
||||
*
|
||||
* @param eventClass The event that will be posted.
|
||||
* @param args The event arguments, see the added event record for more information.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public <T extends EventType> EventFlow addPostEvent(Class<T> eventClass, Object... args) {
|
||||
try {
|
||||
boolean isUuidEvent = UniqueEvent.class.isAssignableFrom(eventClass);
|
||||
@@ -98,155 +108,403 @@ public class EventFlow {
|
||||
}
|
||||
}
|
||||
|
||||
/** Subscribe by ID: only fires if UUID matches this publisher's eventId. */
|
||||
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(
|
||||
Class<TT> eventClass, Consumer<TT> action, boolean unsubscribeAfterSuccess) {
|
||||
ListenerHandler[] listenerHolder = new ListenerHandler[1];
|
||||
listenerHolder[0] =
|
||||
new ListenerHandler(
|
||||
GlobalEventBus.subscribe(
|
||||
eventClass,
|
||||
event -> {
|
||||
if (event.getIdentifier() != this.eventSnowflake) return;
|
||||
|
||||
action.accept(event);
|
||||
|
||||
if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
|
||||
GlobalEventBus.unsubscribe(listenerHolder[0]);
|
||||
this.listeners.remove(listenerHolder[0]);
|
||||
}
|
||||
|
||||
this.result = event.result();
|
||||
}));
|
||||
this.listeners.add(listenerHolder[0]);
|
||||
/**
|
||||
*
|
||||
* Add an event that will be triggered when {@link #postEvent()} or {@link #asyncPostEvent()} is called.
|
||||
*
|
||||
* @param event The event to be posted.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public EventFlow addPostEvent(EventType event) {
|
||||
this.event = event;
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Subscribe by ID: only fires if UUID matches this publisher's eventId. */
|
||||
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Class<TT> eventClass, Consumer<TT> action) {
|
||||
return this.onResponse(eventClass, action, true);
|
||||
/**
|
||||
*
|
||||
* Add an event that will be triggered when {@link #postEvent()} or {@link #asyncPostEvent()} is called.
|
||||
*
|
||||
* @param eventSupplier The event that will be posted through a Supplier.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public EventFlow addPostEvent(Supplier<? extends EventType> eventSupplier) {
|
||||
this.event = eventSupplier.get();
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Subscribe by ID without explicit class. */
|
||||
/**
|
||||
*
|
||||
* Start listening for an event and trigger when ID correlates.
|
||||
*
|
||||
* @param event The {@link ResponseToUniqueEvent} to trigger the lambda.
|
||||
* @param action The lambda to run when triggered.
|
||||
* @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered.
|
||||
* @param name A name given to the event, can later be used to unsubscribe.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(
|
||||
Class<TT> event, Consumer<TT> action, boolean unsubscribeAfterSuccess, String name
|
||||
) {
|
||||
|
||||
final long id = SnowflakeGenerator.nextId();
|
||||
|
||||
Consumer<TT> newAction = eventClass -> {
|
||||
if (eventClass.getIdentifier() != this.eventSnowflake) return;
|
||||
|
||||
action.accept(eventClass);
|
||||
|
||||
if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
|
||||
|
||||
this.result = eventClass.result();
|
||||
};
|
||||
|
||||
var subscriber = new DefaultSubscriber<>(
|
||||
name,
|
||||
event,
|
||||
newAction
|
||||
);
|
||||
|
||||
eventBus.subscribe(subscriber);
|
||||
this.listeners.add(subscriber);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Start listening for an event and trigger when ID correlates, auto unsubscribes after being triggered and adds an empty name.
|
||||
*
|
||||
* @param event The {@link ResponseToUniqueEvent} to trigger the lambda.
|
||||
* @param action The lambda to run when triggered.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Class<TT> event, Consumer<TT> action) {
|
||||
return this.onResponse(event, action, true, "");
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Start listening for an event and trigger when ID correlates, auto adds an empty name.
|
||||
*
|
||||
* @param event The {@link ResponseToUniqueEvent} to trigger the lambda.
|
||||
* @param action The lambda to run when triggered.
|
||||
* @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Class<TT> event, Consumer<TT> action, boolean unsubscribeAfterSuccess) {
|
||||
return this.onResponse(event, action, unsubscribeAfterSuccess, "");
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Start listening for an event and trigger when ID correlates, auto unsubscribes after being triggered.
|
||||
*
|
||||
* @param event The {@link ResponseToUniqueEvent} to trigger the lambda.
|
||||
* @param action The lambda to run when triggered.
|
||||
* @param name A name given to the event, can later be used to unsubscribe.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Class<TT> event, Consumer<TT> action, String name) {
|
||||
return this.onResponse(event, action, true, name);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Subscribe by ID without explicit class.
|
||||
*
|
||||
* @param action The lambda to run when triggered.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
* @deprecated use {@link #onResponse(Class, Consumer, boolean, String)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
@SuppressWarnings("unchecked")
|
||||
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(
|
||||
Consumer<TT> action, boolean unsubscribeAfterSuccess) {
|
||||
ListenerHandler[] listenerHolder = new ListenerHandler[1];
|
||||
listenerHolder[0] =
|
||||
new ListenerHandler(
|
||||
GlobalEventBus.subscribe(
|
||||
event -> {
|
||||
if (!(event instanceof UniqueEvent uuidEvent)) return;
|
||||
if (uuidEvent.getIdentifier() == this.eventSnowflake) {
|
||||
try {
|
||||
TT typedEvent = (TT) uuidEvent;
|
||||
action.accept(typedEvent);
|
||||
if (unsubscribeAfterSuccess
|
||||
&& listenerHolder[0] != null) {
|
||||
GlobalEventBus.unsubscribe(listenerHolder[0]);
|
||||
this.listeners.remove(listenerHolder[0]);
|
||||
}
|
||||
this.result = typedEvent.result();
|
||||
} catch (ClassCastException _) {
|
||||
throw new ClassCastException(
|
||||
"Cannot cast "
|
||||
+ event.getClass().getName()
|
||||
+ " to UniqueEvent");
|
||||
}
|
||||
}
|
||||
}));
|
||||
this.listeners.add(listenerHolder[0]);
|
||||
Consumer<TT> action, boolean unsubscribeAfterSuccess, String name) {
|
||||
|
||||
final long id = SnowflakeGenerator.nextId();
|
||||
|
||||
Consumer<TT> newAction = event -> {
|
||||
if (!(event instanceof UniqueEvent uuidEvent)) return;
|
||||
if (uuidEvent.getIdentifier() == this.eventSnowflake) {
|
||||
try {
|
||||
TT typedEvent = (TT) uuidEvent;
|
||||
action.accept(typedEvent);
|
||||
|
||||
if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
|
||||
|
||||
this.result = typedEvent.result();
|
||||
} catch (ClassCastException _) {
|
||||
throw new ClassCastException(
|
||||
"Cannot cast "
|
||||
+ event.getClass().getName()
|
||||
+ " to UniqueEvent");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var listener = new DefaultSubscriber<>(
|
||||
name,
|
||||
(Class<TT>) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0],
|
||||
newAction
|
||||
);
|
||||
|
||||
eventBus.subscribe(listener);
|
||||
this.listeners.add(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Subscribe by ID without explicit class.
|
||||
*
|
||||
* @param action The lambda to run when triggered.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
* @deprecated use {@link #onResponse(Class, Consumer)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public <TT extends ResponseToUniqueEvent> EventFlow onResponse(Consumer<TT> action) {
|
||||
return this.onResponse(action, true);
|
||||
return this.onResponse(action, true, "");
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Start listening for an event, and run a lambda when triggered.
|
||||
*
|
||||
* @param event The {@link EventType} to trigger the lambda.
|
||||
* @param action The lambda to run when triggered.
|
||||
* @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered.
|
||||
* @param name A name given to the event, can later be used to unsubscribe.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public <TT extends EventType> EventFlow listen(
|
||||
Class<TT> eventClass, Consumer<TT> action, boolean unsubscribeAfterSuccess) {
|
||||
ListenerHandler[] listenerHolder = new ListenerHandler[1];
|
||||
listenerHolder[0] =
|
||||
new ListenerHandler(
|
||||
GlobalEventBus.subscribe(
|
||||
eventClass,
|
||||
event -> {
|
||||
action.accept(event);
|
||||
Class<TT> event, Consumer<TT> action, boolean unsubscribeAfterSuccess, String name) {
|
||||
|
||||
if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
|
||||
GlobalEventBus.unsubscribe(listenerHolder[0]);
|
||||
this.listeners.remove(listenerHolder[0]);
|
||||
}
|
||||
}));
|
||||
this.listeners.add(listenerHolder[0]);
|
||||
long id = SnowflakeGenerator.nextId();
|
||||
|
||||
Consumer<TT> newAction = eventc -> {
|
||||
action.accept(eventc);
|
||||
|
||||
if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
|
||||
};
|
||||
|
||||
var listener = new DefaultSubscriber<>(
|
||||
name,
|
||||
event,
|
||||
newAction
|
||||
);
|
||||
|
||||
eventBus.subscribe(listener);
|
||||
this.listeners.add(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
public <TT extends EventType> EventFlow listen(Class<TT> eventClass, Consumer<TT> action) {
|
||||
return this.listen(eventClass, action, true);
|
||||
/**
|
||||
*
|
||||
* Start listening for an event, and run a lambda when triggered, auto unsubscribes.
|
||||
*
|
||||
* @param event The {@link EventType} to trigger the lambda.
|
||||
* @param action The lambda to run when triggered.
|
||||
* @param name A name given to the event, can later be used to unsubscribe.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public <TT extends EventType> EventFlow listen(Class<TT> event, Consumer<TT> action, String name) {
|
||||
return this.listen(event, action, true, name);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Start listening for an event, and run a lambda when triggered, auto unsubscribe and gives it an empty name.
|
||||
*
|
||||
* @param event The {@link EventType} to trigger the lambda.
|
||||
* @param action The lambda to run when triggered.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public <TT extends EventType> EventFlow listen(Class<TT> event, Consumer<TT> action) {
|
||||
return this.listen(event, action, true, "");
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Start listening for an event, and run a lambda when triggered, adds an empty name.
|
||||
*
|
||||
* @param event The {@link EventType} to trigger the lambda.
|
||||
* @param action The lambda to run when triggered.
|
||||
* @param unsubscribeAfterSuccess Enable/disable auto unsubscribing to event after being triggered.
|
||||
* @return {@link #EventFlow}
|
||||
*
|
||||
*/
|
||||
public <TT extends EventType> EventFlow listen(Class<TT> event, Consumer<TT> action, boolean unsubscribeAfterSuccess) {
|
||||
return this.listen(event, action, unsubscribeAfterSuccess, "");
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Start listening to an event.
|
||||
*
|
||||
* @param action The lambda to run when triggered.
|
||||
* @return {@link EventFlow}
|
||||
*
|
||||
* @deprecated use {@link #listen(Class, Consumer, boolean, String)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
@SuppressWarnings("unchecked")
|
||||
public <TT extends EventType> EventFlow listen(
|
||||
Consumer<TT> action, boolean unsubscribeAfterSuccess) {
|
||||
ListenerHandler[] listenerHolder = new ListenerHandler[1];
|
||||
listenerHolder[0] =
|
||||
new ListenerHandler(
|
||||
GlobalEventBus.subscribe(
|
||||
event -> {
|
||||
if (!(event instanceof EventType nonUuidEvent)) return;
|
||||
try {
|
||||
TT typedEvent = (TT) nonUuidEvent;
|
||||
action.accept(typedEvent);
|
||||
if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
|
||||
GlobalEventBus.unsubscribe(listenerHolder[0]);
|
||||
this.listeners.remove(listenerHolder[0]);
|
||||
}
|
||||
} catch (ClassCastException _) {
|
||||
throw new ClassCastException(
|
||||
"Cannot cast "
|
||||
+ event.getClass().getName()
|
||||
+ " to UniqueEvent");
|
||||
}
|
||||
}));
|
||||
this.listeners.add(listenerHolder[0]);
|
||||
Consumer<TT> action, boolean unsubscribeAfterSuccess, String name) {
|
||||
long id = SnowflakeGenerator.nextId();
|
||||
|
||||
Class<TT> eventClass = (Class<TT>) action.getClass().getDeclaredMethods()[0].getParameterTypes()[0];
|
||||
|
||||
Consumer<TT> newAction = event -> {
|
||||
if (!(event instanceof EventType nonUuidEvent)) return;
|
||||
try {
|
||||
TT typedEvent = (TT) nonUuidEvent;
|
||||
action.accept(typedEvent);
|
||||
if (unsubscribeAfterSuccess) unsubscribe(String.valueOf(id));
|
||||
} catch (ClassCastException _) {
|
||||
throw new ClassCastException(
|
||||
"Cannot cast "
|
||||
+ event.getClass().getName()
|
||||
+ " to UniqueEvent");
|
||||
}
|
||||
};
|
||||
|
||||
var listener = new DefaultSubscriber<>(
|
||||
name,
|
||||
eventClass,
|
||||
newAction
|
||||
);
|
||||
|
||||
eventBus.subscribe(listener);
|
||||
this.listeners.add(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Start listening to an event.
|
||||
*
|
||||
* @param action The lambda to run when triggered.
|
||||
* @return {@link EventFlow}
|
||||
*
|
||||
* @deprecated use {@link #listen(Class, Consumer)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public <TT extends EventType> EventFlow listen(Consumer<TT> action) {
|
||||
return this.listen(action, true);
|
||||
return this.listen(action, true, "");
|
||||
}
|
||||
|
||||
/** Post synchronously */
|
||||
/**
|
||||
* Posts the event added through {@link #addPostEvent}.
|
||||
*/
|
||||
public EventFlow postEvent() {
|
||||
GlobalEventBus.post(this.event);
|
||||
eventBus.post(this.event);
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Post asynchronously */
|
||||
/**
|
||||
* Posts the event added through {@link #addPostEvent} asynchronously.
|
||||
*
|
||||
* @deprecated use {@link #postEvent()} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public EventFlow asyncPostEvent() {
|
||||
GlobalEventBus.postAsync(this.event);
|
||||
eventBus.post(this.event);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* Unsubscribe from an event.
|
||||
*
|
||||
* @param action The listener object to remove and unsubscribe.
|
||||
*/
|
||||
public void unsubscribe(Consumer<?> action) {
|
||||
this.listeners.removeIf(handler -> {
|
||||
if (handler.handler().equals(action)) {
|
||||
eventBus.unsubscribe(handler);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribe from an event.
|
||||
*
|
||||
* @param name The name given to the listener.
|
||||
*/
|
||||
public void unsubscribe(String name) {
|
||||
this.listeners.removeIf(handler -> {
|
||||
if (handler.id().equals(name)) {
|
||||
eventBus.unsubscribe(handler);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribe all events.
|
||||
*/
|
||||
public void unsubscribeAll() {
|
||||
listeners.removeIf(handler -> {
|
||||
eventBus.unsubscribe(handler);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean and remove everything inside {@link EventFlow}.
|
||||
*/
|
||||
private void clean() {
|
||||
this.listeners.clear();
|
||||
unsubscribeAll();
|
||||
this.event = null;
|
||||
this.result = null;
|
||||
} // TODO
|
||||
|
||||
/**
|
||||
* TODO
|
||||
*
|
||||
* @return TODO
|
||||
*/
|
||||
public Map<String, ?> getResult() {
|
||||
return this.result;
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO
|
||||
*
|
||||
* @return TODO
|
||||
*/
|
||||
public EventType getEvent() {
|
||||
return event;
|
||||
}
|
||||
|
||||
public ListenerHandler[] getListeners() {
|
||||
return listeners.toArray(new ListenerHandler[0]);
|
||||
/**
|
||||
*
|
||||
* Returns a copy of the list of listeners.
|
||||
*
|
||||
* @return Copy of the list of listeners.
|
||||
*/
|
||||
public Subscriber<?, ?>[] getListeners() {
|
||||
return listeners.toArray(new Subscriber[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the generated snowflake for the {@link EventFlow}
|
||||
*
|
||||
* @return The generated snowflake for this {@link EventFlow}
|
||||
*/
|
||||
public long getEventSnowflake() {
|
||||
return eventSnowflake;
|
||||
}
|
||||
|
||||
@@ -1,185 +1,45 @@
|
||||
package org.toop.framework.eventbus;
|
||||
|
||||
import com.lmax.disruptor.*;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Consumer;
|
||||
import org.toop.framework.eventbus.events.EventType;
|
||||
import org.toop.framework.eventbus.events.UniqueEvent;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.toop.framework.eventbus.bus.DisruptorEventBus;
|
||||
import org.toop.framework.eventbus.bus.EventBus;
|
||||
import org.toop.framework.eventbus.store.DefaultSubscriberStore;
|
||||
import org.toop.framework.eventbus.subscriber.Subscriber;
|
||||
|
||||
/**
|
||||
* GlobalEventBus backed by the LMAX Disruptor for ultra-low latency, high-throughput event
|
||||
* publishing.
|
||||
*/
|
||||
public final class GlobalEventBus {
|
||||
public class GlobalEventBus implements EventBus {
|
||||
private static final EventBus INSTANCE = new DisruptorEventBus(
|
||||
LogManager.getLogger(DisruptorEventBus.class),
|
||||
new DefaultSubscriberStore()
|
||||
);
|
||||
|
||||
/** Map of event class to type-specific listeners. */
|
||||
private static final Map<Class<?>, CopyOnWriteArrayList<Consumer<? super EventType>>>
|
||||
LISTENERS = new ConcurrentHashMap<>();
|
||||
|
||||
/** Map of event class to Snowflake-ID-specific listeners. */
|
||||
private static final Map<
|
||||
Class<?>, ConcurrentHashMap<Long, Consumer<? extends UniqueEvent>>>
|
||||
UUID_LISTENERS = new ConcurrentHashMap<>();
|
||||
|
||||
/** Disruptor ring buffer size (must be power of two). */
|
||||
private static final int RING_BUFFER_SIZE = 1024 * 64;
|
||||
|
||||
/** Disruptor instance. */
|
||||
private static final Disruptor<EventHolder> DISRUPTOR;
|
||||
|
||||
/** Ring buffer used for publishing events. */
|
||||
private static final RingBuffer<EventHolder> RING_BUFFER;
|
||||
|
||||
static {
|
||||
ThreadFactory threadFactory =
|
||||
r -> {
|
||||
Thread t = new Thread(r, "EventBus-Disruptor");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
};
|
||||
|
||||
DISRUPTOR =
|
||||
new Disruptor<>(
|
||||
EventHolder::new,
|
||||
RING_BUFFER_SIZE,
|
||||
threadFactory,
|
||||
ProducerType.MULTI,
|
||||
new BusySpinWaitStrategy());
|
||||
|
||||
// Single consumer that dispatches to subscribers
|
||||
DISRUPTOR.handleEventsWith(
|
||||
(holder, seq, endOfBatch) -> {
|
||||
if (holder.event != null) {
|
||||
dispatchEvent(holder.event);
|
||||
holder.event = null;
|
||||
}
|
||||
});
|
||||
|
||||
DISRUPTOR.start();
|
||||
RING_BUFFER = DISRUPTOR.getRingBuffer();
|
||||
}
|
||||
|
||||
/** Prevent instantiation. */
|
||||
private GlobalEventBus() {}
|
||||
|
||||
/** Wrapper used inside the ring buffer. */
|
||||
private static class EventHolder {
|
||||
EventType event;
|
||||
public static EventBus get() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// Subscription
|
||||
// ------------------------------------------------------------------------
|
||||
public static <T extends EventType> Consumer<? super EventType> subscribe(
|
||||
Class<T> eventClass, Consumer<T> listener) {
|
||||
|
||||
CopyOnWriteArrayList<Consumer<? super EventType>> list =
|
||||
LISTENERS.computeIfAbsent(eventClass, k -> new CopyOnWriteArrayList<>());
|
||||
|
||||
Consumer<? super EventType> wrapper = event -> listener.accept(eventClass.cast(event));
|
||||
list.add(wrapper);
|
||||
return wrapper;
|
||||
@Override
|
||||
public void subscribe(Subscriber<?, ?> listener) {
|
||||
INSTANCE.subscribe(listener);
|
||||
}
|
||||
|
||||
public static Consumer<? super EventType> subscribe(Consumer<Object> listener) {
|
||||
Consumer<? super EventType> wrapper = event -> listener.accept(event);
|
||||
LISTENERS.computeIfAbsent(Object.class, _ -> new CopyOnWriteArrayList<>()).add(wrapper);
|
||||
return wrapper;
|
||||
@Override
|
||||
public void unsubscribe(Subscriber<?, ?> listener) {
|
||||
INSTANCE.unsubscribe(listener);
|
||||
}
|
||||
|
||||
public static <T extends UniqueEvent> void subscribeById(
|
||||
Class<T> eventClass, long eventId, Consumer<T> listener) {
|
||||
UUID_LISTENERS
|
||||
.computeIfAbsent(eventClass, _ -> new ConcurrentHashMap<>())
|
||||
.put(eventId, listener);
|
||||
@Override
|
||||
public <T> void post(T event) {
|
||||
INSTANCE.post(event);
|
||||
}
|
||||
|
||||
public static void unsubscribe(Object listener) {
|
||||
LISTENERS.values().forEach(list -> list.remove(listener));
|
||||
@Override
|
||||
public void shutdown() {
|
||||
INSTANCE.shutdown();
|
||||
}
|
||||
|
||||
public static <T extends UniqueEvent> void unsubscribeById(
|
||||
Class<T> eventClass, long eventId) {
|
||||
Map<Long, Consumer<? extends UniqueEvent>> map = UUID_LISTENERS.get(eventClass);
|
||||
if (map != null) map.remove(eventId);
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// Posting
|
||||
// ------------------------------------------------------------------------
|
||||
public static <T extends EventType> void post(T event) {
|
||||
dispatchEvent(event); // synchronous
|
||||
}
|
||||
|
||||
public static <T extends EventType> void postAsync(T event) {
|
||||
long seq = RING_BUFFER.next();
|
||||
try {
|
||||
EventHolder holder = RING_BUFFER.get(seq);
|
||||
holder.event = event;
|
||||
} finally {
|
||||
RING_BUFFER.publish(seq);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void dispatchEvent(EventType event) {
|
||||
Class<?> clazz = event.getClass();
|
||||
|
||||
// class-specific listeners
|
||||
CopyOnWriteArrayList<Consumer<? super EventType>> classListeners = LISTENERS.get(clazz);
|
||||
if (classListeners != null) {
|
||||
for (Consumer<? super EventType> listener : classListeners) {
|
||||
try {
|
||||
listener.accept(event);
|
||||
} catch (Throwable e) {
|
||||
// e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// generic listeners
|
||||
CopyOnWriteArrayList<Consumer<? super EventType>> genericListeners =
|
||||
LISTENERS.get(Object.class);
|
||||
if (genericListeners != null) {
|
||||
for (Consumer<? super EventType> listener : genericListeners) {
|
||||
try {
|
||||
listener.accept(event);
|
||||
} catch (Throwable e) {
|
||||
// e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// snowflake listeners
|
||||
if (event instanceof UniqueEvent snowflakeEvent) {
|
||||
Map<Long, Consumer<? extends UniqueEvent>> map = UUID_LISTENERS.get(clazz);
|
||||
if (map != null) {
|
||||
Consumer<UniqueEvent> listener =
|
||||
(Consumer<UniqueEvent>) map.remove(snowflakeEvent.getIdentifier());
|
||||
if (listener != null) {
|
||||
try {
|
||||
listener.accept(snowflakeEvent);
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// Lifecycle
|
||||
// ------------------------------------------------------------------------
|
||||
public static void shutdown() {
|
||||
DISRUPTOR.shutdown();
|
||||
LISTENERS.clear();
|
||||
UUID_LISTENERS.clear();
|
||||
}
|
||||
|
||||
public static void reset() {
|
||||
LISTENERS.clear();
|
||||
UUID_LISTENERS.clear();
|
||||
@Override
|
||||
public void reset() {
|
||||
INSTANCE.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
package org.toop.framework.eventbus;
|
||||
|
||||
public class ListenerHandler {
|
||||
private Object listener;
|
||||
|
||||
// private boolean unsubscribeAfterSuccess = true;
|
||||
|
||||
// public ListenerHandler(Object listener, boolean unsubAfterSuccess) {
|
||||
// this.listener = listener;
|
||||
// this.unsubscribeAfterSuccess = unsubAfterSuccess;
|
||||
// }
|
||||
|
||||
public ListenerHandler(Object listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
public Object getListener() {
|
||||
return this.listener;
|
||||
}
|
||||
|
||||
// public boolean isUnsubscribeAfterSuccess() {
|
||||
// return this.unsubscribeAfterSuccess;
|
||||
// }
|
||||
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package org.toop.framework.eventbus.bus;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.toop.framework.eventbus.events.EventType;
|
||||
import org.toop.framework.eventbus.store.SubscriberStore;
|
||||
import org.toop.framework.eventbus.subscriber.Subscriber;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class DefaultEventBus implements EventBus {
|
||||
private final Logger logger;
|
||||
private final SubscriberStore eventsHolder;
|
||||
|
||||
public DefaultEventBus(Logger logger, SubscriberStore eventsHolder) {
|
||||
this.logger = logger;
|
||||
this.eventsHolder = eventsHolder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Subscriber<?, ?> subscriber) {
|
||||
eventsHolder.add(subscriber);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe(Subscriber<?, ?> subscriber) {
|
||||
eventsHolder.remove(subscriber);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> void post(T event) {
|
||||
Class<T> eventType = (Class<T>) event.getClass();
|
||||
var subs = eventsHolder.get(eventType);
|
||||
if (subs != null) {
|
||||
for (Subscriber<?, ?> subscriber : subs) {
|
||||
Class<T> eventClass = (Class<T>) subscriber.event();
|
||||
Consumer<EventType> action = (Consumer<EventType>) subscriber.handler();
|
||||
|
||||
action.accept((EventType) eventClass.cast(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
eventsHolder.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
eventsHolder.reset();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,117 @@
|
||||
package org.toop.framework.eventbus.bus;
|
||||
|
||||
import com.lmax.disruptor.BusySpinWaitStrategy;
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.toop.framework.eventbus.subscriber.Subscriber;
|
||||
import org.toop.framework.eventbus.events.EventType;
|
||||
import org.toop.framework.eventbus.store.SubscriberStore;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class DisruptorEventBus implements EventBus {
|
||||
/** Wrapper used inside the ring buffer. */
|
||||
private static class EventHolder<T> {
|
||||
T event;
|
||||
}
|
||||
|
||||
private final Logger logger;
|
||||
private final SubscriberStore eventsHolder;
|
||||
|
||||
private final Disruptor<EventHolder<?>> disruptor;
|
||||
private final RingBuffer<EventHolder<?>> ringBuffer;
|
||||
|
||||
public DisruptorEventBus(Logger logger, SubscriberStore eventsHolder) {
|
||||
this.logger = logger;
|
||||
this.eventsHolder = eventsHolder;
|
||||
|
||||
ThreadFactory threadFactory =
|
||||
r -> {
|
||||
Thread t = new Thread(r, "EventBus-Disruptor");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
};
|
||||
|
||||
disruptor = getEventHolderDisruptor(threadFactory);
|
||||
|
||||
disruptor.start();
|
||||
this.ringBuffer = disruptor.getRingBuffer();
|
||||
}
|
||||
|
||||
private Disruptor<EventHolder<?>> getEventHolderDisruptor(ThreadFactory threadFactory) {
|
||||
int RING_BUFFER_SIZE = 1024 * 64;
|
||||
Disruptor<EventHolder<?>> disruptor = new Disruptor<>(
|
||||
EventHolder::new,
|
||||
RING_BUFFER_SIZE,
|
||||
threadFactory,
|
||||
ProducerType.MULTI,
|
||||
new BusySpinWaitStrategy());
|
||||
|
||||
disruptor.handleEventsWith(
|
||||
(holder, _, _) -> {
|
||||
if (holder.event != null) {
|
||||
dispatchEvent(holder.event);
|
||||
holder.event = null;
|
||||
}
|
||||
});
|
||||
return disruptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Subscriber<?, ?> listener) {
|
||||
eventsHolder.add(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe(Subscriber<?, ?> listener) {
|
||||
eventsHolder.remove(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void post(T event) {
|
||||
long seq = ringBuffer.next();
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
EventHolder<T> holder = (EventHolder<T>) ringBuffer.get(seq);
|
||||
holder.event = event;
|
||||
} finally {
|
||||
ringBuffer.publish(seq);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
disruptor.shutdown();
|
||||
eventsHolder.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
eventsHolder.reset();
|
||||
}
|
||||
|
||||
private <T> void dispatchEvent(T event) {
|
||||
var classListeners = eventsHolder.get(event.getClass());
|
||||
if (classListeners != null) {
|
||||
for (Subscriber<?, ?> listener : classListeners) {
|
||||
try {
|
||||
callListener(listener, event);
|
||||
} catch (Throwable e) {
|
||||
logger.warn("Exception while handling event: {}", event, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> void callListener(Subscriber<?, ?> subscriber, T event) {
|
||||
Class<T> eventClass = (Class<T>) subscriber.event();
|
||||
Consumer<EventType> action = (Consumer<EventType>) subscriber.handler();
|
||||
|
||||
action.accept((EventType) eventClass.cast(event));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package org.toop.framework.eventbus.bus;
|
||||
|
||||
import org.toop.framework.eventbus.subscriber.Subscriber;
|
||||
|
||||
public interface EventBus {
|
||||
void subscribe(Subscriber<?, ?> subscriber);
|
||||
void unsubscribe(Subscriber<?, ?> subscriber);
|
||||
<T> void post(T event);
|
||||
void shutdown();
|
||||
void reset();
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
package org.toop.framework.eventbus.store;
|
||||
|
||||
import org.toop.framework.eventbus.subscriber.Subscriber;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
public class AsyncSubscriberStore implements SubscriberStore {
|
||||
private final ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Subscriber<?, ?>>> queues = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<Class<?>, Subscriber<?, ?>[]> snapshots = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void add(Subscriber<?, ?> sub) {
|
||||
queues.computeIfAbsent(sub.event(), _ -> new ConcurrentLinkedQueue<>()).add(sub);
|
||||
rebuildSnapshot(sub.event());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(Subscriber<?, ?> sub) {
|
||||
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(sub.event());
|
||||
if (queue != null) {
|
||||
queue.remove(sub);
|
||||
rebuildSnapshot(sub.event());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Subscriber<?, ?>[] get(Class<?> event) {
|
||||
return snapshots.getOrDefault(event, new Subscriber[0]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
queues.clear();
|
||||
snapshots.clear();
|
||||
}
|
||||
|
||||
private void rebuildSnapshot(Class<?> event) {
|
||||
ConcurrentLinkedQueue<Subscriber<?, ?>> queue = queues.get(event);
|
||||
if (queue != null) {
|
||||
snapshots.put(event, queue.toArray(new Subscriber[0]));
|
||||
} else {
|
||||
snapshots.put(event, new Subscriber[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
package org.toop.framework.eventbus.store;
|
||||
|
||||
import org.toop.framework.eventbus.subscriber.Subscriber;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class DefaultSubscriberStore implements SubscriberStore {
|
||||
|
||||
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
|
||||
|
||||
private final ConcurrentHashMap<Class<?>, Subscriber<?, ?>[]> listeners =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void add(Subscriber<?, ?> sub) {
|
||||
listeners.compute(sub.event(), (_, arr) -> {
|
||||
if (arr == null || arr.length == 0) {
|
||||
return new Subscriber<?, ?>[]{sub};
|
||||
}
|
||||
|
||||
int len = arr.length;
|
||||
Subscriber<?, ?>[] newArr = new Subscriber[len + 1];
|
||||
System.arraycopy(arr, 0, newArr, 0, len);
|
||||
newArr[len] = sub;
|
||||
return newArr;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(Subscriber<?, ?> sub) {
|
||||
listeners.computeIfPresent(sub.event(), (_, arr) -> {
|
||||
int len = arr.length;
|
||||
|
||||
if (len == 1) {
|
||||
return arr[0].equals(sub) ? null : arr;
|
||||
}
|
||||
|
||||
int keep = 0;
|
||||
for (Subscriber<?, ?> s : arr) {
|
||||
if (!s.equals(sub)) keep++;
|
||||
}
|
||||
|
||||
if (keep == len) {
|
||||
return arr;
|
||||
}
|
||||
if (keep == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Subscriber<?, ?>[] newArr = new Subscriber[keep];
|
||||
int i = 0;
|
||||
for (Subscriber<?, ?> s : arr) {
|
||||
if (!s.equals(sub)) {
|
||||
newArr[i++] = s;
|
||||
}
|
||||
}
|
||||
|
||||
return newArr;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Subscriber<?, ?>[] get(Class<?> event) {
|
||||
return listeners.getOrDefault(event, EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
listeners.clear();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package org.toop.framework.eventbus.store;
|
||||
|
||||
import org.toop.framework.eventbus.subscriber.Subscriber;
|
||||
|
||||
public interface SubscriberStore {
|
||||
void add(Subscriber<?, ?> subscriber);
|
||||
void remove(Subscriber<?, ?> subscriber);
|
||||
Subscriber<?, ?>[] get(Class<?> event);
|
||||
void reset();
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package org.toop.framework.eventbus.store;
|
||||
|
||||
import org.toop.framework.eventbus.subscriber.Subscriber;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class SyncSubscriberStore implements SubscriberStore {
|
||||
private final Map<Class<?>, List<Subscriber<?, ?>>> LISTENERS = new ConcurrentHashMap<>();
|
||||
private static final Subscriber<?, ?>[] EMPTY = new Subscriber[0];
|
||||
|
||||
@Override
|
||||
public void add(Subscriber<?, ?> sub) {
|
||||
LISTENERS.computeIfAbsent(sub.event(), _ -> new ArrayList<>()).add(sub);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(Subscriber<?, ?> sub) {
|
||||
LISTENERS.getOrDefault(sub.event(), new ArrayList<>()).remove(sub);
|
||||
LISTENERS.entrySet().removeIf(entry -> entry.getValue().isEmpty());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Subscriber<?, ?>[] get(Class<?> event) {
|
||||
List<Subscriber<?, ?>> list = LISTENERS.get(event);
|
||||
if (list == null || list.isEmpty()) return EMPTY;
|
||||
return list.toArray(EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
LISTENERS.clear();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package org.toop.framework.eventbus.subscriber;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public record DefaultSubscriber<K>(String id, Class<K> event, Consumer<K> handler) implements NamedSubscriber<K> {}
|
||||
@@ -0,0 +1,3 @@
|
||||
package org.toop.framework.eventbus.subscriber;
|
||||
|
||||
public interface IdSubscriber<T> extends Subscriber<Long, T> {}
|
||||
@@ -0,0 +1,5 @@
|
||||
package org.toop.framework.eventbus.subscriber;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public record LongIdSubscriber<K>(Long id, Class<K> event, Consumer<K> handler) implements IdSubscriber<K> {}
|
||||
@@ -0,0 +1,3 @@
|
||||
package org.toop.framework.eventbus.subscriber;
|
||||
|
||||
public interface NamedSubscriber<T> extends Subscriber<String, T> {}
|
||||
@@ -0,0 +1,9 @@
|
||||
package org.toop.framework.eventbus.subscriber;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface Subscriber<ID, K> {
|
||||
ID id();
|
||||
Class<K> event();
|
||||
Consumer<K> handler();
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package org.toop.framework.gameFramework;
|
||||
|
||||
/**
|
||||
* Represents the current state of a turn-based game.
|
||||
*/
|
||||
public enum GameState {
|
||||
/** Game is ongoing and no special condition applies. */
|
||||
NORMAL,
|
||||
|
||||
/** Game ended in a draw. */
|
||||
DRAW,
|
||||
|
||||
/** Game ended with a win for a player. */
|
||||
WIN,
|
||||
|
||||
/** Next player's turn was skipped. */
|
||||
TURN_SKIPPED,
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package org.toop.framework.gameFramework.controller;
|
||||
|
||||
/**
|
||||
* Interface for classes that can trigger a UI update.
|
||||
*/
|
||||
public interface UpdatesGameUI {
|
||||
|
||||
/** Called to refresh or update the game UI. */
|
||||
void updateUI();
|
||||
}
|
||||
@@ -0,0 +1,108 @@
|
||||
package org.toop.framework.gameFramework.model.game;
|
||||
|
||||
import org.toop.framework.gameFramework.model.player.Player;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public abstract class AbstractGame<T extends TurnBasedGame<T>> implements TurnBasedGame<T> {
|
||||
private final int playerCount; // How many players are playing
|
||||
private final Player<T>[] players;
|
||||
private int turn = 0; // What turn it is in the game
|
||||
|
||||
/** Constant representing an empty position on the board. */
|
||||
public static final int EMPTY = -1;
|
||||
|
||||
/** Number of rows in the game board. */
|
||||
private final int rowSize;
|
||||
|
||||
/** Number of columns in the game board. */
|
||||
private final int columnSize;
|
||||
|
||||
/** The game board stored as a one-dimensional array. */
|
||||
private final int[] board;
|
||||
|
||||
|
||||
|
||||
protected AbstractGame(int rowSize, int columnSize, int playerCount, Player<T>[] players) {
|
||||
assert rowSize > 0 && columnSize > 0;
|
||||
|
||||
this.rowSize = rowSize;
|
||||
this.columnSize = columnSize;
|
||||
|
||||
this.players = players;
|
||||
|
||||
board = new int[rowSize * columnSize];
|
||||
Arrays.fill(board, EMPTY);
|
||||
|
||||
this.playerCount = playerCount;
|
||||
}
|
||||
|
||||
protected AbstractGame(AbstractGame<T> other){
|
||||
this.rowSize = other.rowSize;
|
||||
this.columnSize = other.columnSize;
|
||||
this.board = other.board.clone();
|
||||
this.playerCount = other.playerCount;
|
||||
this.turn = other.turn;
|
||||
// TODO: Make this a deep copy, add deep copy interface to Player
|
||||
this.players = other.players;
|
||||
|
||||
}
|
||||
|
||||
public static boolean contains(int[] array, int value) {
|
||||
// O(n)
|
||||
for (int element : array){
|
||||
if (element == value) return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public Player<T> getPlayer(int index) {
|
||||
return players[index];
|
||||
}
|
||||
|
||||
public int getPlayerCount(){return this.playerCount;}
|
||||
|
||||
protected void nextTurn() {
|
||||
turn += 1;
|
||||
}
|
||||
|
||||
public int getCurrentTurn() {
|
||||
return turn % playerCount;
|
||||
}
|
||||
|
||||
protected void setBoard(int position) {
|
||||
setBoard(position, getCurrentTurn());
|
||||
}
|
||||
|
||||
protected void setBoard(int position, int player) {
|
||||
this.board[position] = player;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of rows in the board.
|
||||
*
|
||||
* @return number of rows
|
||||
*/
|
||||
public int getRowSize() {
|
||||
return this.rowSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of columns in the board.
|
||||
*
|
||||
* @return number of columns
|
||||
*/
|
||||
public int getColumnSize() {
|
||||
return this.columnSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a copy of the current board state.
|
||||
*
|
||||
* @return a cloned array representing the board
|
||||
*/
|
||||
public int[] getBoard() {
|
||||
return this.board.clone();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package org.toop.framework.gameFramework.model.game;
|
||||
|
||||
public interface DeepCopyable<T extends TurnBasedGame<T>> {
|
||||
T deepCopy();
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package org.toop.framework.gameFramework.model.game;
|
||||
|
||||
import org.toop.framework.gameFramework.GameState;
|
||||
|
||||
/**
|
||||
* Represents the result of a move in a turn-based game.
|
||||
*
|
||||
* @param state the resulting {@link GameState} after the move
|
||||
* @param player the index of the player associated with the result (winner or relevant player)
|
||||
*/
|
||||
public record PlayResult(GameState state, int player) {
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package org.toop.framework.gameFramework.model.game;
|
||||
|
||||
import org.toop.framework.gameFramework.GameState;
|
||||
|
||||
/**
|
||||
* Interface for turn-based games that can be played and queried for legal moves.
|
||||
*/
|
||||
public interface Playable {
|
||||
|
||||
/**
|
||||
* Returns the moves that are currently valid in the game.
|
||||
*
|
||||
* @return an array of integers representing legal moves
|
||||
*/
|
||||
int[] getLegalMoves();
|
||||
|
||||
/**
|
||||
* Plays the given move and returns the resulting game state.
|
||||
*
|
||||
* @param move the move to apply
|
||||
* @return the {@link GameState} and additional info after the move
|
||||
*/
|
||||
PlayResult play(int move);
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package org.toop.framework.gameFramework.model.game;
|
||||
|
||||
import org.toop.framework.gameFramework.model.player.Player;
|
||||
|
||||
public interface PlayerProvider<T extends TurnBasedGame<T>> {
|
||||
Player<T> getPlayer(int index);
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package org.toop.framework.gameFramework.model.game;
|
||||
|
||||
import org.toop.framework.networking.events.NetworkEvents;
|
||||
|
||||
/**
|
||||
* Interface for games that support online multiplayer play.
|
||||
* <p>
|
||||
* Methods are called in response to network events from the server.
|
||||
*/
|
||||
public interface SupportsOnlinePlay {
|
||||
|
||||
/** Called when it is this player's turn to make a move. */
|
||||
void onYourTurn(NetworkEvents.YourTurnResponse event);
|
||||
|
||||
/** Called when a move from another player is received. */
|
||||
void onMoveReceived(NetworkEvents.GameMoveResponse event);
|
||||
|
||||
/** Called when the game has finished, with the final result. */
|
||||
void gameFinished(NetworkEvents.GameResultResponse event);
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package org.toop.framework.gameFramework.model.game;
|
||||
|
||||
public interface TurnBasedGame<T extends TurnBasedGame<T>> extends Playable, DeepCopyable<T>, PlayerProvider<T> {
|
||||
int getCurrentTurn();
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package org.toop.framework.gameFramework.model.game.threadBehaviour;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.toop.framework.gameFramework.model.game.TurnBasedGame;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Base class for thread-based game behaviours.
|
||||
* <p>
|
||||
* Provides common functionality for managing game state and execution:
|
||||
* a running flag, a game reference, and a logger.
|
||||
* Subclasses implement the actual game-loop logic.
|
||||
*/
|
||||
public abstract class AbstractThreadBehaviour<T extends TurnBasedGame<T>> implements ThreadBehaviour<T> {
|
||||
|
||||
/** Indicates whether the game loop or event processing is active. */
|
||||
protected final AtomicBoolean isRunning = new AtomicBoolean();
|
||||
|
||||
/** The game instance controlled by this behaviour. */
|
||||
protected final T game;
|
||||
|
||||
/** Logger for the subclass to report errors or debug info. */
|
||||
protected final Logger logger = LogManager.getLogger(this.getClass());
|
||||
|
||||
/**
|
||||
* Creates a new base behaviour for the specified game.
|
||||
*
|
||||
* @param game the turn-based game to control
|
||||
*/
|
||||
public AbstractThreadBehaviour(T game) {
|
||||
this.game = game;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package org.toop.framework.gameFramework.model.game.threadBehaviour;
|
||||
|
||||
public interface Controllable {
|
||||
void start();
|
||||
|
||||
void stop();
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package org.toop.framework.gameFramework.model.game.threadBehaviour;
|
||||
|
||||
import org.toop.framework.gameFramework.model.game.TurnBasedGame;
|
||||
|
||||
/**
|
||||
* Strategy interface for controlling game thread behavior.
|
||||
* <p>
|
||||
* Defines how a game's execution is started, stopped, and which player is active.
|
||||
*/
|
||||
public interface ThreadBehaviour<T extends TurnBasedGame<T>> extends Controllable {
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package org.toop.framework.gameFramework.model.player;
|
||||
|
||||
import org.toop.framework.gameFramework.model.game.TurnBasedGame;
|
||||
|
||||
/**
|
||||
* Abstract base class for AI implementations for games extending {@link GameR}.
|
||||
* <p>
|
||||
* Provides a common superclass for specific AI algorithms. Concrete subclasses
|
||||
* must implement the {@link #findBestMove(GameR, int)} method defined by
|
||||
* {@link IAIMoveR} to determine the best move given a game state and a search depth.
|
||||
* </p>
|
||||
*
|
||||
* @param <T> the specific type of game this AI can play, extending {@link GameR}
|
||||
*/
|
||||
public abstract class AbstractAI<T extends TurnBasedGame> implements MoveProvider<T> {
|
||||
// Concrete AI implementations should override findBestMove(T game, int depth)
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package org.toop.framework.gameFramework.model.player;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.toop.framework.gameFramework.model.game.TurnBasedGame;
|
||||
|
||||
/**
|
||||
* Abstract class representing a player in a game.
|
||||
* <p>
|
||||
* Players are entities that can make moves based on the current state of a game.
|
||||
* player types, such as human players or AI players.
|
||||
* </p>
|
||||
* <p>
|
||||
* Subclasses should override the {@link #getMove(GameR)} method to provide
|
||||
* specific move logic.
|
||||
* </p>
|
||||
*/
|
||||
public abstract class AbstractPlayer<T extends TurnBasedGame<T>> implements Player<T> {
|
||||
private int playerIndex = -1;
|
||||
|
||||
private Logger logger = LogManager.getLogger(this.getClass());
|
||||
|
||||
private final String name;
|
||||
|
||||
protected AbstractPlayer(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
/**
|
||||
* Determines the next move based on the provided game state.
|
||||
* <p>
|
||||
* The default implementation throws an {@link UnsupportedOperationException},
|
||||
* indicating that concrete subclasses must override this method to provide
|
||||
* actual move logic.
|
||||
* </p>
|
||||
*
|
||||
* @param gameCopy a snapshot of the current game state
|
||||
* @return an integer representing the chosen move
|
||||
* @throws UnsupportedOperationException if the method is not overridden
|
||||
*/
|
||||
public int getMove(T gameCopy) {
|
||||
logger.error("Method getMove not implemented.");
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
public String getName(){
|
||||
return this.name;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package org.toop.framework.gameFramework.model.player;
|
||||
|
||||
import org.toop.framework.gameFramework.model.game.TurnBasedGame;
|
||||
|
||||
public interface MoveProvider<T extends TurnBasedGame> {
|
||||
int getMove(T game);
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package org.toop.framework.gameFramework.model.player;
|
||||
|
||||
public interface NameProvider {
|
||||
String getName();
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package org.toop.framework.gameFramework.model.player;
|
||||
|
||||
import org.toop.framework.gameFramework.model.game.TurnBasedGame;
|
||||
|
||||
public interface Player<T extends TurnBasedGame<T>> extends NameProvider, MoveProvider<T> {
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package org.toop.framework.gameFramework.view;
|
||||
|
||||
import org.toop.framework.eventbus.events.EventsBase;
|
||||
import org.toop.framework.eventbus.events.GenericEvent;
|
||||
|
||||
/**
|
||||
* Defines GUI-related events for the event bus.
|
||||
* <p>
|
||||
* These events notify the UI about updates such as game progress,
|
||||
* player actions, and game completion.
|
||||
*/
|
||||
public class GUIEvents extends EventsBase {
|
||||
|
||||
/** Event to refresh or redraw the game canvas. */
|
||||
public record RefreshGameCanvas() implements GenericEvent {}
|
||||
|
||||
/**
|
||||
* Event indicating the game has ended.
|
||||
*
|
||||
* @param winOrTie true if the game ended in a win, false for a draw
|
||||
* @param winner the index of the winning player, or -1 if no winner
|
||||
*/
|
||||
public record GameEnded(boolean winOrTie, int winner) implements GenericEvent {}
|
||||
|
||||
/** Event indicating a player has attempted a move. */
|
||||
public record PlayerAttemptedMove(int move) implements GenericEvent {}
|
||||
|
||||
/** Event indicating a player is hovering over a move (for UI feedback). */
|
||||
public record PlayerMoveHovered(int move) implements GenericEvent {}
|
||||
}
|
||||
@@ -4,42 +4,44 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.toop.framework.SnowflakeGenerator;
|
||||
import org.toop.framework.eventbus.EventFlow;
|
||||
import org.toop.framework.eventbus.bus.EventBus;
|
||||
import org.toop.framework.networking.events.NetworkEvents;
|
||||
import org.toop.framework.networking.exceptions.ClientNotFoundException;
|
||||
import org.toop.framework.networking.interfaces.NetworkingClientManager;
|
||||
|
||||
public class NetworkingClientEventListener {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(NetworkingClientEventListener.class);
|
||||
|
||||
private final NetworkingClientManager clientManager;
|
||||
|
||||
/** Starts a connection manager, to manage, connections. */
|
||||
public NetworkingClientEventListener(NetworkingClientManager clientManager) {
|
||||
public NetworkingClientEventListener(EventBus eventBus, NetworkingClientManager clientManager) {
|
||||
this.clientManager = clientManager;
|
||||
new EventFlow()
|
||||
.listen(this::handleStartClient)
|
||||
.listen(this::handleCommand)
|
||||
.listen(this::handleSendLogin)
|
||||
.listen(this::handleSendLogout)
|
||||
.listen(this::handleSendGetPlayerlist)
|
||||
.listen(this::handleSendGetGamelist)
|
||||
.listen(this::handleSendSubscribe)
|
||||
.listen(this::handleSendMove)
|
||||
.listen(this::handleSendChallenge)
|
||||
.listen(this::handleSendAcceptChallenge)
|
||||
.listen(this::handleSendForfeit)
|
||||
.listen(this::handleSendMessage)
|
||||
.listen(this::handleSendHelp)
|
||||
.listen(this::handleSendHelpForCommand)
|
||||
.listen(this::handleCloseClient)
|
||||
.listen(this::handleReconnect)
|
||||
.listen(this::handleChangeAddress)
|
||||
.listen(this::handleGetAllConnections)
|
||||
.listen(this::handleShutdownAll);
|
||||
new EventFlow(eventBus)
|
||||
.listen(NetworkEvents.StartClient.class, this::handleStartClient, false)
|
||||
.listen(NetworkEvents.SendCommand.class, this::handleCommand, false)
|
||||
.listen(NetworkEvents.SendLogin.class, this::handleSendLogin, false)
|
||||
.listen(NetworkEvents.SendLogout.class, this::handleSendLogout, false)
|
||||
.listen(NetworkEvents.SendGetPlayerlist.class, this::handleSendGetPlayerlist, false)
|
||||
.listen(NetworkEvents.SendGetGamelist.class, this::handleSendGetGamelist, false)
|
||||
.listen(NetworkEvents.SendSubscribe.class, this::handleSendSubscribe, false)
|
||||
.listen(NetworkEvents.SendMove.class, this::handleSendMove, false)
|
||||
.listen(NetworkEvents.SendChallenge.class, this::handleSendChallenge, false)
|
||||
.listen(NetworkEvents.SendAcceptChallenge.class, this::handleSendAcceptChallenge, false)
|
||||
.listen(NetworkEvents.SendForfeit.class, this::handleSendForfeit, false)
|
||||
.listen(NetworkEvents.SendMessage.class, this::handleSendMessage, false)
|
||||
.listen(NetworkEvents.SendHelp.class, this::handleSendHelp, false)
|
||||
.listen(NetworkEvents.SendHelpForCommand.class, this::handleSendHelpForCommand, false)
|
||||
.listen(NetworkEvents.CloseClient.class, this::handleCloseClient, false)
|
||||
.listen(NetworkEvents.Reconnect.class, this::handleReconnect, false)
|
||||
.listen(NetworkEvents.ChangeAddress.class, this::handleChangeAddress, false)
|
||||
.listen(NetworkEvents.RequestsAllClients.class, this::handleGetAllConnections, false)
|
||||
.listen(NetworkEvents.ForceCloseAllClients.class, this::handleShutdownAll, false);
|
||||
}
|
||||
|
||||
void handleStartClient(NetworkEvents.StartClient event) {
|
||||
long clientId = SnowflakeGenerator.nextId();
|
||||
new EventFlow().addPostEvent(new NetworkEvents.CreatedIdForClient(clientId, event.identifier())).postEvent();
|
||||
clientManager.startClient(
|
||||
clientId,
|
||||
event.networkingClient(),
|
||||
|
||||
@@ -8,6 +8,9 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.toop.framework.eventbus.EventFlow;
|
||||
import org.toop.framework.eventbus.bus.EventBus;
|
||||
import org.toop.framework.networking.events.NetworkEvents;
|
||||
import org.toop.framework.networking.exceptions.ClientNotFoundException;
|
||||
import org.toop.framework.networking.exceptions.CouldNotConnectException;
|
||||
import org.toop.framework.networking.interfaces.NetworkingClient;
|
||||
@@ -15,9 +18,13 @@ import org.toop.framework.networking.types.NetworkingConnector;
|
||||
|
||||
public class NetworkingClientManager implements org.toop.framework.networking.interfaces.NetworkingClientManager {
|
||||
private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class);
|
||||
|
||||
private final EventBus eventBus;
|
||||
private final Map<Long, NetworkingClient> networkClients = new ConcurrentHashMap<>();
|
||||
|
||||
public NetworkingClientManager() {}
|
||||
public NetworkingClientManager(EventBus eventBus) {
|
||||
this.eventBus = eventBus;
|
||||
}
|
||||
|
||||
private void connectHelper(
|
||||
long id,
|
||||
@@ -26,8 +33,16 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
|
||||
Runnable onSuccess,
|
||||
Runnable onFailure
|
||||
) {
|
||||
|
||||
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
EventFlow closeEvent = new EventFlow()
|
||||
.listen(
|
||||
NetworkEvents.CloseClient.class,
|
||||
e -> {
|
||||
if (e.clientId() == id) scheduler.shutdownNow();
|
||||
}, "close");
|
||||
|
||||
Runnable connectTask = new Runnable() {
|
||||
int attempts = 0;
|
||||
|
||||
@@ -44,6 +59,7 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
|
||||
nClient.connect(id, nConnector.host(), nConnector.port());
|
||||
networkClients.put(id, nClient);
|
||||
logger.info("New client started successfully for {}:{}", nConnector.host(), nConnector.port());
|
||||
eventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), true));
|
||||
onSuccess.run();
|
||||
scheduler.shutdown();
|
||||
} catch (CouldNotConnectException e) {
|
||||
@@ -51,14 +67,17 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
|
||||
if (attempts < nConnector.reconnectAttempts()) {
|
||||
logger.warn("Could not connect to {}:{}. Retrying in {} {}",
|
||||
nConnector.host(), nConnector.port(), nConnector.timeout(), nConnector.timeUnit());
|
||||
eventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), false));
|
||||
scheduler.schedule(this, nConnector.timeout(), nConnector.timeUnit());
|
||||
} else {
|
||||
logger.error("Failed to start client for {}:{} after {} attempts", nConnector.host(), nConnector.port(), attempts);
|
||||
eventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false));
|
||||
onFailure.run();
|
||||
scheduler.shutdown();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Unexpected exception during startClient", e);
|
||||
eventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false));
|
||||
onFailure.run();
|
||||
scheduler.shutdown();
|
||||
}
|
||||
@@ -66,6 +85,8 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
|
||||
};
|
||||
|
||||
scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS);
|
||||
//
|
||||
// closeEvent.unsubscribe("close");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -11,6 +11,7 @@ import io.netty.handler.codec.string.StringEncoder;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.toop.framework.eventbus.bus.EventBus;
|
||||
import org.toop.framework.networking.exceptions.CouldNotConnectException;
|
||||
import org.toop.framework.networking.handlers.NetworkingGameClientHandler;
|
||||
import org.toop.framework.networking.interfaces.NetworkingClient;
|
||||
@@ -19,9 +20,13 @@ import java.net.InetSocketAddress;
|
||||
|
||||
public class TournamentNetworkingClient implements NetworkingClient {
|
||||
private static final Logger logger = LogManager.getLogger(TournamentNetworkingClient.class);
|
||||
|
||||
private final EventBus eventBus;
|
||||
private Channel channel;
|
||||
|
||||
public TournamentNetworkingClient() {}
|
||||
public TournamentNetworkingClient(EventBus eventBus) {
|
||||
this.eventBus = eventBus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getAddress() {
|
||||
@@ -40,7 +45,7 @@ public class TournamentNetworkingClient implements NetworkingClient {
|
||||
new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel ch) {
|
||||
NetworkingGameClientHandler handler = new NetworkingGameClientHandler(clientId);
|
||||
NetworkingGameClientHandler handler = new NetworkingGameClientHandler(eventBus, clientId);
|
||||
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
pipeline.addLast(new LineBasedFrameDecoder(1024)); // split at \n
|
||||
|
||||
@@ -4,6 +4,7 @@ import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.toop.annotations.AutoResponseResult;
|
||||
import org.toop.framework.eventbus.GlobalEventBus;
|
||||
import org.toop.framework.eventbus.events.*;
|
||||
import org.toop.framework.networking.interfaces.NetworkingClient;
|
||||
import org.toop.framework.networking.types.NetworkingConnector;
|
||||
@@ -11,7 +12,7 @@ import org.toop.framework.networking.types.NetworkingConnector;
|
||||
/**
|
||||
* Defines all event types related to the networking subsystem.
|
||||
* <p>
|
||||
* These events are used in conjunction with the {@link org.toop.framework.eventbus.GlobalEventBus}
|
||||
* These events are used in conjunction with the {@link GlobalEventBus}
|
||||
* and {@link org.toop.framework.eventbus.EventFlow} to communicate between components
|
||||
* such as networking clients, managers, and listeners.
|
||||
* </p>
|
||||
@@ -166,6 +167,10 @@ public class NetworkEvents extends EventsBase {
|
||||
long identifier)
|
||||
implements UniqueEvent {}
|
||||
|
||||
public record CreatedIdForClient(long clientId, long identifier) implements ResponseToUniqueEvent {}
|
||||
|
||||
public record ConnectTry(long clientId, int amount, int maxAmount, boolean success) implements GenericEvent {}
|
||||
|
||||
/**
|
||||
* Response confirming that a client has been successfully started.
|
||||
* <p>
|
||||
|
||||
@@ -2,19 +2,23 @@ package org.toop.framework.networking.handlers;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
|
||||
import java.util.regex.MatchResult;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.toop.framework.eventbus.EventFlow;
|
||||
import org.toop.framework.eventbus.bus.EventBus;
|
||||
import org.toop.framework.networking.events.NetworkEvents;
|
||||
|
||||
public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
|
||||
private static final Logger logger = LogManager.getLogger(NetworkingGameClientHandler.class);
|
||||
|
||||
private final EventBus eventBus;
|
||||
private final long connectionId;
|
||||
|
||||
public NetworkingGameClientHandler(long connectionId) {
|
||||
public NetworkingGameClientHandler(EventBus eventBus, long connectionId) {
|
||||
this.eventBus = eventBus;
|
||||
this.connectionId = connectionId;
|
||||
}
|
||||
|
||||
@@ -38,9 +42,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
|
||||
"Received SVR message from server-{}, data: {}",
|
||||
ctx.channel().remoteAddress(),
|
||||
msg);
|
||||
new EventFlow()
|
||||
.addPostEvent(new NetworkEvents.ServerResponse(this.connectionId))
|
||||
.asyncPostEvent();
|
||||
eventBus.post(new NetworkEvents.ServerResponse(this.connectionId));
|
||||
parseServerReturn(rec);
|
||||
return;
|
||||
}
|
||||
@@ -70,7 +72,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
|
||||
case "CHALLENGE":
|
||||
gameChallengeHandler(recSrvRemoved);
|
||||
return;
|
||||
case "WIN", "DRAW", "LOSE":
|
||||
case "WIN", "DRAW", "LOSS":
|
||||
gameWinConditionHandler(recSrvRemoved);
|
||||
return;
|
||||
default:
|
||||
@@ -111,25 +113,18 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
|
||||
.map(m -> m.group(1).trim())
|
||||
.toArray(String[]::new);
|
||||
|
||||
new EventFlow()
|
||||
.addPostEvent(
|
||||
new NetworkEvents.GameMoveResponse(
|
||||
this.connectionId, msg[0], msg[1], msg[2]))
|
||||
.asyncPostEvent();
|
||||
eventBus.post(new NetworkEvents.GameMoveResponse(this.connectionId, msg[0], msg[1], msg[2]));
|
||||
}
|
||||
|
||||
private void gameWinConditionHandler(String rec) {
|
||||
@SuppressWarnings("StreamToString")
|
||||
String condition =
|
||||
Pattern.compile("\\b(win|draw|lose)\\b", Pattern.CASE_INSENSITIVE)
|
||||
.matcher(rec)
|
||||
.results()
|
||||
.toString()
|
||||
.trim();
|
||||
String condition = Pattern.compile("\\b(win|draw|loss)\\b", Pattern.CASE_INSENSITIVE)
|
||||
.matcher(rec)
|
||||
.results()
|
||||
.map(MatchResult::group)
|
||||
.findFirst()
|
||||
.orElse("");
|
||||
|
||||
new EventFlow()
|
||||
.addPostEvent(new NetworkEvents.GameResultResponse(this.connectionId, condition))
|
||||
.asyncPostEvent();
|
||||
eventBus.post(new NetworkEvents.GameResultResponse(this.connectionId, condition));
|
||||
}
|
||||
|
||||
private void gameChallengeHandler(String rec) {
|
||||
@@ -144,17 +139,9 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
|
||||
.toArray(String[]::new);
|
||||
|
||||
if (isCancelled)
|
||||
new EventFlow()
|
||||
.addPostEvent(
|
||||
new NetworkEvents.ChallengeCancelledResponse(
|
||||
this.connectionId, msg[0]))
|
||||
.asyncPostEvent();
|
||||
eventBus.post(new NetworkEvents.GameResultResponse(this.connectionId, msg[0]));
|
||||
else
|
||||
new EventFlow()
|
||||
.addPostEvent(
|
||||
new NetworkEvents.ChallengeResponse(
|
||||
this.connectionId, msg[0], msg[1], msg[2]))
|
||||
.asyncPostEvent();
|
||||
eventBus.post(new NetworkEvents.ChallengeResponse(this.connectionId, msg[0], msg[1], msg[2]));
|
||||
} catch (ArrayIndexOutOfBoundsException e) {
|
||||
logger.error("Array out of bounds for: {}", rec, e);
|
||||
}
|
||||
@@ -170,11 +157,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
|
||||
.toArray(String[]::new);
|
||||
|
||||
// [0] playerToMove, [1] gameType, [2] opponent
|
||||
new EventFlow()
|
||||
.addPostEvent(
|
||||
new NetworkEvents.GameMatchResponse(
|
||||
this.connectionId, msg[0], msg[1], msg[2]))
|
||||
.asyncPostEvent();
|
||||
eventBus.post(new NetworkEvents.GameMatchResponse(this.connectionId, msg[0], msg[1], msg[2]));
|
||||
} catch (ArrayIndexOutOfBoundsException e) {
|
||||
logger.error("Array out of bounds for: {}", rec, e);
|
||||
}
|
||||
@@ -189,9 +172,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
|
||||
.toString()
|
||||
.trim();
|
||||
|
||||
new EventFlow()
|
||||
.addPostEvent(new NetworkEvents.YourTurnResponse(this.connectionId, msg))
|
||||
.asyncPostEvent();
|
||||
eventBus.post(new NetworkEvents.YourTurnResponse(this.connectionId, msg));
|
||||
}
|
||||
|
||||
private void playerlistHandler(String rec) {
|
||||
@@ -202,9 +183,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
|
||||
.map(m -> m.group(1).trim())
|
||||
.toArray(String[]::new);
|
||||
|
||||
new EventFlow()
|
||||
.addPostEvent(new NetworkEvents.PlayerlistResponse(this.connectionId, players))
|
||||
.asyncPostEvent();
|
||||
eventBus.post(new NetworkEvents.PlayerlistResponse(this.connectionId, players));
|
||||
}
|
||||
|
||||
private void gamelistHandler(String rec) {
|
||||
@@ -215,9 +194,7 @@ public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
|
||||
.map(m -> m.group(1).trim())
|
||||
.toArray(String[]::new);
|
||||
|
||||
new EventFlow()
|
||||
.addPostEvent(new NetworkEvents.GamelistResponse(this.connectionId, gameTypes))
|
||||
.asyncPostEvent();
|
||||
eventBus.post(new NetworkEvents.GamelistResponse(this.connectionId, gameTypes));
|
||||
}
|
||||
|
||||
private void helpHandler(String rec) {
|
||||
|
||||
@@ -92,11 +92,6 @@ public class ResourceManager {
|
||||
return asset.getResource();
|
||||
}
|
||||
|
||||
// @SuppressWarnings("unchecked")
|
||||
// public static <T extends BaseResource> ArrayList<ResourceMeta<T>> getAllOfType() {
|
||||
// return (ArrayList<ResourceMeta<T>>) (ArrayList<?>) new ArrayList<>(assets.values());
|
||||
// }
|
||||
|
||||
/**
|
||||
* Retrieve all assets of a specific resource type.
|
||||
*
|
||||
|
||||
@@ -8,7 +8,7 @@ import org.toop.framework.resource.types.LoadableResource;
|
||||
|
||||
@FileExtension({"png", "jpg", "jpeg"})
|
||||
public class ImageAsset extends BaseResource implements LoadableResource {
|
||||
private Image image;
|
||||
private Image image = null;
|
||||
|
||||
public ImageAsset(final File file) {
|
||||
super(file);
|
||||
@@ -40,8 +40,7 @@ public class ImageAsset extends BaseResource implements LoadableResource {
|
||||
public Image getImage() {
|
||||
if (!this.isLoaded) {
|
||||
this.load();
|
||||
return image;
|
||||
}
|
||||
return null;
|
||||
return image;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,24 +147,6 @@ public class LocalizationAsset extends BaseResource implements LoadableResource,
|
||||
return this.baseName;
|
||||
}
|
||||
|
||||
// /**
|
||||
// * Extracts the base name from a file name.
|
||||
// *
|
||||
// * @param fileName the file name
|
||||
// * @return base name without locale or extension
|
||||
// */
|
||||
// private String getBaseName(String fileName) {
|
||||
// int dotIndex = fileName.lastIndexOf('.');
|
||||
// String nameWithoutExtension = (dotIndex > 0) ? fileName.substring(0, dotIndex) :
|
||||
// fileName;
|
||||
//
|
||||
// int underscoreIndex = nameWithoutExtension.indexOf('_');
|
||||
// if (underscoreIndex > 0) {
|
||||
// return nameWithoutExtension.substring(0, underscoreIndex);
|
||||
// }
|
||||
// return nameWithoutExtension;
|
||||
// }
|
||||
|
||||
/**
|
||||
* Extracts a locale from a file name based on the pattern "base_LOCALE.properties".
|
||||
*
|
||||
|
||||
@@ -69,9 +69,4 @@ public interface BundledResource {
|
||||
* @return the base name used to identify this bundled resource
|
||||
*/
|
||||
String getBaseName();
|
||||
|
||||
// /**
|
||||
// Returns the name
|
||||
// */
|
||||
// String getDefaultName();
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package org.toop.framework.audio;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.toop.framework.dispatch.interfaces.Dispatcher;
|
||||
import org.toop.framework.eventbus.GlobalEventBus;
|
||||
import org.toop.framework.resource.resources.BaseResource;
|
||||
import org.toop.framework.resource.types.AudioResource;
|
||||
|
||||
@@ -94,7 +95,7 @@ public class MusicManagerTest {
|
||||
|
||||
List<MockAudioResource> resources = List.of(track1, track2, track3);
|
||||
|
||||
manager = new MusicManager<>(resources, dispatcher);
|
||||
manager = new MusicManager<>(GlobalEventBus.get(), resources, dispatcher);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -188,7 +189,7 @@ public class MusicManagerTest {
|
||||
manyTracks.add(new MockAudioResource("track" + i));
|
||||
}
|
||||
|
||||
MusicManager<MockAudioResource> multiManager = new MusicManager<>(manyTracks, dispatcher);
|
||||
MusicManager<MockAudioResource> multiManager = new MusicManager<>(GlobalEventBus.get(), manyTracks, dispatcher);
|
||||
|
||||
for (int i = 0; i < manyTracks.size() - 1; i++) {
|
||||
multiManager.play();
|
||||
|
||||
@@ -1,235 +1,222 @@
|
||||
// package org.toop.framework.eventbus;
|
||||
//
|
||||
// import org.junit.jupiter.api.Tag;
|
||||
// import org.junit.jupiter.api.Test;
|
||||
// import org.toop.framework.eventbus.events.UniqueEvent;
|
||||
//
|
||||
// import java.math.BigInteger;
|
||||
// import java.util.concurrent.*;
|
||||
// import java.util.concurrent.atomic.LongAdder;
|
||||
//
|
||||
// import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
//
|
||||
// class EventFlowStressTest {
|
||||
//
|
||||
// /** Top-level record to ensure runtime type matches subscription */
|
||||
// public record HeavyEvent(String payload, long eventSnowflake) implements UniqueEvent {
|
||||
// @Override
|
||||
// public java.util.Map<String, Object> result() {
|
||||
// return java.util.Map.of("payload", payload, "eventId", eventSnowflake);
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public long eventSnowflake() {
|
||||
// return this.eventSnowflake;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// public record HeavyEventSuccess(String payload, long eventSnowflake) implements
|
||||
// UniqueEvent {
|
||||
// @Override
|
||||
// public java.util.Map<String, Object> result() {
|
||||
// return java.util.Map.of("payload", payload, "eventId", eventSnowflake);
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public long eventSnowflake() {
|
||||
// return eventSnowflake;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// private static final int THREADS = 32;
|
||||
// private static final long EVENTS_PER_THREAD = 10_000_000;
|
||||
//
|
||||
// @Tag("stress")
|
||||
// @Test
|
||||
// void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException {
|
||||
// LongAdder counter = new LongAdder();
|
||||
// ExecutorService executor = Executors.newFixedThreadPool(THREADS);
|
||||
//
|
||||
// BigInteger totalEvents = BigInteger.valueOf(THREADS)
|
||||
// .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
|
||||
//
|
||||
// long startTime = System.currentTimeMillis();
|
||||
//
|
||||
// // Monitor thread for EPS and memory
|
||||
// Thread monitor = new Thread(() -> {
|
||||
// long lastCount = 0;
|
||||
// long lastTime = System.currentTimeMillis();
|
||||
// Runtime runtime = Runtime.getRuntime();
|
||||
//
|
||||
// while (counter.sum() < totalEvents.longValue()) {
|
||||
// try { Thread.sleep(200); } catch (InterruptedException ignored) {}
|
||||
//
|
||||
// long now = System.currentTimeMillis();
|
||||
// long completed = counter.sum();
|
||||
// long eventsThisPeriod = completed - lastCount;
|
||||
// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
|
||||
//
|
||||
// long usedMemory = runtime.totalMemory() - runtime.freeMemory();
|
||||
// double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
|
||||
//
|
||||
// System.out.printf(
|
||||
// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
|
||||
// completed,
|
||||
// totalEvents.longValue(),
|
||||
// completed * 100.0 / totalEvents.doubleValue(),
|
||||
// eps,
|
||||
// usedMemory / 1024.0 / 1024.0,
|
||||
// usedPercent
|
||||
// );
|
||||
//
|
||||
// lastCount = completed;
|
||||
// lastTime = now;
|
||||
// }
|
||||
// });
|
||||
// monitor.setDaemon(true);
|
||||
// monitor.start();
|
||||
//
|
||||
// var listener = new EventFlow().listen(HeavyEvent.class, _ -> counter.increment());
|
||||
//
|
||||
// // Submit events asynchronously
|
||||
// for (int t = 0; t < THREADS; t++) {
|
||||
// executor.submit(() -> {
|
||||
// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
|
||||
// var _ = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
|
||||
// .asyncPostEvent();
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
//
|
||||
// executor.shutdown();
|
||||
// executor.awaitTermination(10, TimeUnit.MINUTES);
|
||||
//
|
||||
// listener.getResult();
|
||||
//
|
||||
// long endTime = System.currentTimeMillis();
|
||||
// double durationSeconds = (endTime - startTime) / 1000.0;
|
||||
//
|
||||
// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + "
|
||||
// seconds");
|
||||
// double averageEps = totalEvents.doubleValue() / durationSeconds;
|
||||
// System.out.printf("Average EPS: %.0f%n", averageEps);
|
||||
//
|
||||
// assertEquals(totalEvents.longValue(), counter.sum());
|
||||
// }
|
||||
//
|
||||
// @Tag("stress")
|
||||
// @Test
|
||||
// void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException {
|
||||
// LongAdder counter = new LongAdder();
|
||||
// ExecutorService executor = Executors.newFixedThreadPool(THREADS);
|
||||
//
|
||||
// BigInteger totalEvents = BigInteger.valueOf(THREADS)
|
||||
// .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
|
||||
//
|
||||
// long startTime = System.currentTimeMillis();
|
||||
//
|
||||
// // Monitor thread for EPS and memory
|
||||
// Thread monitor = new Thread(() -> {
|
||||
// long lastCount = 0;
|
||||
// long lastTime = System.currentTimeMillis();
|
||||
// Runtime runtime = Runtime.getRuntime();
|
||||
//
|
||||
// while (counter.sum() < totalEvents.longValue()) {
|
||||
// try { Thread.sleep(200); } catch (InterruptedException ignored) {}
|
||||
//
|
||||
// long now = System.currentTimeMillis();
|
||||
// long completed = counter.sum();
|
||||
// long eventsThisPeriod = completed - lastCount;
|
||||
// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
|
||||
//
|
||||
// long usedMemory = runtime.totalMemory() - runtime.freeMemory();
|
||||
// double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
|
||||
//
|
||||
// System.out.printf(
|
||||
// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
|
||||
// completed,
|
||||
// totalEvents.longValue(),
|
||||
// completed * 100.0 / totalEvents.doubleValue(),
|
||||
// eps,
|
||||
// usedMemory / 1024.0 / 1024.0,
|
||||
// usedPercent
|
||||
// );
|
||||
//
|
||||
// lastCount = completed;
|
||||
// lastTime = now;
|
||||
// }
|
||||
// });
|
||||
// monitor.setDaemon(true);
|
||||
// monitor.start();
|
||||
//
|
||||
// // Submit events asynchronously
|
||||
// for (int t = 0; t < THREADS; t++) {
|
||||
// executor.submit(() -> {
|
||||
// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
|
||||
// var a = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
|
||||
// .onResponse(HeavyEventSuccess.class, _ -> counter.increment())
|
||||
// .postEvent();
|
||||
//
|
||||
// new EventFlow().addPostEvent(HeavyEventSuccess.class, "payload-" + i,
|
||||
// a.getEventSnowflake())
|
||||
// .postEvent();
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
//
|
||||
// executor.shutdown();
|
||||
// executor.awaitTermination(10, TimeUnit.MINUTES);
|
||||
//
|
||||
// long endTime = System.currentTimeMillis();
|
||||
// double durationSeconds = (endTime - startTime) / 1000.0;
|
||||
//
|
||||
// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + "
|
||||
// seconds");
|
||||
// double averageEps = totalEvents.doubleValue() / durationSeconds;
|
||||
// System.out.printf("Average EPS: %.0f%n", averageEps);
|
||||
//
|
||||
// assertEquals(totalEvents.longValue(), counter.sum());
|
||||
// }
|
||||
//
|
||||
//
|
||||
// @Tag("stress")
|
||||
// @Test
|
||||
// void efficientExtremeConcurrencyTest() throws InterruptedException {
|
||||
// final int THREADS = Runtime.getRuntime().availableProcessors();
|
||||
// final int EVENTS_PER_THREAD = 5000;
|
||||
//
|
||||
// ExecutorService executor = Executors.newFixedThreadPool(THREADS);
|
||||
// ConcurrentLinkedQueue<HeavyEvent> processedEvents = new ConcurrentLinkedQueue<>();
|
||||
//
|
||||
// long start = System.nanoTime();
|
||||
//
|
||||
// for (int t = 0; t < THREADS; t++) {
|
||||
// executor.submit(() -> {
|
||||
// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
|
||||
// new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
|
||||
// .onResponse(HeavyEvent.class, processedEvents::add)
|
||||
// .postEvent();
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
//
|
||||
// executor.shutdown();
|
||||
// executor.awaitTermination(10, TimeUnit.MINUTES);
|
||||
//
|
||||
// long end = System.nanoTime();
|
||||
// double durationSeconds = (end - start) / 1_000_000_000.0;
|
||||
//
|
||||
// BigInteger totalEvents = BigInteger.valueOf((long)
|
||||
// THREADS).multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
|
||||
// double eps = totalEvents.doubleValue() / durationSeconds;
|
||||
//
|
||||
// System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
|
||||
// System.out.printf("Throughput: %.0f events/sec%n", eps);
|
||||
//
|
||||
// Runtime rt = Runtime.getRuntime();
|
||||
// System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0
|
||||
// / 1024.0);
|
||||
//
|
||||
// assertEquals(totalEvents.intValue(), processedEvents.size());
|
||||
// }
|
||||
//
|
||||
package org.toop.framework.eventbus;
|
||||
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.toop.framework.eventbus.events.ResponseToUniqueEvent;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
class EventFlowStressTest {
|
||||
|
||||
public record HeavyEvent(String payload, long eventSnowflake) implements ResponseToUniqueEvent {
|
||||
@Override
|
||||
public long getIdentifier() {
|
||||
return eventSnowflake;
|
||||
}
|
||||
}
|
||||
|
||||
public record HeavyEventSuccess(String payload, long eventSnowflake) implements ResponseToUniqueEvent {
|
||||
@Override
|
||||
public long getIdentifier() {
|
||||
return eventSnowflake;
|
||||
}
|
||||
}
|
||||
|
||||
private static final int THREADS = 32;
|
||||
private static final long EVENTS_PER_THREAD = 10_000_000;
|
||||
|
||||
@Tag("stress")
|
||||
@Test
|
||||
void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException {
|
||||
LongAdder counter = new LongAdder();
|
||||
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
|
||||
|
||||
BigInteger totalEvents = BigInteger.valueOf(THREADS)
|
||||
.multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
Thread monitor = new Thread(() -> {
|
||||
long lastCount = 0;
|
||||
long lastTime = System.currentTimeMillis();
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
|
||||
while (counter.sum() < totalEvents.longValue()) {
|
||||
try { Thread.sleep(200); } catch (InterruptedException ignored) {}
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
long completed = counter.sum();
|
||||
long eventsThisPeriod = completed - lastCount;
|
||||
double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
|
||||
|
||||
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
|
||||
double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
|
||||
|
||||
System.out.printf(
|
||||
"Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
|
||||
completed,
|
||||
totalEvents.longValue(),
|
||||
completed * 100.0 / totalEvents.doubleValue(),
|
||||
eps,
|
||||
usedMemory / 1024.0 / 1024.0,
|
||||
usedPercent
|
||||
);
|
||||
|
||||
lastCount = completed;
|
||||
lastTime = now;
|
||||
}
|
||||
});
|
||||
monitor.setDaemon(true);
|
||||
monitor.start();
|
||||
|
||||
var listener = new EventFlow().listen(HeavyEvent.class, _ -> counter.increment());
|
||||
|
||||
for (int t = 0; t < THREADS; t++) {
|
||||
executor.submit(() -> {
|
||||
for (int i = 0; i < EVENTS_PER_THREAD; i++) {
|
||||
var _ = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
|
||||
.postEvent();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(10, TimeUnit.MINUTES);
|
||||
|
||||
listener.getResult();
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
double durationSeconds = (endTime - startTime) / 1000.0;
|
||||
|
||||
System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
|
||||
double averageEps = totalEvents.doubleValue() / durationSeconds;
|
||||
System.out.printf("Average EPS: %.0f%n", averageEps);
|
||||
|
||||
assertEquals(totalEvents.longValue(), counter.sum());
|
||||
}
|
||||
|
||||
@Tag("stress")
|
||||
@Test
|
||||
void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException {
|
||||
LongAdder counter = new LongAdder();
|
||||
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
|
||||
|
||||
BigInteger totalEvents = BigInteger.valueOf(THREADS)
|
||||
.multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
Thread monitor = new Thread(() -> {
|
||||
long lastCount = 0;
|
||||
long lastTime = System.currentTimeMillis();
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
|
||||
while (counter.sum() < totalEvents.longValue()) {
|
||||
try { Thread.sleep(500); } catch (InterruptedException ignored) {}
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
long completed = counter.sum();
|
||||
long eventsThisPeriod = completed - lastCount;
|
||||
double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
|
||||
|
||||
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
|
||||
double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
|
||||
|
||||
System.out.printf(
|
||||
"Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
|
||||
completed,
|
||||
totalEvents.longValue(),
|
||||
completed * 100.0 / totalEvents.doubleValue(),
|
||||
eps,
|
||||
usedMemory / 1024.0 / 1024.0,
|
||||
usedPercent
|
||||
);
|
||||
|
||||
lastCount = completed;
|
||||
lastTime = now;
|
||||
}
|
||||
});
|
||||
monitor.setDaemon(true);
|
||||
monitor.start();
|
||||
|
||||
EventFlow sharedFlow = new EventFlow();
|
||||
sharedFlow.listen(HeavyEventSuccess.class, _ -> counter.increment(), false, "heavyEventSuccessListener");
|
||||
|
||||
for (int t = 0; t < THREADS; t++) {
|
||||
executor.submit(() -> {
|
||||
EventFlow threadFlow = new EventFlow();
|
||||
|
||||
for (int i = 0; i < EVENTS_PER_THREAD; i++) {
|
||||
var heavyEvent = threadFlow.addPostEvent(HeavyEvent.class, "payload-" + i)
|
||||
.postEvent();
|
||||
|
||||
threadFlow.addPostEvent(HeavyEventSuccess.class, "payload-" + i, heavyEvent.getEventSnowflake())
|
||||
.postEvent();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(10, TimeUnit.MINUTES);
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
double durationSeconds = (endTime - startTime) / 1000.0;
|
||||
|
||||
System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
|
||||
double averageEps = totalEvents.doubleValue() / durationSeconds;
|
||||
System.out.printf("Average EPS: %.0f%n", averageEps);
|
||||
|
||||
assertEquals(totalEvents.longValue(), counter.sum());
|
||||
}
|
||||
|
||||
@Tag("stress")
|
||||
@Test
|
||||
void efficientExtremeConcurrencyTest() throws InterruptedException {
|
||||
final int THREADS = Runtime.getRuntime().availableProcessors();
|
||||
final int EVENTS_PER_THREAD = 1_000_000;
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
|
||||
ConcurrentLinkedQueue<HeavyEvent> processedEvents = new ConcurrentLinkedQueue<>();
|
||||
|
||||
long start = System.nanoTime();
|
||||
|
||||
EventFlow sharedFlow = new EventFlow();
|
||||
sharedFlow.listen(HeavyEvent.class, processedEvents::add, false, "heavyEventListener");
|
||||
|
||||
for (int t = 0; t < THREADS; t++) {
|
||||
executor.submit(() -> {
|
||||
EventFlow threadFlow = new EventFlow();
|
||||
|
||||
for (int i = 0; i < EVENTS_PER_THREAD; i++) {
|
||||
threadFlow.addPostEvent(HeavyEvent.class, "payload-" + i)
|
||||
.postEvent();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(10, TimeUnit.MINUTES);
|
||||
|
||||
long end = System.nanoTime();
|
||||
double durationSeconds = (end - start) / 1_000_000_000.0;
|
||||
|
||||
BigInteger totalEvents = BigInteger.valueOf(THREADS)
|
||||
.multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
|
||||
double eps = totalEvents.doubleValue() / durationSeconds;
|
||||
|
||||
System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
|
||||
System.out.printf("Throughput: %.0f events/sec%n", eps);
|
||||
|
||||
Runtime rt = Runtime.getRuntime();
|
||||
System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0 / 1024.0);
|
||||
|
||||
assertEquals(totalEvents.intValue(), processedEvents.size());
|
||||
}
|
||||
|
||||
// @Tag("stress")
|
||||
// @Test
|
||||
// void constructorCacheVsReflection() throws Throwable {
|
||||
@@ -247,7 +234,6 @@
|
||||
// long endHandle = System.nanoTime();
|
||||
//
|
||||
// System.out.println("Reflection: " + (endReflect - startReflect) / 1_000_000 + " ms");
|
||||
// System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + "
|
||||
// ms");
|
||||
// System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + " ms");
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user