Refactor to make Events easier to work with.

This commit is contained in:
lieght
2025-10-15 19:05:09 +02:00
parent bc6182e29a
commit 798005de2c
6 changed files with 78 additions and 157 deletions

View File

@@ -39,12 +39,13 @@ public class NetworkingClientEventListener {
} }
void handleStartClient(NetworkEvents.StartClient event) { void handleStartClient(NetworkEvents.StartClient event) {
long clientId = SnowflakeGenerator.nextId();
clientManager.startClient( clientManager.startClient(
event.identifier(), clientId,
event.networkingClientClass(), event.networkingClient(),
event.host(), event.networkingConnector(),
event.port(), () -> new EventFlow().addPostEvent(new NetworkEvents.StartClientResponse(clientId, true, event.identifier())).postEvent(),
event.networkingReconnect() () -> new EventFlow().addPostEvent(new NetworkEvents.StartClientResponse(clientId, false, event.identifier())).postEvent()
); );
} }
@@ -110,19 +111,23 @@ public class NetworkingClientEventListener {
} }
private void handleReconnect(NetworkEvents.Reconnect event) { private void handleReconnect(NetworkEvents.Reconnect event) {
try { clientManager.startClient(
clientManager.reconnect(event.clientId(), event.networkingReconnect()); event.clientId(),
} catch (ClientNotFoundException e) { event.networkingClient(),
logger.error(e); event.networkingConnector(),
} () -> new EventFlow().addPostEvent(new NetworkEvents.ReconnectResponse(true, event.identifier())).postEvent(),
() -> new EventFlow().addPostEvent(new NetworkEvents.ReconnectResponse(false, event.identifier())).postEvent()
);
} }
private void handleChangeAddress(NetworkEvents.ChangeAddress event) { private void handleChangeAddress(NetworkEvents.ChangeAddress event) {
try { clientManager.startClient(
clientManager.changeAddress(event.clientId(), event.ip(), event.port(), event.networkingReconnect()); event.clientId(),
} catch (ClientNotFoundException e) { event.networkingClient(),
logger.error(e); event.networkingConnector(),
} () -> new EventFlow().addPostEvent(new NetworkEvents.ChangeAddressResponse(true, event.identifier())).postEvent(),
() -> new EventFlow().addPostEvent(new NetworkEvents.ChangeAddressResponse(false, event.identifier())).postEvent()
);
} }
void handleCloseClient(NetworkEvents.CloseClient event) { void handleCloseClient(NetworkEvents.CloseClient event) {

View File

@@ -1,6 +1,5 @@
package org.toop.framework.networking; package org.toop.framework.networking;
import java.net.InetSocketAddress;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@@ -9,12 +8,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.toop.framework.eventbus.EventFlow;
import org.toop.framework.networking.events.NetworkEvents;
import org.toop.framework.networking.exceptions.ClientNotFoundException; import org.toop.framework.networking.exceptions.ClientNotFoundException;
import org.toop.framework.networking.exceptions.CouldNotConnectException; import org.toop.framework.networking.exceptions.CouldNotConnectException;
import org.toop.framework.networking.interfaces.NetworkingClient; import org.toop.framework.networking.interfaces.NetworkingClient;
import org.toop.framework.networking.types.NetworkingReconnect; import org.toop.framework.networking.types.NetworkingConnector;
public class NetworkingClientManager implements org.toop.framework.networking.interfaces.NetworkingClientManager { public class NetworkingClientManager implements org.toop.framework.networking.interfaces.NetworkingClientManager {
private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class); private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class);
@@ -22,13 +19,13 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
public NetworkingClientManager() {} public NetworkingClientManager() {}
@Override private void connectHelper(
public void startClient(
long id, long id,
NetworkingClient networkingClient, NetworkingClient nClient,
String host, int port, NetworkingConnector nConnector,
NetworkingReconnect networkingReconnect) { Runnable onSuccess,
Runnable onFailure
) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Runnable connectTask = new Runnable() { Runnable connectTask = new Runnable() {
@@ -36,26 +33,31 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
@Override @Override
public void run() { public void run() {
if (networkClients.get(id) != null) {
networkClients.remove(id);
}
try { try {
networkingClient.connect(id, host, port); nClient.connect(id, nConnector.host(), nConnector.port());
networkClients.put(id, networkingClient); networkClients.put(id, nClient);
logger.info("New client started successfully for {}:{}", host, port); logger.info("New client started successfully for {}:{}", nConnector.host(), nConnector.port());
new EventFlow().addPostEvent(new NetworkEvents.StartClientResponse(id, false, id)).postEvent(); onSuccess.run();
scheduler.shutdown(); scheduler.shutdown();
} catch (CouldNotConnectException e) { } catch (CouldNotConnectException e) {
attempts++; attempts++;
if (attempts < networkingReconnect.reconnectAttempts()) { if (attempts < nConnector.reconnectAttempts()) {
logger.warn("Could not connect to {}:{}. Retrying in {} {}", logger.warn("Could not connect to {}:{}. Retrying in {} {}",
host, port, networkingReconnect.timeout(), networkingReconnect.timeUnit()); nConnector.host(), nConnector.port(), nConnector.timeout(), nConnector.timeUnit());
scheduler.schedule(this, networkingReconnect.timeout(), networkingReconnect.timeUnit()); scheduler.schedule(this, nConnector.timeout(), nConnector.timeUnit());
} else { } else {
logger.error("Failed to start client for {}:{} after {} attempts", host, port, attempts); logger.error("Failed to start client for {}:{} after {} attempts", nConnector.host(), nConnector.port(), attempts);
new EventFlow().addPostEvent(new NetworkEvents.StartClientResponse(-1, false, id)).postEvent(); onFailure.run();
scheduler.shutdown(); scheduler.shutdown();
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Unexpected exception during startClient", e); logger.error("Unexpected exception during startClient", e);
new EventFlow().addPostEvent(new NetworkEvents.StartClientResponse(-1, false, id)).postEvent(); onFailure.run();
scheduler.shutdown(); scheduler.shutdown();
} }
} }
@@ -64,6 +66,23 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS); scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS);
} }
@Override
public void startClient(
long id,
NetworkingClient nClient,
NetworkingConnector nConnector,
Runnable onSuccess,
Runnable onFailure
) {
connectHelper(
id,
nClient,
nConnector,
onSuccess,
onFailure
);
}
@Override @Override
public void sendCommand(long id, String command) throws ClientNotFoundException { public void sendCommand(long id, String command) throws ClientNotFoundException {
logger.info("Sending command to client for {}:{}", id, command); logger.info("Sending command to client for {}:{}", id, command);
@@ -85,100 +104,6 @@ public class NetworkingClientManager implements org.toop.framework.networking.in
} }
@Override
public void reconnect(long id, NetworkingReconnect networkingReconnect) throws ClientNotFoundException {
NetworkingClient client = this.networkClients.get(id);
if (client == null) {
throw new ClientNotFoundException(id);
}
InetSocketAddress address = client.getAddress();
if (client.isActive()) {
client.closeConnection();
}
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Runnable connectTask = new Runnable() {
int attempts = 0;
@Override
public void run() {
try {
client.connect(id, address.getHostName(), address.getPort());
networkClients.put(id, client);
logger.info("Client {} reconnected to {}:{}", id, address.getHostName(), address.getPort());
new EventFlow().addPostEvent(new NetworkEvents.ReconnectResponse(true, id)).postEvent().postEvent();
scheduler.shutdown();
} catch (CouldNotConnectException e) {
attempts++;
if (attempts < networkingReconnect.reconnectAttempts()) {
logger.warn("Could not reconnect client {} to {}:{}. Retrying in {} {}",
id, address.getHostName(), address.getPort(), networkingReconnect.timeout(), networkingReconnect.timeUnit());
scheduler.schedule(this, networkingReconnect.timeout(), networkingReconnect.timeUnit());
} else {
logger.error("Failed to reconnect client {} to {}:{} after {} attempts", id, address.getHostName(), address.getPort(), attempts);
new EventFlow().addPostEvent(new NetworkEvents.ReconnectResponse(false, id)).postEvent().postEvent();
scheduler.shutdown();
}
} catch (Exception e) {
logger.error("Unexpected exception during reconnect for client {}", id, e);
new EventFlow().addPostEvent(new NetworkEvents.ReconnectResponse(false, id)).postEvent().postEvent();
scheduler.shutdown();
}
}
};
scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS);
}
@Override
public void changeAddress(long id, String host, int port, NetworkingReconnect networkingReconnect) throws ClientNotFoundException {
NetworkingClient client = this.networkClients.get(id);
if (client == null) {
throw new ClientNotFoundException(id);
}
if (client.isActive()) {
client.closeConnection();
}
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Runnable connectTask = new Runnable() {
int attempts = 0;
@Override
public void run() {
try {
client.connect(id, host, port);
networkClients.put(id, client);
logger.info("Client {} changed address to {}:{}", id, host, port);
new EventFlow().addPostEvent(new NetworkEvents.ChangeAddressResponse(true, id)).postEvent().postEvent();
scheduler.shutdown();
} catch (CouldNotConnectException e) {
attempts++;
if (attempts < networkingReconnect.reconnectAttempts()) {
logger.warn("Could not connect client {} to {}:{}. Retrying in {} {}",
id, host, port, networkingReconnect.timeout(), networkingReconnect.timeUnit());
scheduler.schedule(this, networkingReconnect.timeout(), networkingReconnect.timeUnit());
} else {
logger.error("Failed to connect client {} to {}:{} after {} attempts", id, host, port, attempts);
new EventFlow().addPostEvent(new NetworkEvents.ChangeAddressResponse(false, id)).postEvent().postEvent();
scheduler.shutdown();
}
} catch (Exception e) {
logger.error("Unexpected exception during changeAddress for client {}", id, e);
new EventFlow().addPostEvent(new NetworkEvents.ChangeAddressResponse(false, id)).postEvent().postEvent();
scheduler.shutdown();
}
}
};
scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS);
}
@Override @Override
public void closeClient(long id) throws ClientNotFoundException { public void closeClient(long id) throws ClientNotFoundException {
NetworkingClient client = this.networkClients.get(id); NetworkingClient client = this.networkClients.get(id);

View File

@@ -9,7 +9,7 @@ import org.toop.framework.eventbus.events.UniqueEvent;
import org.toop.framework.eventbus.events.EventsBase; import org.toop.framework.eventbus.events.EventsBase;
import org.toop.annotations.AutoResponseResult; import org.toop.annotations.AutoResponseResult;
import org.toop.framework.networking.interfaces.NetworkingClient; import org.toop.framework.networking.interfaces.NetworkingClient;
import org.toop.framework.networking.types.NetworkingReconnect; import org.toop.framework.networking.types.NetworkingConnector;
/** /**
* A collection of networking-related event records for use with the {@link * A collection of networking-related event records for use with the {@link
@@ -118,16 +118,12 @@ public class NetworkEvents extends EventsBase {
* *
* <p>Carries IP, port, and a unique event ID for correlation with responses. * <p>Carries IP, port, and a unique event ID for correlation with responses.
* *
* @param networkingClientClass The type of networking client to create. * @param networkingClient
* @param host Server IP address. * @param networkingConnector
* @param port Server port.
* @param identifier Unique event identifier for correlation.
*/ */
public record StartClient( public record StartClient(
NetworkingClient networkingClientClass, NetworkingClient networkingClient,
String host, NetworkingConnector networkingConnector,
int port,
NetworkingReconnect networkingReconnect,
long identifier) implements UniqueEvent {} long identifier) implements UniqueEvent {}
/** /**
@@ -135,13 +131,12 @@ public class NetworkEvents extends EventsBase {
* *
* @param clientId The client ID assigned to the new connection. * @param clientId The client ID assigned to the new connection.
* @param successful If successfully connected or not. If not clientId will also be -1. * @param successful If successfully connected or not. If not clientId will also be -1.
* @param identifier Event ID used for correlation.
*/ */
@AutoResponseResult @AutoResponseResult
public record StartClientResponse(long clientId, boolean successful, long identifier) implements ResponseToUniqueEvent {} public record StartClientResponse(long clientId, boolean successful, long identifier) implements ResponseToUniqueEvent {}
/** WIP (Not working) Request to reconnect a client to a previous address. */ /** WIP (Not working) Request to reconnect a client to a previous address. */
public record Reconnect(long clientId, NetworkingReconnect networkingReconnect, long identifier) public record Reconnect(long clientId, NetworkingClient networkingClient, NetworkingConnector networkingConnector, long identifier)
implements UniqueEvent {} implements UniqueEvent {}
public record ReconnectResponse(boolean successful, long identifier) implements ResponseToUniqueEvent {} public record ReconnectResponse(boolean successful, long identifier) implements ResponseToUniqueEvent {}
@@ -150,10 +145,9 @@ public class NetworkEvents extends EventsBase {
* Request to change a client connection to a new server. * Request to change a client connection to a new server.
* *
* @param clientId The client connection ID. * @param clientId The client connection ID.
* @param ip The new server IP. * @param networkingConnector
* @param port The new server port.
*/ */
public record ChangeAddress(long clientId, String ip, int port, NetworkingReconnect networkingReconnect, long identifier) public record ChangeAddress(long clientId, NetworkingClient networkingClient, NetworkingConnector networkingConnector, long identifier)
implements UniqueEvent {} implements UniqueEvent {}
public record ChangeAddressResponse(boolean successful, long identifier) implements ResponseToUniqueEvent {} public record ChangeAddressResponse(boolean successful, long identifier) implements ResponseToUniqueEvent {}

View File

@@ -2,19 +2,16 @@ package org.toop.framework.networking.interfaces;
import org.toop.framework.networking.exceptions.ClientNotFoundException; import org.toop.framework.networking.exceptions.ClientNotFoundException;
import org.toop.framework.networking.exceptions.CouldNotConnectException; import org.toop.framework.networking.exceptions.CouldNotConnectException;
import org.toop.framework.networking.types.NetworkingReconnect; import org.toop.framework.networking.types.NetworkingConnector;
import java.util.Optional;
public interface NetworkingClientManager { public interface NetworkingClientManager {
void startClient( void startClient(
long id, long id,
NetworkingClient networkingClientClass, NetworkingClient nClient,
String host, NetworkingConnector nConnector,
int port, Runnable onSuccess,
NetworkingReconnect networkingReconnect) throws CouldNotConnectException; Runnable onFailure
) throws CouldNotConnectException;
void sendCommand(long id, String command) throws ClientNotFoundException; void sendCommand(long id, String command) throws ClientNotFoundException;
void reconnect(long id, NetworkingReconnect networkingReconnect) throws ClientNotFoundException;
void changeAddress(long id, String host, int port, NetworkingReconnect networkingReconnect) throws ClientNotFoundException;
void closeClient(long id) throws ClientNotFoundException; void closeClient(long id) throws ClientNotFoundException;
} }

View File

@@ -0,0 +1,5 @@
package org.toop.framework.networking.types;
import java.util.concurrent.TimeUnit;
public record NetworkingConnector(String host, int port, int reconnectAttempts, long timeout, TimeUnit timeUnit) {}

View File

@@ -1,5 +0,0 @@
package org.toop.framework.networking.types;
import java.util.concurrent.TimeUnit;
public record NetworkingReconnect(int reconnectAttempts, long timeout, TimeUnit timeUnit) {}