diff --git a/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java b/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java index 9b8fb14..ef51a46 100644 --- a/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java +++ b/framework/src/main/java/org/toop/framework/networking/NetworkingClientEventListener.java @@ -39,12 +39,13 @@ public class NetworkingClientEventListener { } void handleStartClient(NetworkEvents.StartClient event) { + long clientId = SnowflakeGenerator.nextId(); clientManager.startClient( - event.identifier(), - event.networkingClientClass(), - event.host(), - event.port(), - event.networkingReconnect() + clientId, + event.networkingClient(), + event.networkingConnector(), + () -> new EventFlow().addPostEvent(new NetworkEvents.StartClientResponse(clientId, true, event.identifier())).postEvent(), + () -> new EventFlow().addPostEvent(new NetworkEvents.StartClientResponse(clientId, false, event.identifier())).postEvent() ); } @@ -110,19 +111,23 @@ public class NetworkingClientEventListener { } private void handleReconnect(NetworkEvents.Reconnect event) { - try { - clientManager.reconnect(event.clientId(), event.networkingReconnect()); - } catch (ClientNotFoundException e) { - logger.error(e); - } + clientManager.startClient( + event.clientId(), + event.networkingClient(), + event.networkingConnector(), + () -> new EventFlow().addPostEvent(new NetworkEvents.ReconnectResponse(true, event.identifier())).postEvent(), + () -> new EventFlow().addPostEvent(new NetworkEvents.ReconnectResponse(false, event.identifier())).postEvent() + ); } private void handleChangeAddress(NetworkEvents.ChangeAddress event) { - try { - clientManager.changeAddress(event.clientId(), event.ip(), event.port(), event.networkingReconnect()); - } catch (ClientNotFoundException e) { - logger.error(e); - } + clientManager.startClient( + event.clientId(), + event.networkingClient(), + event.networkingConnector(), + () -> new EventFlow().addPostEvent(new NetworkEvents.ChangeAddressResponse(true, event.identifier())).postEvent(), + () -> new EventFlow().addPostEvent(new NetworkEvents.ChangeAddressResponse(false, event.identifier())).postEvent() + ); } void handleCloseClient(NetworkEvents.CloseClient event) { diff --git a/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java b/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java index 86d7085..0f4538f 100644 --- a/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java +++ b/framework/src/main/java/org/toop/framework/networking/NetworkingClientManager.java @@ -1,6 +1,5 @@ package org.toop.framework.networking; -import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -9,12 +8,10 @@ import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.toop.framework.eventbus.EventFlow; -import org.toop.framework.networking.events.NetworkEvents; import org.toop.framework.networking.exceptions.ClientNotFoundException; import org.toop.framework.networking.exceptions.CouldNotConnectException; import org.toop.framework.networking.interfaces.NetworkingClient; -import org.toop.framework.networking.types.NetworkingReconnect; +import org.toop.framework.networking.types.NetworkingConnector; public class NetworkingClientManager implements org.toop.framework.networking.interfaces.NetworkingClientManager { private static final Logger logger = LogManager.getLogger(NetworkingClientManager.class); @@ -22,13 +19,13 @@ public class NetworkingClientManager implements org.toop.framework.networking.in public NetworkingClientManager() {} - @Override - public void startClient( + private void connectHelper( long id, - NetworkingClient networkingClient, - String host, int port, - NetworkingReconnect networkingReconnect) { - + NetworkingClient nClient, + NetworkingConnector nConnector, + Runnable onSuccess, + Runnable onFailure + ) { ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); Runnable connectTask = new Runnable() { @@ -36,26 +33,31 @@ public class NetworkingClientManager implements org.toop.framework.networking.in @Override public void run() { + + if (networkClients.get(id) != null) { + networkClients.remove(id); + } + try { - networkingClient.connect(id, host, port); - networkClients.put(id, networkingClient); - logger.info("New client started successfully for {}:{}", host, port); - new EventFlow().addPostEvent(new NetworkEvents.StartClientResponse(id, false, id)).postEvent(); + nClient.connect(id, nConnector.host(), nConnector.port()); + networkClients.put(id, nClient); + logger.info("New client started successfully for {}:{}", nConnector.host(), nConnector.port()); + onSuccess.run(); scheduler.shutdown(); } catch (CouldNotConnectException e) { attempts++; - if (attempts < networkingReconnect.reconnectAttempts()) { + if (attempts < nConnector.reconnectAttempts()) { logger.warn("Could not connect to {}:{}. Retrying in {} {}", - host, port, networkingReconnect.timeout(), networkingReconnect.timeUnit()); - scheduler.schedule(this, networkingReconnect.timeout(), networkingReconnect.timeUnit()); + nConnector.host(), nConnector.port(), nConnector.timeout(), nConnector.timeUnit()); + scheduler.schedule(this, nConnector.timeout(), nConnector.timeUnit()); } else { - logger.error("Failed to start client for {}:{} after {} attempts", host, port, attempts); - new EventFlow().addPostEvent(new NetworkEvents.StartClientResponse(-1, false, id)).postEvent(); + logger.error("Failed to start client for {}:{} after {} attempts", nConnector.host(), nConnector.port(), attempts); + onFailure.run(); scheduler.shutdown(); } } catch (Exception e) { logger.error("Unexpected exception during startClient", e); - new EventFlow().addPostEvent(new NetworkEvents.StartClientResponse(-1, false, id)).postEvent(); + onFailure.run(); scheduler.shutdown(); } } @@ -64,6 +66,23 @@ public class NetworkingClientManager implements org.toop.framework.networking.in scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS); } + @Override + public void startClient( + long id, + NetworkingClient nClient, + NetworkingConnector nConnector, + Runnable onSuccess, + Runnable onFailure + ) { + connectHelper( + id, + nClient, + nConnector, + onSuccess, + onFailure + ); + } + @Override public void sendCommand(long id, String command) throws ClientNotFoundException { logger.info("Sending command to client for {}:{}", id, command); @@ -85,100 +104,6 @@ public class NetworkingClientManager implements org.toop.framework.networking.in } - @Override - public void reconnect(long id, NetworkingReconnect networkingReconnect) throws ClientNotFoundException { - NetworkingClient client = this.networkClients.get(id); - if (client == null) { - throw new ClientNotFoundException(id); - } - - InetSocketAddress address = client.getAddress(); - - if (client.isActive()) { - client.closeConnection(); - } - - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - - Runnable connectTask = new Runnable() { - int attempts = 0; - - @Override - public void run() { - try { - client.connect(id, address.getHostName(), address.getPort()); - networkClients.put(id, client); - logger.info("Client {} reconnected to {}:{}", id, address.getHostName(), address.getPort()); - new EventFlow().addPostEvent(new NetworkEvents.ReconnectResponse(true, id)).postEvent().postEvent(); - scheduler.shutdown(); - } catch (CouldNotConnectException e) { - attempts++; - if (attempts < networkingReconnect.reconnectAttempts()) { - logger.warn("Could not reconnect client {} to {}:{}. Retrying in {} {}", - id, address.getHostName(), address.getPort(), networkingReconnect.timeout(), networkingReconnect.timeUnit()); - scheduler.schedule(this, networkingReconnect.timeout(), networkingReconnect.timeUnit()); - } else { - logger.error("Failed to reconnect client {} to {}:{} after {} attempts", id, address.getHostName(), address.getPort(), attempts); - new EventFlow().addPostEvent(new NetworkEvents.ReconnectResponse(false, id)).postEvent().postEvent(); - scheduler.shutdown(); - } - } catch (Exception e) { - logger.error("Unexpected exception during reconnect for client {}", id, e); - new EventFlow().addPostEvent(new NetworkEvents.ReconnectResponse(false, id)).postEvent().postEvent(); - scheduler.shutdown(); - } - } - }; - - scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS); - } - - @Override - public void changeAddress(long id, String host, int port, NetworkingReconnect networkingReconnect) throws ClientNotFoundException { - NetworkingClient client = this.networkClients.get(id); - if (client == null) { - throw new ClientNotFoundException(id); - } - - if (client.isActive()) { - client.closeConnection(); - } - - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - - Runnable connectTask = new Runnable() { - int attempts = 0; - - @Override - public void run() { - try { - client.connect(id, host, port); - networkClients.put(id, client); - logger.info("Client {} changed address to {}:{}", id, host, port); - new EventFlow().addPostEvent(new NetworkEvents.ChangeAddressResponse(true, id)).postEvent().postEvent(); - scheduler.shutdown(); - } catch (CouldNotConnectException e) { - attempts++; - if (attempts < networkingReconnect.reconnectAttempts()) { - logger.warn("Could not connect client {} to {}:{}. Retrying in {} {}", - id, host, port, networkingReconnect.timeout(), networkingReconnect.timeUnit()); - scheduler.schedule(this, networkingReconnect.timeout(), networkingReconnect.timeUnit()); - } else { - logger.error("Failed to connect client {} to {}:{} after {} attempts", id, host, port, attempts); - new EventFlow().addPostEvent(new NetworkEvents.ChangeAddressResponse(false, id)).postEvent().postEvent(); - scheduler.shutdown(); - } - } catch (Exception e) { - logger.error("Unexpected exception during changeAddress for client {}", id, e); - new EventFlow().addPostEvent(new NetworkEvents.ChangeAddressResponse(false, id)).postEvent().postEvent(); - scheduler.shutdown(); - } - } - }; - - scheduler.schedule(connectTask, 0, TimeUnit.MILLISECONDS); - } - @Override public void closeClient(long id) throws ClientNotFoundException { NetworkingClient client = this.networkClients.get(id); diff --git a/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java b/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java index 9740a17..8938721 100644 --- a/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java +++ b/framework/src/main/java/org/toop/framework/networking/events/NetworkEvents.java @@ -9,7 +9,7 @@ import org.toop.framework.eventbus.events.UniqueEvent; import org.toop.framework.eventbus.events.EventsBase; import org.toop.annotations.AutoResponseResult; import org.toop.framework.networking.interfaces.NetworkingClient; -import org.toop.framework.networking.types.NetworkingReconnect; +import org.toop.framework.networking.types.NetworkingConnector; /** * A collection of networking-related event records for use with the {@link @@ -118,16 +118,12 @@ public class NetworkEvents extends EventsBase { * *
Carries IP, port, and a unique event ID for correlation with responses. * - * @param networkingClientClass The type of networking client to create. - * @param host Server IP address. - * @param port Server port. - * @param identifier Unique event identifier for correlation. + * @param networkingClient + * @param networkingConnector */ public record StartClient( - NetworkingClient networkingClientClass, - String host, - int port, - NetworkingReconnect networkingReconnect, + NetworkingClient networkingClient, + NetworkingConnector networkingConnector, long identifier) implements UniqueEvent {} /** @@ -135,13 +131,12 @@ public class NetworkEvents extends EventsBase { * * @param clientId The client ID assigned to the new connection. * @param successful If successfully connected or not. If not clientId will also be -1. - * @param identifier Event ID used for correlation. */ @AutoResponseResult public record StartClientResponse(long clientId, boolean successful, long identifier) implements ResponseToUniqueEvent {} /** WIP (Not working) Request to reconnect a client to a previous address. */ - public record Reconnect(long clientId, NetworkingReconnect networkingReconnect, long identifier) + public record Reconnect(long clientId, NetworkingClient networkingClient, NetworkingConnector networkingConnector, long identifier) implements UniqueEvent {} public record ReconnectResponse(boolean successful, long identifier) implements ResponseToUniqueEvent {} @@ -150,10 +145,9 @@ public class NetworkEvents extends EventsBase { * Request to change a client connection to a new server. * * @param clientId The client connection ID. - * @param ip The new server IP. - * @param port The new server port. + * @param networkingConnector */ - public record ChangeAddress(long clientId, String ip, int port, NetworkingReconnect networkingReconnect, long identifier) + public record ChangeAddress(long clientId, NetworkingClient networkingClient, NetworkingConnector networkingConnector, long identifier) implements UniqueEvent {} public record ChangeAddressResponse(boolean successful, long identifier) implements ResponseToUniqueEvent {} diff --git a/framework/src/main/java/org/toop/framework/networking/interfaces/NetworkingClientManager.java b/framework/src/main/java/org/toop/framework/networking/interfaces/NetworkingClientManager.java index b9d8445..c236080 100644 --- a/framework/src/main/java/org/toop/framework/networking/interfaces/NetworkingClientManager.java +++ b/framework/src/main/java/org/toop/framework/networking/interfaces/NetworkingClientManager.java @@ -2,19 +2,16 @@ package org.toop.framework.networking.interfaces; import org.toop.framework.networking.exceptions.ClientNotFoundException; import org.toop.framework.networking.exceptions.CouldNotConnectException; -import org.toop.framework.networking.types.NetworkingReconnect; - -import java.util.Optional; +import org.toop.framework.networking.types.NetworkingConnector; public interface NetworkingClientManager { void startClient( long id, - NetworkingClient networkingClientClass, - String host, - int port, - NetworkingReconnect networkingReconnect) throws CouldNotConnectException; + NetworkingClient nClient, + NetworkingConnector nConnector, + Runnable onSuccess, + Runnable onFailure + ) throws CouldNotConnectException; void sendCommand(long id, String command) throws ClientNotFoundException; - void reconnect(long id, NetworkingReconnect networkingReconnect) throws ClientNotFoundException; - void changeAddress(long id, String host, int port, NetworkingReconnect networkingReconnect) throws ClientNotFoundException; void closeClient(long id) throws ClientNotFoundException; } diff --git a/framework/src/main/java/org/toop/framework/networking/types/NetworkingConnector.java b/framework/src/main/java/org/toop/framework/networking/types/NetworkingConnector.java new file mode 100644 index 0000000..ee6ed44 --- /dev/null +++ b/framework/src/main/java/org/toop/framework/networking/types/NetworkingConnector.java @@ -0,0 +1,5 @@ +package org.toop.framework.networking.types; + +import java.util.concurrent.TimeUnit; + +public record NetworkingConnector(String host, int port, int reconnectAttempts, long timeout, TimeUnit timeUnit) {} \ No newline at end of file diff --git a/framework/src/main/java/org/toop/framework/networking/types/NetworkingReconnect.java b/framework/src/main/java/org/toop/framework/networking/types/NetworkingReconnect.java deleted file mode 100644 index 7245124..0000000 --- a/framework/src/main/java/org/toop/framework/networking/types/NetworkingReconnect.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.toop.framework.networking.types; - -import java.util.concurrent.TimeUnit; - -public record NetworkingReconnect(int reconnectAttempts, long timeout, TimeUnit timeUnit) {} \ No newline at end of file