Initial parsing of server response

This commit is contained in:
lieght
2025-09-27 22:17:49 +02:00
committed by Bas Antonius de Jong
parent 27e7166ac7
commit a9e63b3fcc
15 changed files with 355 additions and 203 deletions

7
.gitignore vendored
View File

@@ -94,3 +94,10 @@ nb-configuration.xml
# Ignore Gradle build output directory # Ignore Gradle build output directory
build build
##############################
## Hanze
##############################
newgamesver-release-V1.jar
server.properties
gameserver.log

View File

@@ -2,6 +2,7 @@
<dictionary name="project"> <dictionary name="project">
<words> <words>
<w>aosp</w> <w>aosp</w>
<w>clid</w>
<w>dcompile</w> <w>dcompile</w>
<w>errorprone</w> <w>errorprone</w>
<w>gamelist</w> <w>gamelist</w>

2
.idea/misc.xml generated
View File

@@ -13,7 +13,7 @@
</list> </list>
</option> </option>
</component> </component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_24" project-jdk-name="openjdk-25" project-jdk-type="JavaSDK"> <component name="ProjectRootManager" version="2" languageLevel="JDK_X" default="true" project-jdk-name="openjdk-25" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" /> <output url="file://$PROJECT_DIR$/out" />
</component> </component>
</project> </project>

View File

