getListenerClass() {
+ return clazz;
+ }
+
+ public String getName() {
+ return name;
+ }
}
diff --git a/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java b/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java
index ef51a46..661dc02 100644
--- a/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java
+++ b/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java
@@ -17,25 +17,25 @@ public class NetworkingClientEventListener {
public NetworkingClientEventListener(NetworkingClientManager clientManager) {
this.clientManager = clientManager;
new EventFlow()
- .listen(this::handleStartClient)
- .listen(this::handleCommand)
- .listen(this::handleSendLogin)
- .listen(this::handleSendLogout)
- .listen(this::handleSendGetPlayerlist)
- .listen(this::handleSendGetGamelist)
- .listen(this::handleSendSubscribe)
- .listen(this::handleSendMove)
- .listen(this::handleSendChallenge)
- .listen(this::handleSendAcceptChallenge)
- .listen(this::handleSendForfeit)
- .listen(this::handleSendMessage)
- .listen(this::handleSendHelp)
- .listen(this::handleSendHelpForCommand)
- .listen(this::handleCloseClient)
- .listen(this::handleReconnect)
- .listen(this::handleChangeAddress)
- .listen(this::handleGetAllConnections)
- .listen(this::handleShutdownAll);
+ .listen(NetworkEvents.StartClient.class, this::handleStartClient, false)
+ .listen(NetworkEvents.SendCommand.class, this::handleCommand, false)
+ .listen(NetworkEvents.SendLogin.class, this::handleSendLogin, false)
+ .listen(NetworkEvents.SendLogout.class, this::handleSendLogout, false)
+ .listen(NetworkEvents.SendGetPlayerlist.class, this::handleSendGetPlayerlist, false)
+ .listen(NetworkEvents.SendGetGamelist.class, this::handleSendGetGamelist, false)
+ .listen(NetworkEvents.SendSubscribe.class, this::handleSendSubscribe, false)
+ .listen(NetworkEvents.SendMove.class, this::handleSendMove, false)
+ .listen(NetworkEvents.SendChallenge.class, this::handleSendChallenge, false)
+ .listen(NetworkEvents.SendAcceptChallenge.class, this::handleSendAcceptChallenge, false)
+ .listen(NetworkEvents.SendForfeit.class, this::handleSendForfeit, false)
+ .listen(NetworkEvents.SendMessage.class, this::handleSendMessage, false)
+ .listen(NetworkEvents.SendHelp.class, this::handleSendHelp, false)
+ .listen(NetworkEvents.SendHelpForCommand.class, this::handleSendHelpForCommand, false)
+ .listen(NetworkEvents.CloseClient.class, this::handleCloseClient, false)
+ .listen(NetworkEvents.Reconnect.class, this::handleReconnect, false)
+ .listen(NetworkEvents.ChangeAddress.class, this::handleChangeAddress, false)
+ .listen(NetworkEvents.RequestsAllClients.class, this::handleGetAllConnections, false)
+ .listen(NetworkEvents.ForceCloseAllClients.class, this::handleShutdownAll, false);
}
void handleStartClient(NetworkEvents.StartClient event) {
diff --git a/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java b/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java
index 02434fc..8357980 100644
--- a/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java
+++ b/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java
@@ -8,6 +8,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.toop.framework.eventbus.GlobalEventBus;
+import org.toop.framework.networking.events.NetworkEvents;
import org.toop.framework.networking.exceptions.ClientNotFoundException;
import org.toop.framework.networking.exceptions.CouldNotConnectException;
import org.toop.framework.networking.interfaces.NetworkingClient;
@@ -44,6 +46,7 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
nClient.connect(id, nConnector.host(), nConnector.port());
networkClients.put(id, nClient);
logger.info("New client started successfully for {}:{}", nConnector.host(), nConnector.port());
+ GlobalEventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), true));
onSuccess.run();
scheduler.shutdown();
} catch (CouldNotConnectException e) {
@@ -51,14 +54,17 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
if (attempts < nConnector.reconnectAttempts()) {
logger.warn("Could not connect to {}:{}. Retrying in {} {}",
nConnector.host(), nConnector.port(), nConnector.timeout(), nConnector.timeUnit());
+ GlobalEventBus.post(new NetworkEvents.ConnectTry(id, attempts, nConnector.reconnectAttempts(), false));
scheduler.schedule(this, nConnector.timeout(), nConnector.timeUnit());
} else {
logger.error("Failed to start client for {}:{} after {} attempts", nConnector.host(), nConnector.port(), attempts);
+ GlobalEventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false));
onFailure.run();
scheduler.shutdown();
}
} catch (Exception e) {
logger.error("Unexpected exception during startClient", e);
+ GlobalEventBus.post(new NetworkEvents.ConnectTry(id, -1, nConnector.reconnectAttempts(), false));
onFailure.run();
scheduler.shutdown();
}
diff --git a/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java b/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java
index ac3de68..f199634 100644
--- a/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java
+++ b/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java
@@ -181,6 +181,8 @@ public class NetworkEvents extends EventsBase {
public record StartClientResponse(long clientId, boolean successful, long identifier)
implements ResponseToUniqueEvent {}
+ public record ConnectTry(long clientId, int amount, int maxAmount, boolean success) implements GenericEvent {}
+
/**
* Requests reconnection of an existing client using its previous configuration.
*
diff --git a/framework/src/main/java/org/toop/framework/resource/resources/ImageAsset.java b/framework/src/main/java/org/toop/framework/resource/resources/ImageAsset.java
index 2e6b417..70e7987 100644
--- a/framework/src/main/java/org/toop/framework/resource/resources/ImageAsset.java
+++ b/framework/src/main/java/org/toop/framework/resource/resources/ImageAsset.java
@@ -8,7 +8,7 @@ import org.toop.framework.resource.types.LoadableResource;
@FileExtension({"png", "jpg", "jpeg"})
public class ImageAsset extends BaseResource implements LoadableResource {
- private Image image;
+ private Image image = null;
public ImageAsset(final File file) {
super(file);
@@ -40,8 +40,7 @@ public class ImageAsset extends BaseResource implements LoadableResource {
public Image getImage() {
if (!this.isLoaded) {
this.load();
- return image;
}
- return null;
+ return image;
}
}
diff --git a/framework/src/main/java/org/toop/framework/resource/resources/JsonAsset.java b/framework/src/main/java/org/toop/framework/resource/resources/JsonAsset.java
index c13849c..f8491af 100644
--- a/framework/src/main/java/org/toop/framework/resource/resources/JsonAsset.java
+++ b/framework/src/main/java/org/toop/framework/resource/resources/JsonAsset.java
@@ -14,7 +14,7 @@ public class JsonAsset extends BaseResource implements LoadableResource {
private T content;
private Class type;
- private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ private final Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
public JsonAsset(File file, Class type) {
super(file);
diff --git a/framework/src/main/java/org/toop/framework/resource/resources/SettingsAsset.java b/framework/src/main/java/org/toop/framework/resource/resources/SettingsAsset.java
index 7728c9a..d5e0734 100644
--- a/framework/src/main/java/org/toop/framework/resource/resources/SettingsAsset.java
+++ b/framework/src/main/java/org/toop/framework/resource/resources/SettingsAsset.java
@@ -38,6 +38,22 @@ public class SettingsAsset extends JsonAsset {
return getContent().layoutSize;
}
+ public Boolean getTutorialFlag() {
+ return getContent().showTutorials;
+ }
+
+ public Boolean getFirstTTT() {
+ return getContent().firstTTT;
+ }
+
+ public Boolean getFirstConnect4() {
+ return getContent().firstConnect4;
+ }
+
+ public Boolean getFirstReversi() {
+ return getContent().firstReversi;
+ }
+
public void setVolume(int volume) {
getContent().volume = volume;
save();
@@ -72,4 +88,24 @@ public class SettingsAsset extends JsonAsset {
getContent().layoutSize = layoutSize;
save();
}
+
+ public void setTutorialFlag(boolean tutorialFlag) {
+ getContent().showTutorials = tutorialFlag;
+ save();
+ }
+
+ public void setFirstTTT(boolean firstTTT) {
+ getContent().firstTTT = firstTTT;
+ save();
+ }
+
+ public void setFirstConnect4(boolean firstConnect4) {
+ getContent().firstConnect4 = firstConnect4;
+ save();
+ }
+
+ public void setFirstReversi(boolean firstReversi) {
+ getContent().firstReversi = firstReversi;
+ save();
+ }
}
diff --git a/framework/src/main/java/org/toop/framework/settings/Settings.java b/framework/src/main/java/org/toop/framework/settings/Settings.java
index 052107c..d6e6101 100644
--- a/framework/src/main/java/org/toop/framework/settings/Settings.java
+++ b/framework/src/main/java/org/toop/framework/settings/Settings.java
@@ -8,4 +8,9 @@ public class Settings {
public int volume = 100;
public int fxVolume = 20;
public int musicVolume = 15;
+ public Boolean showTutorials;
+ public Boolean firstReversi;
+ public Boolean firstTTT;
+ public Boolean firstConnect4;
+
}
diff --git a/framework/src/test/java/org/toop/framework/eventbus/EventFlowStressTest.java b/framework/src/test/java/org/toop/framework/eventbus/EventFlowStressTest.java
index 9a9ec76..cd7d090 100644
--- a/framework/src/test/java/org/toop/framework/eventbus/EventFlowStressTest.java
+++ b/framework/src/test/java/org/toop/framework/eventbus/EventFlowStressTest.java
@@ -1,235 +1,222 @@
-// package org.toop.framework.eventbus;
-//
-// import org.junit.jupiter.api.Tag;
-// import org.junit.jupiter.api.Test;
-// import org.toop.framework.eventbus.events.UniqueEvent;
-//
-// import java.math.BigInteger;
-// import java.util.concurrent.*;
-// import java.util.concurrent.atomic.LongAdder;
-//
-// import static org.junit.jupiter.api.Assertions.assertEquals;
-//
-// class EventFlowStressTest {
-//
-// /** Top-level record to ensure runtime type matches subscription */
-// public record HeavyEvent(String payload, long eventSnowflake) implements UniqueEvent {
-// @Override
-// public java.util.Map result() {
-// return java.util.Map.of("payload", payload, "eventId", eventSnowflake);
-// }
-//
-// @Override
-// public long eventSnowflake() {
-// return this.eventSnowflake;
-// }
-// }
-//
-// public record HeavyEventSuccess(String payload, long eventSnowflake) implements
-// UniqueEvent {
-// @Override
-// public java.util.Map result() {
-// return java.util.Map.of("payload", payload, "eventId", eventSnowflake);
-// }
-//
-// @Override
-// public long eventSnowflake() {
-// return eventSnowflake;
-// }
-// }
-//
-// private static final int THREADS = 32;
-// private static final long EVENTS_PER_THREAD = 10_000_000;
-//
-// @Tag("stress")
-// @Test
-// void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException {
-// LongAdder counter = new LongAdder();
-// ExecutorService executor = Executors.newFixedThreadPool(THREADS);
-//
-// BigInteger totalEvents = BigInteger.valueOf(THREADS)
-// .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
-//
-// long startTime = System.currentTimeMillis();
-//
-// // Monitor thread for EPS and memory
-// Thread monitor = new Thread(() -> {
-// long lastCount = 0;
-// long lastTime = System.currentTimeMillis();
-// Runtime runtime = Runtime.getRuntime();
-//
-// while (counter.sum() < totalEvents.longValue()) {
-// try { Thread.sleep(200); } catch (InterruptedException ignored) {}
-//
-// long now = System.currentTimeMillis();
-// long completed = counter.sum();
-// long eventsThisPeriod = completed - lastCount;
-// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
-//
-// long usedMemory = runtime.totalMemory() - runtime.freeMemory();
-// double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
-//
-// System.out.printf(
-// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
-// completed,
-// totalEvents.longValue(),
-// completed * 100.0 / totalEvents.doubleValue(),
-// eps,
-// usedMemory / 1024.0 / 1024.0,
-// usedPercent
-// );
-//
-// lastCount = completed;
-// lastTime = now;
-// }
-// });
-// monitor.setDaemon(true);
-// monitor.start();
-//
-// var listener = new EventFlow().listen(HeavyEvent.class, _ -> counter.increment());
-//
-// // Submit events asynchronously
-// for (int t = 0; t < THREADS; t++) {
-// executor.submit(() -> {
-// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
-// var _ = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
-// .asyncPostEvent();
-// }
-// });
-// }
-//
-// executor.shutdown();
-// executor.awaitTermination(10, TimeUnit.MINUTES);
-//
-// listener.getResult();
-//
-// long endTime = System.currentTimeMillis();
-// double durationSeconds = (endTime - startTime) / 1000.0;
-//
-// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + "
-// seconds");
-// double averageEps = totalEvents.doubleValue() / durationSeconds;
-// System.out.printf("Average EPS: %.0f%n", averageEps);
-//
-// assertEquals(totalEvents.longValue(), counter.sum());
-// }
-//
-// @Tag("stress")
-// @Test
-// void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException {
-// LongAdder counter = new LongAdder();
-// ExecutorService executor = Executors.newFixedThreadPool(THREADS);
-//
-// BigInteger totalEvents = BigInteger.valueOf(THREADS)
-// .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
-//
-// long startTime = System.currentTimeMillis();
-//
-// // Monitor thread for EPS and memory
-// Thread monitor = new Thread(() -> {
-// long lastCount = 0;
-// long lastTime = System.currentTimeMillis();
-// Runtime runtime = Runtime.getRuntime();
-//
-// while (counter.sum() < totalEvents.longValue()) {
-// try { Thread.sleep(200); } catch (InterruptedException ignored) {}
-//
-// long now = System.currentTimeMillis();
-// long completed = counter.sum();
-// long eventsThisPeriod = completed - lastCount;
-// double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
-//
-// long usedMemory = runtime.totalMemory() - runtime.freeMemory();
-// double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
-//
-// System.out.printf(
-// "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
-// completed,
-// totalEvents.longValue(),
-// completed * 100.0 / totalEvents.doubleValue(),
-// eps,
-// usedMemory / 1024.0 / 1024.0,
-// usedPercent
-// );
-//
-// lastCount = completed;
-// lastTime = now;
-// }
-// });
-// monitor.setDaemon(true);
-// monitor.start();
-//
-// // Submit events asynchronously
-// for (int t = 0; t < THREADS; t++) {
-// executor.submit(() -> {
-// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
-// var a = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
-// .onResponse(HeavyEventSuccess.class, _ -> counter.increment())
-// .postEvent();
-//
-// new EventFlow().addPostEvent(HeavyEventSuccess.class, "payload-" + i,
-// a.getEventSnowflake())
-// .postEvent();
-// }
-// });
-// }
-//
-// executor.shutdown();
-// executor.awaitTermination(10, TimeUnit.MINUTES);
-//
-// long endTime = System.currentTimeMillis();
-// double durationSeconds = (endTime - startTime) / 1000.0;
-//
-// System.out.println("Posted " + totalEvents + " events in " + durationSeconds + "
-// seconds");
-// double averageEps = totalEvents.doubleValue() / durationSeconds;
-// System.out.printf("Average EPS: %.0f%n", averageEps);
-//
-// assertEquals(totalEvents.longValue(), counter.sum());
-// }
-//
-//
-// @Tag("stress")
-// @Test
-// void efficientExtremeConcurrencyTest() throws InterruptedException {
-// final int THREADS = Runtime.getRuntime().availableProcessors();
-// final int EVENTS_PER_THREAD = 5000;
-//
-// ExecutorService executor = Executors.newFixedThreadPool(THREADS);
-// ConcurrentLinkedQueue processedEvents = new ConcurrentLinkedQueue<>();
-//
-// long start = System.nanoTime();
-//
-// for (int t = 0; t < THREADS; t++) {
-// executor.submit(() -> {
-// for (int i = 0; i < EVENTS_PER_THREAD; i++) {
-// new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
-// .onResponse(HeavyEvent.class, processedEvents::add)
-// .postEvent();
-// }
-// });
-// }
-//
-// executor.shutdown();
-// executor.awaitTermination(10, TimeUnit.MINUTES);
-//
-// long end = System.nanoTime();
-// double durationSeconds = (end - start) / 1_000_000_000.0;
-//
-// BigInteger totalEvents = BigInteger.valueOf((long)
-// THREADS).multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
-// double eps = totalEvents.doubleValue() / durationSeconds;
-//
-// System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
-// System.out.printf("Throughput: %.0f events/sec%n", eps);
-//
-// Runtime rt = Runtime.getRuntime();
-// System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0
-// / 1024.0);
-//
-// assertEquals(totalEvents.intValue(), processedEvents.size());
-// }
-//
+ package org.toop.framework.eventbus;
+
+ import org.junit.jupiter.api.Tag;
+ import org.junit.jupiter.api.Test;
+ import org.toop.framework.eventbus.events.ResponseToUniqueEvent;
+
+ import java.math.BigInteger;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.LongAdder;
+
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+
+ class EventFlowStressTest {
+
+ public record HeavyEvent(String payload, long eventSnowflake) implements ResponseToUniqueEvent {
+ @Override
+ public long getIdentifier() {
+ return eventSnowflake;
+ }
+ }
+
+ public record HeavyEventSuccess(String payload, long eventSnowflake) implements ResponseToUniqueEvent {
+ @Override
+ public long getIdentifier() {
+ return eventSnowflake;
+ }
+ }
+
+ private static final int THREADS = 32;
+ private static final long EVENTS_PER_THREAD = 10_000_000;
+
+ @Tag("stress")
+ @Test
+ void extremeConcurrencySendTest_progressWithMemory() throws InterruptedException {
+ LongAdder counter = new LongAdder();
+ ExecutorService executor = Executors.newFixedThreadPool(THREADS);
+
+ BigInteger totalEvents = BigInteger.valueOf(THREADS)
+ .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
+
+ long startTime = System.currentTimeMillis();
+
+ Thread monitor = new Thread(() -> {
+ long lastCount = 0;
+ long lastTime = System.currentTimeMillis();
+ Runtime runtime = Runtime.getRuntime();
+
+ while (counter.sum() < totalEvents.longValue()) {
+ try { Thread.sleep(200); } catch (InterruptedException ignored) {}
+
+ long now = System.currentTimeMillis();
+ long completed = counter.sum();
+ long eventsThisPeriod = completed - lastCount;
+ double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
+
+ long usedMemory = runtime.totalMemory() - runtime.freeMemory();
+ double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
+
+ System.out.printf(
+ "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
+ completed,
+ totalEvents.longValue(),
+ completed * 100.0 / totalEvents.doubleValue(),
+ eps,
+ usedMemory / 1024.0 / 1024.0,
+ usedPercent
+ );
+
+ lastCount = completed;
+ lastTime = now;
+ }
+ });
+ monitor.setDaemon(true);
+ monitor.start();
+
+ var listener = new EventFlow().listen(HeavyEvent.class, _ -> counter.increment());
+
+ for (int t = 0; t < THREADS; t++) {
+ executor.submit(() -> {
+ for (int i = 0; i < EVENTS_PER_THREAD; i++) {
+ var _ = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
+ .asyncPostEvent();
+ }
+ });
+ }
+
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.MINUTES);
+
+ listener.getResult();
+
+ long endTime = System.currentTimeMillis();
+ double durationSeconds = (endTime - startTime) / 1000.0;
+
+ System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
+ double averageEps = totalEvents.doubleValue() / durationSeconds;
+ System.out.printf("Average EPS: %.0f%n", averageEps);
+
+ assertEquals(totalEvents.longValue(), counter.sum());
+ }
+
+ @Tag("stress")
+ @Test
+ void extremeConcurrencySendAndReturnTest_progressWithMemory() throws InterruptedException {
+ LongAdder counter = new LongAdder();
+ ExecutorService executor = Executors.newFixedThreadPool(THREADS);
+
+ BigInteger totalEvents = BigInteger.valueOf(THREADS)
+ .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
+
+ long startTime = System.currentTimeMillis();
+
+ Thread monitor = new Thread(() -> {
+ long lastCount = 0;
+ long lastTime = System.currentTimeMillis();
+ Runtime runtime = Runtime.getRuntime();
+
+ while (counter.sum() < totalEvents.longValue()) {
+ try { Thread.sleep(500); } catch (InterruptedException ignored) {}
+
+ long now = System.currentTimeMillis();
+ long completed = counter.sum();
+ long eventsThisPeriod = completed - lastCount;
+ double eps = eventsThisPeriod / ((now - lastTime) / 1000.0);
+
+ long usedMemory = runtime.totalMemory() - runtime.freeMemory();
+ double usedPercent = usedMemory * 100.0 / runtime.maxMemory();
+
+ System.out.printf(
+ "Progress: %d/%d (%.2f%%), EPS: %.0f, Memory Used: %.2f MB (%.2f%%)%n",
+ completed,
+ totalEvents.longValue(),
+ completed * 100.0 / totalEvents.doubleValue(),
+ eps,
+ usedMemory / 1024.0 / 1024.0,
+ usedPercent
+ );
+
+ lastCount = completed;
+ lastTime = now;
+ }
+ });
+ monitor.setDaemon(true);
+ monitor.start();
+
+ EventFlow sharedFlow = new EventFlow();
+ sharedFlow.listen(HeavyEventSuccess.class, _ -> counter.increment(), false, "heavyEventSuccessListener");
+
+ for (int t = 0; t < THREADS; t++) {
+ executor.submit(() -> {
+ EventFlow threadFlow = new EventFlow();
+
+ for (int i = 0; i < EVENTS_PER_THREAD; i++) {
+ var heavyEvent = threadFlow.addPostEvent(HeavyEvent.class, "payload-" + i)
+ .postEvent();
+
+ threadFlow.addPostEvent(HeavyEventSuccess.class, "payload-" + i, heavyEvent.getEventSnowflake())
+ .postEvent();
+ }
+ });
+ }
+
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.MINUTES);
+
+ long endTime = System.currentTimeMillis();
+ double durationSeconds = (endTime - startTime) / 1000.0;
+
+ System.out.println("Posted " + totalEvents + " events in " + durationSeconds + " seconds");
+ double averageEps = totalEvents.doubleValue() / durationSeconds;
+ System.out.printf("Average EPS: %.0f%n", averageEps);
+
+ assertEquals(totalEvents.longValue(), counter.sum());
+ }
+
+ @Tag("stress")
+ @Test
+ void efficientExtremeConcurrencyTest() throws InterruptedException {
+ final int THREADS = Runtime.getRuntime().availableProcessors();
+ final int EVENTS_PER_THREAD = 1_000_000;
+
+ ExecutorService executor = Executors.newFixedThreadPool(THREADS);
+ ConcurrentLinkedQueue processedEvents = new ConcurrentLinkedQueue<>();
+
+ long start = System.nanoTime();
+
+ EventFlow sharedFlow = new EventFlow();
+ sharedFlow.listen(HeavyEvent.class, processedEvents::add, false, "heavyEventListener");
+
+ for (int t = 0; t < THREADS; t++) {
+ executor.submit(() -> {
+ EventFlow threadFlow = new EventFlow();
+
+ for (int i = 0; i < EVENTS_PER_THREAD; i++) {
+ threadFlow.addPostEvent(HeavyEvent.class, "payload-" + i)
+ .postEvent();
+ }
+ });
+ }
+
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.MINUTES);
+
+ long end = System.nanoTime();
+ double durationSeconds = (end - start) / 1_000_000_000.0;
+
+ BigInteger totalEvents = BigInteger.valueOf(THREADS)
+ .multiply(BigInteger.valueOf(EVENTS_PER_THREAD));
+ double eps = totalEvents.doubleValue() / durationSeconds;
+
+ System.out.printf("Posted %s events in %.3f seconds%n", totalEvents, durationSeconds);
+ System.out.printf("Throughput: %.0f events/sec%n", eps);
+
+ Runtime rt = Runtime.getRuntime();
+ System.out.printf("Used memory: %.2f MB%n", (rt.totalMemory() - rt.freeMemory()) / 1024.0 / 1024.0);
+
+ assertEquals(totalEvents.intValue(), processedEvents.size());
+ }
+
// @Tag("stress")
// @Test
// void constructorCacheVsReflection() throws Throwable {
@@ -247,7 +234,6 @@
// long endHandle = System.nanoTime();
//
// System.out.println("Reflection: " + (endReflect - startReflect) / 1_000_000 + " ms");
-// System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + "
-// ms");
+// System.out.println("MethodHandle Cache: " + (endHandle - startHandle) / 1_000_000 + " ms");
// }
-// }
+ }