Added the ability to close and connect to a different server

This commit is contained in:
lieght
2025-09-14 01:43:13 +02:00
parent a13eee3ecd
commit f7e42926a4
9 changed files with 258 additions and 155 deletions

View File

@@ -1,17 +1,14 @@
package org.toop.server;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.Main;
import org.toop.eventbus.Events;
import org.toop.eventbus.GlobalEventBus;
import org.toop.server.backend.*;
import org.toop.server.backend.local.Local;
import org.toop.server.backend.remote.Remote;
import org.toop.server.backend.remote.TcpClient;
import org.toop.server.backend.TcpClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -26,46 +23,22 @@ public class Server extends Thread {
String ip;
String port;
IBackend backend;
BlockingQueue<String> commandQueue;
// TODO Reconnect and keep trying to connect.
BlockingQueue<String> commandQueue; // TODO, add a way to close it, for when reconnecting.
TcpClient tcpClient;
volatile boolean running = false;
Thread tcpIn;
Thread tcpOut;
public Server(String set_backend, String set_ip, String set_port) {
public Server(String set_ip, String set_port) {
ip = set_ip;
port = set_port;
this.setBackend(set_backend);
this.initEvents();
this.commandQueue = new LinkedBlockingQueue<>();
}
public IBackend getBackend() {
return backend;
}
/**
* @param backend The backend to change to.
*/
public void setBackend(ServerBackend backend) {
if (backend == ServerBackend.LOCAL) {
this.backend = new Local();
GlobalEventBus.post(new Events.ServerEvents.OnChangingServerBackend(ServerBackend.LOCAL));
try {
this.tcpClient = new TcpClient(this.getIp(), Integer.parseInt(this.getPort()));
} catch (IOException e) {
e.printStackTrace();
}
else {
this.backend = new Remote();
GlobalEventBus.post(new Events.ServerEvents.OnChangingServerBackend(ServerBackend.REMOTE));
}
}
public void setBackend(String backend) {
if (backend.equalsIgnoreCase("REMOTE")) {
this.backend = new Remote();
GlobalEventBus.post(new Events.ServerEvents.OnChangingServerBackend(ServerBackend.REMOTE));
}
else {
this.backend = new Local();
GlobalEventBus.post(new Events.ServerEvents.OnChangingServerBackend(ServerBackend.LOCAL));
}
}
public String getIp() {
@@ -76,8 +49,10 @@ public class Server extends Thread {
* @param ip The ip to change to.
*/
public void setIp(String ip) {
this.ip = ip;
GlobalEventBus.post(new Events.ServerEvents.OnChangingServerIp(ip));
}
public String getPort() {
@@ -89,8 +64,10 @@ public class Server extends Thread {
* @param port The port to change to.
*/
public void setPort(String port) {
this.port = port;
GlobalEventBus.post(new Events.ServerEvents.OnChangingServerPort(port));
}
/**
@@ -101,11 +78,17 @@ public class Server extends Thread {
* @param args The arguments for the command.
*/
private void sendCommandByString(String command, String... args) {
if (!ServerCommand.isValid(command)) {
logger.error("Invalid command: {}", command);
return;
}
if (!running) {
logger.warn("Server has been stopped");
return;
}
for (int i = 0; i < args.length; i++) {
args[i] = args[i].trim();
if (args[i].isEmpty()) {
@@ -125,42 +108,49 @@ public class Server extends Thread {
@Override
public String toString() {
return String.format(
"Server {ip: \"%s\", port: \"%s\", backend: \"%s\"}",
ip, port, backend
"Server {ip: \"%s\", port: \"%s\"}",
ip, port
);
}
private void initEvents() {
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.command.class, event
-> this.sendCommandByString(event.command(), event.args()));
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.changeServerIp.class, event
-> this.setIp(event.ip()));
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.changeServerPort.class, event
-> this.setPort(event.port()));
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.Reconnect.class, event
-> {
try {
this.reconnect(); // TODO: Terrible error handling and code. Needs cleanup.
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
GlobalEventBus.subscribeAndRegister(Events.ServerEvents.ChangeConnection.class, event
-> {
try {
this.connect(event.ip(), event.port()); // TODO: Terrible error handling and code. Needs cleanup.
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
/**
* DO NOT USE, USE START INSTEAD.
*/
public void run() {
try {
TcpClient client = new TcpClient(this.getIp(), Integer.parseInt(this.getPort())); // TODO Parsing to int is unsafe
theRemoteServerTimeline(client);
} catch (UnknownHostException | InterruptedException e) { // TODO Better error handling.
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void connection() throws InterruptedException { // TODO: Rename
private void theRemoteServerTimeline(TcpClient client) throws InterruptedException { // TODO: Rename
sleep(1000); // Just wait, because why not
new Thread(() -> {
this.tcpIn = new Thread(() -> {
try {
while (true) {
String received = client.readLine(); // blocks until a line is available
while (this.running) {
String received = this.tcpClient.readLine(); // blocks until a line is available
if (received != null) {
logger.info("Received: '{}'", received);
GlobalEventBus.post(new Events.ServerEvents.ReceivedMessage(received));
@@ -170,37 +160,124 @@ public class Server extends Thread {
}
} catch (IOException e) {
logger.error("Error reading from server", e);
try {
this.tcpClient.close();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}).start();
});
new Thread(() -> {
this.tcpOut = new Thread(() -> {
try {
while (true) {
while (this.running) {
String command = commandQueue.take(); // blocks until a command is available
client.sendMessage(command);
this.tcpClient.sendMessage(command);
logger.info("Sent command: '{}'", command);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (IOException e) {
try {
tcpClient.close();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}).start();
});
this.tcpIn.start();
this.tcpOut.start();
}
/**
*
* Connect to a new server.
*
* @param ip The ip to connect to.
* @param port The port to connect to.
* @throws IOException wip
* @throws InterruptedException wip
*/
public void connect(String ip, String port) throws IOException, InterruptedException {
if (running) {
this.close();
}
this.ip = ip;
this.port = port;
running = true;
this.tcpClient = new TcpClient(ip, Integer.parseInt(port));
this.connection();
}
/**
*
* Reconnects to previous address.
*
* @throws IOException wip
* @throws InterruptedException wip
*/
public void reconnect() throws IOException, InterruptedException {
this.connect(this.ip, this.port);
}
/**
*
* Close connection to server.
*
* @throws IOException wip
* @throws InterruptedException wip
*/
public void close() throws IOException, InterruptedException {
this.commandQueue.clear();
running = false;
// Thread.currentThread().interrupt();
this.tcpClient.closeSocket();
this.tcpIn.interrupt();
this.tcpOut.interrupt();
this.tcpIn.join();
this.tcpOut.join();
}
/**
* DO NOT USE, USE START INSTEAD.
*/
public void run() {
try {
this.reconnect();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
*
* Starts a server thread.
*
* @param backend The backend to use {remote, local}
* @param ip The address of the server to contact.
* @param port The port of the server.
*/
public static void start(String backend, String ip, String port) {
public static void start(String ip, String port) {
try {
new Server(backend, ip, port).start();
new Server(ip, port).start();
} catch (IllegalArgumentException e) {
new Server("REMOTE", "127.0.0.1", "5001").start(); // TODO: Doesn't do anything.
new Server("127.0.0.1", "5001").start(); // TODO: Doesn't do anything.
}
}