@@ -1,13 +1,54 @@
package org.toop; package org.toop;
import org.toop.app.gui.LocalServerSelector; import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.networking.events.NetworkEvents;
import org.toop.framework.networking.NetworkingClientManager; import org.toop.framework.networking.NetworkingClientManager;
import org.toop.framework.networking.NetworkingInitializationException; import org.toop.framework.networking.NetworkingInitializationException;
import org.toop.app.gui.LocalServerSelector;
import java.util.Arrays;
public class Main { public class Main {
static void main(String[] args) { static void main(String[] args) {
initSystems(); initSystems();
javax.swing.SwingUtilities.invokeLater(LocalServerSelector::new);
EventFlow a = new EventFlow()
.addPostEvent(
NetworkEvents.StartClient.class,
"127.0.0.1",
7789)
.onResponse(Main::login)
// .onResponse(Main::sendCommand)
// .onResponse(Main::closeClient)
.asyncPostEvent();
new Thread(() -> {
while (a.getResult() == null) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {}
}
long clid = (Long) a.getResult().get("clientId");
new EventFlow()
.addPostEvent(new NetworkEvents.SendCommand(clid, "get playerlist"))
.listen(NetworkEvents.PlayerListResponse.class, response -> {
if (response.clientId() == clid) System.out.println(Arrays.toString(response.playerlist()));
})
.asyncPostEvent();
}).start();
new Thread(() -> javax.swing.SwingUtilities.invokeLater(LocalServerSelector::new)).start();
}
private static void login(NetworkEvents.StartClientResponse event) {
new Thread(() -> {
try {
Thread.sleep(1000);
new EventFlow()
.addPostEvent(new NetworkEvents.SendCommand(event.clientId(), "login bas"))
.asyncPostEvent();
} catch (InterruptedException e) {}
}).start();
} }
private static void initSystems() throws NetworkingInitializationException { private static void initSystems() throws NetworkingInitializationException {

View File

@@ -54,14 +54,15 @@ public class RemoteGameSelector {
&& !ipTextField.getText().isEmpty() && !ipTextField.getText().isEmpty()
&& !portTextField.getText().isEmpty()) { && !portTextField.getText().isEmpty()) {
AtomicReference<String> clientId = new AtomicReference<>(); AtomicReference<Long> clientId = new AtomicReference<>();
new EventFlow().addPostEvent( new EventFlow().addPostEvent(
NetworkEvents.StartClient.class, NetworkEvents.StartClient.class,
(Supplier<NetworkingGameClientHandler>) NetworkingGameClientHandler::new, (Supplier<NetworkingGameClientHandler>)
new NetworkingGameClientHandler(clientId.get()),
"127.0.0.1", "127.0.0.1",
5001 5001
).onResponse( ).onResponse(
NetworkEvents.StartClientSuccess.class, NetworkEvents.StartClientResponse.class,
(response) -> { (response) -> {
clientId.set(response.clientId()); clientId.set(response.clientId());
} }

View File

@@ -33,7 +33,7 @@ public class LocalTicTacToe { // TODO: Implement runnable
private boolean isLocal; private boolean isLocal;
private String gameId; private String gameId;
private String connectionId = null; private long connectionId = -1;
private String serverId = null; private String serverId = null;
private boolean[] isAiPlayer = new boolean[2]; private boolean[] isAiPlayer = new boolean[2];
@@ -74,7 +74,7 @@ public class LocalTicTacToe { // TODO: Implement runnable
// this.receivedMessageListener = // this.receivedMessageListener =
// GlobalEventBus.subscribe(this::receiveMessageAction); // GlobalEventBus.subscribe(this::receiveMessageAction);
// GlobalEventBus.subscribe(this.receivedMessageListener); // GlobalEventBus.subscribe(this.receivedMessageListener);
this.connectionId = this.createConnection(ip, port); // this.connectionId = this.createConnection(ip, port); TODO: Refactor this
this.createGame("X", "O"); this.createGame("X", "O");
this.isLocal = false; this.isLocal = false;
//this.executor.submit(this::remoteGameThread); //this.executor.submit(this::remoteGameThread);
@@ -101,19 +101,6 @@ public class LocalTicTacToe { // TODO: Implement runnable
return new LocalTicTacToe(ip, port); return new LocalTicTacToe(ip, port);
} }
private String createConnection(String ip, int port) {
CompletableFuture<String> connectionIdFuture = new CompletableFuture<>();
new EventFlow().addPostEvent(NetworkEvents.StartClientRequest.class,
(Supplier<NetworkingGameClientHandler>) NetworkingGameClientHandler::new,
ip, port, connectionIdFuture).asyncPostEvent(); // TODO: what if server couldn't be started with port.
try {
return connectionIdFuture.get();
} catch (InterruptedException | ExecutionException e) {
logger.error("Error getting connection ID", e);
}
return null;
}
private void createGame(String nameA, String nameB) { private void createGame(String nameA, String nameB) {
nameA = nameA.trim().replace(" ", "-"); nameA = nameA.trim().replace(" ", "-");
nameB = nameB.trim().replace(" ", "-"); nameB = nameB.trim().replace(" ", "-");
@@ -223,7 +210,7 @@ public class LocalTicTacToe { // TODO: Implement runnable
} }
private void receiveMessageAction(NetworkEvents.ReceivedMessage receivedMessage) { private void receiveMessageAction(NetworkEvents.ReceivedMessage receivedMessage) {
if (!receivedMessage.ConnectionUuid().equals(this.connectionId)) { if (receivedMessage.ConnectionId() != this.connectionId) {
return; return;
} }

View File

@@ -1,10 +1,12 @@
package org.toop.framework.eventbus; package org.toop.framework;
import java.net.NetworkInterface;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
public class SnowflakeGenerator { public class SnowflakeGenerator {
// Epoch start (choose your custom epoch to reduce bits wasted on old time) private static final long EPOCH = Instant.parse("2025-01-01T00:00:00Z").toEpochMilli();
private static final long EPOCH = 1700000000000L; // ~2023-11-15
// Bit allocations // Bit allocations
private static final long TIMESTAMP_BITS = 41; private static final long TIMESTAMP_BITS = 41;
@@ -14,20 +16,36 @@ public class SnowflakeGenerator {
// Max values // Max values
private static final long MAX_MACHINE_ID = (1L << MACHINE_BITS) - 1; private static final long MAX_MACHINE_ID = (1L << MACHINE_BITS) - 1;
private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1; private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;
private static final long MAX_TIMESTAMP = (1L << TIMESTAMP_BITS) - 1;
// Bit shifts // Bit shifts
private static final long MACHINE_SHIFT = SEQUENCE_BITS; private static final long MACHINE_SHIFT = SEQUENCE_BITS;
private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + MACHINE_BITS; private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + MACHINE_BITS;
private final long machineId; private static final long machineId = SnowflakeGenerator.genMachineId();
private final AtomicLong lastTimestamp = new AtomicLong(-1L); private final AtomicLong lastTimestamp = new AtomicLong(-1L);
private long sequence = 0L; private long sequence = 0L;
public SnowflakeGenerator(long machineId) { private static long genMachineId() {
try {
StringBuilder sb = new StringBuilder();
for (NetworkInterface ni : Collections.list(NetworkInterface.getNetworkInterfaces())) {
byte[] mac = ni.getHardwareAddress();
if (mac != null) {
for (byte b : mac) sb.append(String.format("%02X", b));
}
}
// limit to 10 bits (01023)
return sb.toString().hashCode() & 0x3FF;
} catch (Exception e) {
return (long) (Math.random() * 1024); // fallback
}
}
public SnowflakeGenerator() {
if (machineId < 0 || machineId > MAX_MACHINE_ID) { if (machineId < 0 || machineId > MAX_MACHINE_ID) {
throw new IllegalArgumentException("Machine ID must be between 0 and " + MAX_MACHINE_ID); throw new IllegalArgumentException("Machine ID must be between 0 and " + MAX_MACHINE_ID);
} }
this.machineId = machineId;
} }
public synchronized long nextId() { public synchronized long nextId() {
@@ -37,6 +55,10 @@ public class SnowflakeGenerator {
throw new IllegalStateException("Clock moved backwards. Refusing to generate id."); throw new IllegalStateException("Clock moved backwards. Refusing to generate id.");
} }
if (currentTimestamp > MAX_TIMESTAMP) {
throw new IllegalStateException("Timestamp bits overflow, Snowflake expired.");
}
if (currentTimestamp == lastTimestamp.get()) { if (currentTimestamp == lastTimestamp.get()) {
sequence = (sequence + 1) & MAX_SEQUENCE; sequence = (sequence + 1) & MAX_SEQUENCE;
if (sequence == 0) { if (sequence == 0) {

View File

@@ -1,15 +1,18 @@
package org.toop.framework.eventbus; package org.toop.framework.eventbus;
import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.events.EventType; import org.toop.framework.eventbus.events.EventType;
import org.toop.framework.eventbus.events.EventWithSnowflake; import org.toop.framework.eventbus.events.EventWithSnowflake;
import org.toop.framework.eventbus.SnowflakeGenerator;
import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType; import java.lang.invoke.MethodType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier;
/** /**
* EventFlow is a utility class for creating, posting, and optionally subscribing to events * EventFlow is a utility class for creating, posting, and optionally subscribing to events
@@ -22,6 +25,8 @@ import java.util.function.Consumer;
*/ */
public class EventFlow { public class EventFlow {
/** Lookup object used for dynamically invoking constructors via MethodHandles. */ /** Lookup object used for dynamically invoking constructors via MethodHandles. */
private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup(); private static final MethodHandles.Lookup LOOKUP = MethodHandles.lookup();
@@ -35,10 +40,7 @@ public class EventFlow {
private EventType event = null; private EventType event = null;
/** The listener returned by GlobalEventBus subscription. Used for unsubscription. */ /** The listener returned by GlobalEventBus subscription. Used for unsubscription. */
private Object listener; private final List<ListenerHandler> listeners = new ArrayList<>();
/** Flag indicating whether to automatically unsubscribe the listener after success. */
private boolean unsubscribeAfterSuccess = false;
/** Holds the results returned from the subscribed event, if any. */ /** Holds the results returned from the subscribed event, if any. */
private Map<String, Object> result = null; private Map<String, Object> result = null;
@@ -46,9 +48,19 @@ public class EventFlow {
/** Empty constructor (event must be added via {@link #addPostEvent(Class, Object...)}). */ /** Empty constructor (event must be added via {@link #addPostEvent(Class, Object...)}). */
public EventFlow() {} public EventFlow() {}
/** // New: accept an event instance directly
* Instantiate an event of the given class and store it in this publisher. public EventFlow addPostEvent(EventType event) {
*/ this.event = event;
return this;
}
// Optional: accept a Supplier<EventType> to defer construction
public EventFlow addPostEvent(Supplier<? extends EventType> eventSupplier) {
this.event = eventSupplier.get();
return this;
}
// Keep the old class+args version if needed
public <T extends EventType> EventFlow addPostEvent(Class<T> eventClass, Object... args) { public <T extends EventType> EventFlow addPostEvent(Class<T> eventClass, Object... args) {
try { try {
boolean isUuidEvent = EventWithSnowflake.class.isAssignableFrom(eventClass); boolean isUuidEvent = EventWithSnowflake.class.isAssignableFrom(eventClass);
@@ -67,7 +79,7 @@ public class EventFlow {
int expectedParamCount = ctorHandle.type().parameterCount(); int expectedParamCount = ctorHandle.type().parameterCount();
if (isUuidEvent && args.length < expectedParamCount) { if (isUuidEvent && args.length < expectedParamCount) {
this.eventSnowflake = new SnowflakeGenerator(1).nextId(); this.eventSnowflake = new SnowflakeGenerator().nextId();
finalArgs = new Object[args.length + 1]; finalArgs = new Object[args.length + 1];
System.arraycopy(args, 0, finalArgs, 0, args.length); System.arraycopy(args, 0, finalArgs, 0, args.length);
finalArgs[args.length] = this.eventSnowflake; finalArgs[args.length] = this.eventSnowflake;
@@ -86,124 +98,132 @@ public class EventFlow {
} }
} }
// public EventFlow addSnowflake() {
// this.eventSnowflake = new SnowflakeGenerator(1).nextId();
// return this;
// }
/** /**
* Start listening for a response event type, chainable with perform(). * Subscribe by ID: only fires if UUID matches this publisher's eventId.
*/ */
public <TT extends EventType> ResponseBuilder<TT> onResponse(Class<TT> eventClass) { public <TT extends EventWithSnowflake> EventFlow onResponse(Class<TT> eventClass, Consumer<TT> action,
return new ResponseBuilder<>(this, eventClass); boolean unsubscribeAfterSuccess) {
} ListenerHandler[] listenerHolder = new ListenerHandler[1];
listenerHolder[0] = new ListenerHandler(
GlobalEventBus.subscribe(eventClass, event -> {
if (event.eventSnowflake() != this.eventSnowflake) return;
public static class ResponseBuilder<R extends EventType> { action.accept(event);
private final EventFlow parent;
private final Class<R> responseClass;
ResponseBuilder(EventFlow parent, Class<R> responseClass) { if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
this.parent = parent; GlobalEventBus.unsubscribe(listenerHolder[0]);
this.responseClass = responseClass; this.listeners.remove(listenerHolder[0]);
} }
/** Finalize the subscription */ this.result = event.result();
public EventFlow perform(Consumer<R> action) { })
parent.listener = GlobalEventBus.subscribe(responseClass, event -> { );
action.accept(responseClass.cast(event)); this.listeners.add(listenerHolder[0]);
if (parent.unsubscribeAfterSuccess && parent.listener != null) { return this;
GlobalEventBus.unsubscribe(parent.listener);
}
});
return parent;
}
} }
/** /**
* Subscribe by ID: only fires if UUID matches this publisher's eventId. * Subscribe by ID: only fires if UUID matches this publisher's eventId.
*/ */
public <TT extends EventWithSnowflake> EventFlow onResponse(Class<TT> eventClass, Consumer<TT> action) { public <TT extends EventWithSnowflake> EventFlow onResponse(Class<TT> eventClass, Consumer<TT> action) {
this.listener = GlobalEventBus.subscribe(eventClass, event -> { return this.onResponse(eventClass, action, true);
if (event.eventSnowflake() == this.eventSnowflake) {
action.accept(event);
if (unsubscribeAfterSuccess && listener != null) {
GlobalEventBus.unsubscribe(listener);
}
this.result = event.result();
}
});
return this;
} }
/** /**
* Subscribe by ID without explicit class. * Subscribe by ID without explicit class.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <TT extends EventWithSnowflake> EventFlow onResponse(Consumer<TT> action) { public <TT extends EventWithSnowflake> EventFlow onResponse(Consumer<TT> action, boolean unsubscribeAfterSuccess) {
this.listener = GlobalEventBus.subscribe(event -> { ListenerHandler[] listenerHolder = new ListenerHandler[1];
if (event instanceof EventWithSnowflake uuidEvent) { listenerHolder[0] = new ListenerHandler(
GlobalEventBus.subscribe(event -> {
if (!(event instanceof EventWithSnowflake uuidEvent)) return;
if (uuidEvent.eventSnowflake() == this.eventSnowflake) { if (uuidEvent.eventSnowflake() == this.eventSnowflake) {
try { try {
TT typedEvent = (TT) uuidEvent; TT typedEvent = (TT) uuidEvent;
action.accept(typedEvent); action.accept(typedEvent);
if (unsubscribeAfterSuccess && listener != null) { if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
GlobalEventBus.unsubscribe(listener); GlobalEventBus.unsubscribe(listenerHolder[0]);
this.listeners.remove(listenerHolder[0]);
} }
this.result = typedEvent.result(); this.result = typedEvent.result();
} catch (ClassCastException ignored) {} } catch (ClassCastException _) {
throw new ClassCastException("Cannot cast " + event.getClass().getName() +
" to EventWithSnowflake");
}
} }
} })
}); );
this.listeners.add(listenerHolder[0]);
return this; return this;
} }
// choose event type public <TT extends EventWithSnowflake> EventFlow onResponse(Consumer<TT> action) {
public <TT extends EventType> EventSubscriberBuilder<TT> onEvent(Class<TT> eventClass) { return this.onResponse(action, true);
return new EventSubscriberBuilder<>(this, eventClass); }
public <TT extends EventType> EventFlow listen(Class<TT> eventClass, Consumer<TT> action,
boolean unsubscribeAfterSuccess) {
ListenerHandler[] listenerHolder = new ListenerHandler[1];
listenerHolder[0] = new ListenerHandler(
GlobalEventBus.subscribe(eventClass, event -> {
action.accept(event);
if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
GlobalEventBus.unsubscribe(listenerHolder[0]);
this.listeners.remove(listenerHolder[0]);
}
})
);
this.listeners.add(listenerHolder[0]);
return this;
} }
// One-liner shorthand
public <TT extends EventType> EventFlow listen(Class<TT> eventClass, Consumer<TT> action) { public <TT extends EventType> EventFlow listen(Class<TT> eventClass, Consumer<TT> action) {
return this.onEvent(eventClass).perform(action); return this.listen(eventClass, action, true);
} }
// Builder for chaining .onEvent(...).perform(...) @SuppressWarnings("unchecked")
public static class EventSubscriberBuilder<TT extends EventType> { public <TT extends EventType> EventFlow listen(Consumer<TT> action, boolean unsubscribeAfterSuccess) {
private final EventFlow publisher; ListenerHandler[] listenerHolder = new ListenerHandler[1];
private final Class<TT> eventClass; listenerHolder[0] = new ListenerHandler(
GlobalEventBus.subscribe(event -> {
if (!(event instanceof EventType nonUuidEvent)) return;
try {
TT typedEvent = (TT) nonUuidEvent;
action.accept(typedEvent);
if (unsubscribeAfterSuccess && listenerHolder[0] != null) {
GlobalEventBus.unsubscribe(listenerHolder[0]);
this.listeners.remove(listenerHolder[0]);
}
} catch (ClassCastException _) {
throw new ClassCastException("Cannot cast " + event.getClass().getName() +
" to EventWithSnowflake");
}
})
);
this.listeners.add(listenerHolder[0]);
return this;
}
EventSubscriberBuilder(EventFlow publisher, Class<TT> eventClass) { public <TT extends EventType> EventFlow listen(Consumer<TT> action) {
this.publisher = publisher; return this.listen(action, true);
this.eventClass = eventClass;
}
public EventFlow perform(Consumer<TT> action) {
publisher.listener = GlobalEventBus.subscribe(eventClass, event -> {
action.accept(eventClass.cast(event));
if (publisher.unsubscribeAfterSuccess && publisher.listener != null) {
GlobalEventBus.unsubscribe(publisher.listener);
}
});
return publisher;
}
} }
/** Post synchronously */ /** Post synchronously */
public EventFlow postEvent() { public EventFlow postEvent() {
GlobalEventBus.post(event); GlobalEventBus.post(this.event);
return this; return this;
} }
/** Post asynchronously */ /** Post asynchronously */
public EventFlow asyncPostEvent() { public EventFlow asyncPostEvent() {
GlobalEventBus.postAsync(event); GlobalEventBus.postAsync(this.event);
return this;
}
public EventFlow unsubscribeAfterSuccess() {
this.unsubscribeAfterSuccess = true;
return this;
}
public EventFlow unsubscribeNow() {
if (unsubscribeAfterSuccess && listener != null) {
GlobalEventBus.unsubscribe(listener);
}
return this; return this;
} }
@@ -215,7 +235,11 @@ public class EventFlow {
return event; return event;
} }
public long getEventId() { public ListenerHandler[] getListeners() {
return listeners.toArray(new ListenerHandler[0]);
}
public long getEventSnowflake() {
return eventSnowflake; return eventSnowflake;
} }
} }

View File

@@ -0,0 +1,26 @@
package org.toop.framework.eventbus;
import org.toop.framework.eventbus.events.EventType;
public class ListenerHandler {
private Object listener = null;
// private boolean unsubscribeAfterSuccess = true;
// public ListenerHandler(Object listener, boolean unsubAfterSuccess) {
// this.listener = listener;
// this.unsubscribeAfterSuccess = unsubAfterSuccess;
// }
public ListenerHandler(Object listener) {
this.listener = listener;
}
public Object getListener() {
return this.listener;
}
// public boolean isUnsubscribeAfterSuccess() {
// return this.unsubscribeAfterSuccess;
// }
}

View File

@@ -7,6 +7,8 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@@ -15,33 +17,35 @@ import java.util.function.Supplier;
public class NetworkingClient { public class NetworkingClient {
private static final Logger logger = LogManager.getLogger(NetworkingClient.class); private static final Logger logger = LogManager.getLogger(NetworkingClient.class);
final Bootstrap bootstrap = new Bootstrap(); private long connectionId;
final EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
private String connectionUuid;
private Channel channel; private Channel channel;
private NetworkingGameClientHandler handler; private NetworkingGameClientHandler handler;
public NetworkingClient( public NetworkingClient(
Supplier<? extends NetworkingGameClientHandler> handlerFactory, Supplier<NetworkingGameClientHandler> handlerFactory,
String host, String host,
int port) { int port,
long connectionId) {
this.connectionId = connectionId;
try { try {
this.bootstrap.group(this.workerGroup); Bootstrap bootstrap = new Bootstrap();
this.bootstrap.channel(NioSocketChannel.class); EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
this.bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.group(workerGroup);
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) { public void initChannel(SocketChannel ch) {
handler = handlerFactory.get(); handler = handlerFactory.get();
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(1024)); // split at \n pipeline.addLast(new LineBasedFrameDecoder(1024)); // split at \n
pipeline.addLast(new StringDecoder()); // bytes -> String pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // bytes -> String
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(handler); pipeline.addLast(handler);
} }
}); });
ChannelFuture channelFuture = this.bootstrap.connect(host, port).sync(); ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
this.channel = channelFuture.channel(); this.channel = channelFuture.channel();
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to create networking client instance", e); logger.error("Failed to create networking client instance", e);
@@ -52,8 +56,8 @@ public class NetworkingClient {
return handler; return handler;
} }
public void setConnectionUuid(String connectionUuid) { public void setConnectionId(long connectionId) {
this.connectionUuid = connectionUuid; this.connectionId = connectionId;
} }
public boolean isChannelActive() { public boolean isChannelActive() {
@@ -64,18 +68,18 @@ public class NetworkingClient {
String literalMsg = msg.replace("\n", "\\n").replace("\r", "\\r"); String literalMsg = msg.replace("\n", "\\n").replace("\r", "\\r");
if (isChannelActive()) { if (isChannelActive()) {
this.channel.writeAndFlush(msg); this.channel.writeAndFlush(msg);
logger.info("Connection {} sent message: {}", this.channel.remoteAddress(), literalMsg); logger.info("Connection {} sent message: '{}'", this.channel.remoteAddress(), literalMsg);
} else { } else {
logger.warn("Cannot send message: {}, connection inactive.", literalMsg); logger.warn("Cannot send message: '{}', connection inactive.", literalMsg);
} }
} }
public void writeAndFlushnl(String msg) { public void writeAndFlushnl(String msg) {
if (isChannelActive()) { if (isChannelActive()) {
this.channel.writeAndFlush(msg + "\n"); this.channel.writeAndFlush(msg + "\r\n");
logger.info("Connection {} sent message: {}", this.channel.remoteAddress(), msg); logger.info("Connection {} sent message: '{}'", this.channel.remoteAddress(), msg);
} else { } else {
logger.warn("Cannot send message: {}, connection inactive.", msg); logger.warn("Cannot send message: '{}', connection inactive.", msg);
} }
} }
@@ -137,4 +141,8 @@ public class NetworkingClient {
} }
} }
public long getId() {
return this.connectionId;
}
} }

View File

@@ -7,6 +7,7 @@ import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow; import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.networking.events.NetworkEvents; import org.toop.framework.networking.events.NetworkEvents;
public class NetworkingClientManager { public class NetworkingClientManager {
@@ -14,67 +15,62 @@ public class NetworkingClientManager {
private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class); private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class);
/** Map of serverId -> Server instances */ /** Map of serverId -> Server instances */
private final Map<String, NetworkingClient> networkClients = new ConcurrentHashMap<>(); private final Map<Long, NetworkingClient> networkClients = new ConcurrentHashMap<>();
/** Starts a connection manager, to manage, connections. */ /** Starts a connection manager, to manage, connections. */
public NetworkingClientManager() throws NetworkingInitializationException { public NetworkingClientManager() throws NetworkingInitializationException {
try { try {
new EventFlow().listen(NetworkEvents.StartClientRequest.class, this::handleStartClientRequest); new EventFlow()
new EventFlow().listen(NetworkEvents.StartClient.class, this::handleStartClient); .listen(this::handleStartClient)
new EventFlow().listen(NetworkEvents.SendCommand.class, this::handleCommand); .listen(this::handleCommand)
new EventFlow().listen(NetworkEvents.CloseClient.class, this::handleCloseClient); .listen(this::handleCloseClient)
new EventFlow().listen(NetworkEvents.RequestsAllClients.class, this::getAllConnections); .listen(this::getAllConnections)
new EventFlow().listen(NetworkEvents.ForceCloseAllClients.class, this::shutdownAll); .listen(this::shutdownAll);
logger.info("NetworkingClientManager initialized");
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to initialize the client manager", e); logger.error("Failed to initialize the client manager", e);
throw e; throw e;
} }
} }
private String startClientRequest(Supplier<? extends NetworkingGameClientHandler> handlerFactory, private long startClientRequest(String ip, int port) {
String ip, long connectionId = new SnowflakeGenerator().nextId(); // TODO: Maybe use the one generated
int port) { try { // With EventFlow
String connectionUuid = UUID.randomUUID().toString();
try {
NetworkingClient client = new NetworkingClient( NetworkingClient client = new NetworkingClient(
handlerFactory, () -> new NetworkingGameClientHandler(connectionId),
ip, ip,
port); port,
this.networkClients.put(connectionUuid, client); connectionId);
client.setConnectionId(connectionId);
this.networkClients.put(connectionId, client);
} catch (Exception e) { } catch (Exception e) {
logger.error(e); logger.error(e);
} }
logger.info("Client {} started", connectionUuid); logger.info("Client {} started", connectionId);
return connectionUuid; return connectionId;
}
private void handleStartClientRequest(NetworkEvents.StartClientRequest request) {
request.future()
.complete(
this.startClientRequest(
request.handlerFactory(),
request.ip(),
request.port())); // TODO: Maybe post ConnectionEstablished event.
} }
private void handleStartClient(NetworkEvents.StartClient event) { private void handleStartClient(NetworkEvents.StartClient event) {
String uuid = this.startClientRequest(event.handlerFactory(), event.ip(), event.port()); long id = this.startClientRequest(event.ip(), event.port());
new EventFlow().addPostEvent(NetworkEvents.StartClientSuccess.class, new Thread(() -> {
uuid, event.eventSnowflake() try {
).asyncPostEvent(); Thread.sleep(100); // TODO: Is this a good idea?
new EventFlow().addPostEvent(NetworkEvents.StartClientResponse.class,
id, event.eventSnowflake()
).asyncPostEvent();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
} }
private void handleCommand( private void handleCommand(
NetworkEvents.SendCommand NetworkEvents.SendCommand
event) { // TODO: Move this to ServerConnection class, keep it internal. event) { // TODO: Move this to ServerConnection class, keep it internal.
NetworkingClient client = this.networkClients.get(event.connectionId()); NetworkingClient client = this.networkClients.get(event.connectionId());
logger.info("Preparing to send command: {} to server: {}", event.args(), client); logger.info("Preparing to send command: {} to server: {}", event.args(), client.getId());
if (client != null) { String args = String.join(" ", event.args());
String args = String.join(" ", event.args()) + "\n"; client.writeAndFlushnl(args);
client.writeAndFlush(args);
} else {
logger.warn("Server {} not found for command '{}'", event.connectionId(), event.args());
}
} }
private void handleCloseClient(NetworkEvents.CloseClient event) { private void handleCloseClient(NetworkEvents.CloseClient event) {
@@ -115,7 +111,7 @@ public class NetworkingClientManager {
private void getAllConnections(NetworkEvents.RequestsAllClients request) { private void getAllConnections(NetworkEvents.RequestsAllClients request) {
List<NetworkingClient> a = new ArrayList<>(this.networkClients.values()); List<NetworkingClient> a = new ArrayList<>(this.networkClients.values());
request.future().complete(a.toString()); request.future().complete(a);
} }
public void shutdownAll(NetworkEvents.ForceCloseAllClients request) { public void shutdownAll(NetworkEvents.ForceCloseAllClients request) {

View File

@@ -2,19 +2,65 @@ package org.toop.framework.networking;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import jdk.jfr.Event;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.networking.events.NetworkEvents;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter { public class NetworkingGameClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(NetworkingGameClientHandler.class); private static final Logger logger = LogManager.getLogger(NetworkingGameClientHandler.class);
public NetworkingGameClientHandler() {} private final long connectionId;
public NetworkingGameClientHandler(long connectionId) {
this.connectionId = connectionId;
}
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
logger.debug("Received message from server-{}, data: {}", ctx.channel().remoteAddress(), msg); String rec = msg.toString().trim();
// TODO: Handle server messages if (rec.equalsIgnoreCase("err")) {
logger.error("server-{} send back error, data: {}", ctx.channel().remoteAddress(), msg);
return;
}
if (rec.equalsIgnoreCase("ok")) {
logger.info("Received OK message from server-{}, data: {}", ctx.channel().remoteAddress(), msg);
return;
}
if (rec.toLowerCase().startsWith("svr")) {
logger.info("Received SVR message from server-{}, data: {}", ctx.channel().remoteAddress(), msg);
new EventFlow().addPostEvent(new NetworkEvents.ServerResponse(this.connectionId)).asyncPostEvent();
parseServerReturn(rec);
return;
}
logger.info("Received unparsed message from server-{}, data: {}", ctx.channel().remoteAddress(), msg);
}
private void parseServerReturn(String rec) {
if (rec.toLowerCase().contains("playerlist")) {
playerListHandler(rec);
} else if (rec.toLowerCase().contains("close")) {
} else {}
}
private void playerListHandler(String rec) {
Pattern pattern = Pattern.compile("\"([^\"]+)\"");
String[] players = pattern.matcher(rec)
.results()
.map(m -> m.group(1))
.toArray(String[]::new);
new EventFlow()
.addPostEvent(new NetworkEvents.PlayerListResponse(this.connectionId, players))
.asyncPostEvent();
} }
@Override @Override

View File

@@ -3,6 +3,7 @@ package org.toop.framework.networking.events;
import org.toop.framework.eventbus.events.EventWithSnowflake; import org.toop.framework.eventbus.events.EventWithSnowflake;
import org.toop.framework.eventbus.events.EventWithoutSnowflake; import org.toop.framework.eventbus.events.EventWithoutSnowflake;
import org.toop.framework.eventbus.events.EventsBase; import org.toop.framework.eventbus.events.EventsBase;
import org.toop.framework.networking.NetworkingClient;
import org.toop.framework.networking.NetworkingGameClientHandler; import org.toop.framework.networking.NetworkingGameClientHandler;
import java.lang.reflect.RecordComponent; import java.lang.reflect.RecordComponent;
@@ -20,14 +21,14 @@ public class NetworkEvents extends EventsBase {
* *
* @param future List of all connections in string form. * @param future List of all connections in string form.
*/ */
public record RequestsAllClients(CompletableFuture<String> future) implements EventWithoutSnowflake {} public record RequestsAllClients(CompletableFuture<List<NetworkingClient>> future) implements EventWithoutSnowflake {}
/** Forces closing all active connections immediately. */ /** Forces closing all active connections immediately. */
public record ForceCloseAllClients() implements EventWithoutSnowflake {} public record ForceCloseAllClients() implements EventWithoutSnowflake {}
public record CloseClientRequest(CompletableFuture<String> future) implements EventWithoutSnowflake {} public record PlayerListResponse(long clientId, String[] playerlist) implements EventWithoutSnowflake {}
public record CloseClient(String connectionId) implements EventWithoutSnowflake {} public record CloseClient(long connectionId) implements EventWithoutSnowflake {}
/** /**
* Event to start a new client connection to a server. * Event to start a new client connection to a server.
@@ -48,14 +49,12 @@ public class NetworkEvents extends EventsBase {
* or {@code StartClientFailure} event may carry the same {@code eventId}. * or {@code StartClientFailure} event may carry the same {@code eventId}.
* </p> * </p>
* *
* @param handlerFactory Factory for constructing a {@link NetworkingGameClientHandler}.
* @param ip The IP address of the server to connect to. * @param ip The IP address of the server to connect to.
* @param port The port number of the server to connect to. * @param port The port number of the server to connect to.
* @param eventSnowflake A unique identifier for this event, typically injected * @param eventSnowflake A unique identifier for this event, typically injected
* automatically by the {@link org.toop.framework.eventbus.EventFlow}. * automatically by the {@link org.toop.framework.eventbus.EventFlow}.
*/ */
public record StartClient( public record StartClient(
Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String ip, String ip,
int port, int port,
long eventSnowflake long eventSnowflake
@@ -94,24 +93,12 @@ public class NetworkEvents extends EventsBase {
} }
} }
/**
* TODO: Update docs new input.
* BLOCKING Triggers starting a server connection and returns a future.
*
* @param ip The IP address of the server to connect to.
* @param port The port of the server to connect to.
* @param future Returns the UUID of the connection, when connection is established.
*/
public record StartClientRequest(
Supplier<? extends NetworkingGameClientHandler> handlerFactory,
String ip, int port, CompletableFuture<String> future) implements EventWithoutSnowflake {}
/** /**
* *
* @param clientId The ID of the client to be used in requests. * @param clientId The ID of the client to be used in requests.
* @param eventSnowflake The eventID used in checking if event is for you. * @param eventSnowflake The eventID used in checking if event is for you.
*/ */
public record StartClientSuccess(String clientId, long eventSnowflake) public record StartClientResponse(long clientId, long eventSnowflake)
implements EventWithSnowflake { implements EventWithSnowflake {
@Override @Override
public Map<String, Object> result() { public Map<String, Object> result() {
@@ -134,28 +121,34 @@ public class NetworkEvents extends EventsBase {
} }
} }
/**
*
* @param clientId The ID of the client that received the response.
*/
public record ServerResponse(long clientId) implements EventWithoutSnowflake {}
/** /**
* Triggers sending a command to a server. * Triggers sending a command to a server.
* *
* @param connectionId The UUID of the connection to send the command on. * @param connectionId The UUID of the connection to send the command on.
* @param args The command arguments. * @param args The command arguments.
*/ */
public record SendCommand(String connectionId, String... args) implements EventWithoutSnowflake {} public record SendCommand(long connectionId, String... args) implements EventWithoutSnowflake {}
/** /**
* Triggers reconnecting to a previous address. * Triggers reconnecting to a previous address.
* *
* @param connectionId The identifier of the connection being reconnected. * @param connectionId The identifier of the connection being reconnected.
*/ */
public record Reconnect(Object connectionId) implements EventWithoutSnowflake {} public record Reconnect(long connectionId) implements EventWithoutSnowflake {}
/** /**
* Triggers when the server client receives a message. * Triggers when the server client receives a message.
* *
* @param ConnectionUuid The UUID of the connection that received the message. * @param ConnectionId The snowflake id of the connection that received the message.
* @param message The message received. * @param message The message received.
*/ */
public record ReceivedMessage(String ConnectionUuid, String message) implements EventWithoutSnowflake {} public record ReceivedMessage(long ConnectionId, String message) implements EventWithoutSnowflake {}
/** /**
* Triggers changing connection to a new address. * Triggers changing connection to a new address.
@@ -164,7 +157,7 @@ public class NetworkEvents extends EventsBase {
* @param ip The new IP address. * @param ip The new IP address.
* @param port The new port. * @param port The new port.
*/ */
public record ChangeClient(Object connectionId, String ip, int port) implements EventWithoutSnowflake {} public record ChangeClient(long connectionId, String ip, int port) implements EventWithoutSnowflake {}
/** /**
@@ -172,7 +165,7 @@ public class NetworkEvents extends EventsBase {
* *
* @param connectionId The identifier of the connection that failed. * @param connectionId The identifier of the connection that failed.
*/ */
public record CouldNotConnect(Object connectionId) implements EventWithoutSnowflake {} public record CouldNotConnect(long connectionId) implements EventWithoutSnowflake {}
/** WIP Triggers when a connection closes. */ /** WIP Triggers when a connection closes. */
public record ClosedConnection() implements EventWithoutSnowflake {} public record ClosedConnection() implements EventWithoutSnowflake {}

View File

@@ -163,10 +163,9 @@ class EventPublisherStressTest {
for (int i = 0; i < EVENTS_PER_THREAD; i++) { for (int i = 0; i < EVENTS_PER_THREAD; i++) {
var a = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i) var a = new EventFlow().addPostEvent(HeavyEvent.class, "payload-" + i)
.onResponse(HeavyEventSuccess.class, _ -> counter.increment()) .onResponse(HeavyEventSuccess.class, _ -> counter.increment())
.unsubscribeAfterSuccess()
.postEvent(); .postEvent();
new EventFlow().addPostEvent(HeavyEventSuccess.class, "payload-" + i, a.getEventId()) new EventFlow().addPostEvent(HeavyEventSuccess.class, "payload-" + i, a.getEventSnowflake())
.postEvent(); .postEvent();
} }
}); });

View File

@@ -1,6 +1,7 @@
package org.toop.framework.eventbus; package org.toop.framework.eventbus;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.toop.framework.SnowflakeGenerator;
import org.toop.framework.eventbus.events.EventWithSnowflake; import org.toop.framework.eventbus.events.EventWithSnowflake;
import java.util.HashSet; import java.util.HashSet;
@@ -13,7 +14,7 @@ class EventFlowTest {
@Test @Test
void testSnowflakeStructure() { void testSnowflakeStructure() {
long id = new SnowflakeGenerator(1).nextId(); long id = new SnowflakeGenerator().nextId();
long timestampPart = id >>> 22; long timestampPart = id >>> 22;
long randomPart = id & ((1L << 22) - 1); long randomPart = id & ((1L << 22) - 1);
@@ -55,7 +56,7 @@ class EventFlowTest {
EventFlow flow = new EventFlow(); EventFlow flow = new EventFlow();
flow.addPostEvent(DummySnowflakeEvent.class); // no args, should auto-generate flow.addPostEvent(DummySnowflakeEvent.class); // no args, should auto-generate
long id = flow.getEventId(); long id = flow.getEventSnowflake();
assertNotEquals(-1, id, "Snowflake should be auto-generated"); assertNotEquals(-1, id, "Snowflake should be auto-generated");
assertTrue(flow.getEvent() instanceof DummySnowflakeEvent); assertTrue(flow.getEvent() instanceof DummySnowflakeEvent);
assertEquals(id, ((DummySnowflakeEvent) flow.getEvent()).eventSnowflake()); assertEquals(id, ((DummySnowflakeEvent) flow.getEvent()).eventSnowflake());
@@ -74,7 +75,7 @@ class EventFlowTest {
assertFalse(handlerCalled.get(), "Handler should not fire for mismatched snowflake"); assertFalse(handlerCalled.get(), "Handler should not fire for mismatched snowflake");
// Post with matching snowflake // Post with matching snowflake
GlobalEventBus.post(new DummySnowflakeEvent(flow.getEventId())); GlobalEventBus.post(new DummySnowflakeEvent(flow.getEventSnowflake()));
assertTrue(handlerCalled.get(), "Handler should fire for matching snowflake"); assertTrue(handlerCalled.get(), "Handler should fire for matching snowflake");
} }
} }