Added multithreaded TcpClient and TcpServer

This commit is contained in:
lieght
2025-09-13 23:39:13 +02:00
parent d17c1e010d
commit a13eee3ecd
10 changed files with 187 additions and 50 deletions

View File

@@ -3,9 +3,11 @@ package org.toop;
import org.toop.eventbus.Events; import org.toop.eventbus.Events;
import org.toop.eventbus.GlobalEventBus; import org.toop.eventbus.GlobalEventBus;
import org.toop.server.Server; import org.toop.server.Server;
import org.toop.server.backend.Testsss;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.toop.server.backend.local.TcpServer;
public class Main { public class Main {
private static final Logger logger = LogManager.getLogger(Main.class); private static final Logger logger = LogManager.getLogger(Main.class);
@@ -16,11 +18,11 @@ public class Main {
throw new RuntimeException("A event could not be initialized"); throw new RuntimeException("A event could not be initialized");
} }
TcpServer server = new TcpServer(5001);
Thread serverThread = new Thread(server);
serverThread.start();
Server.start("REMOTE", "127.0.0.1", "5001"); Server.start("REMOTE", "127.0.0.1", "5001");
// Testsss.start(""); // Used for testing server.
GlobalEventBus.post(new Events.ServerEvents.command("HELP", "THANK"));
GlobalEventBus.post(new Events.ServerEvents.command("GET"));
Window.start(""); Window.start("");
} }

View File

@@ -20,6 +20,8 @@ public class Events implements IEvents {
*/ */
public record OnCommand(org.toop.server.ServerCommand command, String[] args, String result) {} public record OnCommand(org.toop.server.ServerCommand command, String[] args, String result) {}
public record ReceivedMessage(String message) {}
/** /**
* Triggers on changing the server backend. * Triggers on changing the server backend.
*/ */

View File

@@ -10,8 +10,8 @@ import org.toop.server.backend.local.Local;
import org.toop.server.backend.remote.Remote; import org.toop.server.backend.remote.Remote;
import org.toop.server.backend.remote.TcpClient; import org.toop.server.backend.remote.TcpClient;
import java.io.IOException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@@ -24,16 +24,11 @@ public class Server extends Thread {
REMOTE, REMOTE,
} }
public enum Message {
OK,
ERR,
SVR,
}
String ip; String ip;
String port; String port;
IBackend backend; IBackend backend;
BlockingQueue<String> commandQueue; BlockingQueue<String> commandQueue;
// TODO Reconnect and keep trying to connect.
public Server(String set_backend, String set_ip, String set_port) { public Server(String set_backend, String set_ip, String set_port) {
ip = set_ip; ip = set_ip;
@@ -107,6 +102,7 @@ public class Server extends Thread {
*/ */
private void sendCommandByString(String command, String... args) { private void sendCommandByString(String command, String... args) {
if (!ServerCommand.isValid(command)) { if (!ServerCommand.isValid(command)) {
logger.error("Invalid command: {}", command);
return; return;
} }
@@ -123,27 +119,10 @@ public class Server extends Thread {
command = String.join(" ", fullCommand); command = String.join(" ", fullCommand);
this.commandQueue.add(command); this.commandQueue.add(command);
logger.info("Command {} added to the queue", command); logger.info("Command '{}' added to the queue", command);
} }
// /**
// * Sends a command to the server.
// *
// * @param command the command to execute
// * @return a Message indicating success or error
// */
// public void sendCommand(ServerCommand command) {
//
// Message result = Message.OK;
//
// this.commandQueue.add(command.toString());
//
// GlobalEventBus.post(new Events.ServerEvents.OnCommand(command, new String[0], result));
//
// return result;
// }
@Override @Override
public String toString() { public String toString() {
return String.format( return String.format(
@@ -161,7 +140,6 @@ public class Server extends Thread {
-> this.setPort(event.port())); -> this.setPort(event.port()));
} }
/** /**
* DO NOT USE, USE START INSTEAD. * DO NOT USE, USE START INSTEAD.
*/ */
@@ -171,24 +149,43 @@ public class Server extends Thread {
theRemoteServerTimeline(client); theRemoteServerTimeline(client);
} catch (UnknownHostException | InterruptedException e) { // TODO Better error handling. } catch (UnknownHostException | InterruptedException e) { // TODO Better error handling.
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} }
} }
private void theRemoteServerTimeline(TcpClient client) throws InterruptedException { private void theRemoteServerTimeline(TcpClient client) throws InterruptedException { // TODO: Rename
sleep(1000); sleep(1000); // Just wait, because why not
while (true) {
new Thread(() -> {
try { try {
String command = this.commandQueue.take(); // waits until an element is available while (true) {
logger.info("Sending command: {}", command); String received = client.readLine(); // blocks until a line is available
if (received != null) {
logger.info("Received: '{}'", received);
GlobalEventBus.post(new Events.ServerEvents.ReceivedMessage(received));
} else {
break;
}
}
} catch (IOException e) {
logger.error("Error reading from server", e);
}
}).start();
new Thread(() -> {
try {
while (true) {
String command = commandQueue.take(); // blocks until a command is available
client.sendMessage(command); client.sendMessage(command);
client.readLine(); logger.info("Sent command: '{}'", command);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
break; } catch (IOException e) {
} catch (Exception e) { throw new RuntimeException(e);
logger.error("Error in remote server timeline", e);
}
} }
}).start();
} }
/** /**
@@ -203,7 +200,7 @@ public class Server extends Thread {
try { try {
new Server(backend, ip, port).start(); new Server(backend, ip, port).start();
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
new Server(backend, "127.0.0.1", "5001").start(); // TODO: Doesn't do anything. new Server("REMOTE", "127.0.0.1", "5001").start(); // TODO: Doesn't do anything.
} }
} }

View File

@@ -0,0 +1,45 @@
package org.toop.server;
import java.util.EnumSet;
public enum ServerMessage {
OK,
ERR,
SVR;
private static final EnumSet<ServerMessage> VALID_COMMANDS = EnumSet.of(
ServerMessage.OK, ServerMessage.ERR, ServerMessage.SVR
);
public static EnumSet<ServerMessage> getValidCommands() {
return VALID_COMMANDS;
}
// TODO: Garbage code.
/**
* @param command Checks if string is a valid command.
* @return returns a boolean if string is a valid command.
*/
public static boolean isValid(String command) {
try {
ServerMessage.valueOf(command.toUpperCase());
return true;
} catch (IllegalArgumentException err) {
return false;
}
}
// TODO: Return something better
/**
* @param command Converts a string into a ServerCommand.
* @return returns a ServerCommand enum.
*/
public static ServerMessage getCommand(String command) {
if (isValid(command)) {
ServerMessage.valueOf(command.toUpperCase());
return ServerMessage.valueOf(command.toUpperCase());
}
return null;
}
}

View File

@@ -1,9 +1,9 @@
package org.toop.server.backend; package org.toop.server.backend;
import org.toop.server.Server; import org.toop.server.ServerMessage;
public interface IBackend { public interface IBackend {
Server.Message login(String username); ServerMessage login(String username);
// boolean isValidBackend(String backend); // boolean isValidBackend(String backend);
} }

View File

@@ -0,0 +1,25 @@
package org.toop.server.backend;
import org.toop.eventbus.Events;
import org.toop.eventbus.GlobalEventBus;
public class Testsss extends Thread {
public Testsss() {}
public void run() {
while (true) {
try {
sleep(1000);
GlobalEventBus.post(new Events.ServerEvents.command("HELP", "TEST"));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static void start(String keepEmpty) {
new Testsss().start();
}
}

View File

@@ -1,11 +1,11 @@
package org.toop.server.backend.local; package org.toop.server.backend.local;
import org.toop.server.Server; import org.toop.server.ServerMessage;
import org.toop.server.backend.IBackend; import org.toop.server.backend.IBackend;
public class Local implements IBackend { public class Local implements IBackend {
@Override @Override
public Server.Message login(String username) { public ServerMessage login(String username) {
return null; return null;
} }

View File

@@ -0,0 +1,63 @@
package org.toop.server.backend.local;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.Main;
import java.io.*;
import java.net.*;
public class TcpServer implements Runnable {
private static final Logger logger = LogManager.getLogger(Main.class);
private int port;
private boolean running = true;
public TcpServer(int port) {
this.port = port;
}
@Override
public void run() {
try (ServerSocket serverSocket = new ServerSocket(port)) {
logger.info("Server listening on port " + port);
while (running) {
Socket clientSocket = serverSocket.accept();
logger.info("Connected to client: " + clientSocket.getInetAddress());
// Handle each client in a new thread
new Thread(() -> handleClient(clientSocket)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void handleClient(Socket clientSocket) {
try (
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)
) {
String message;
while ((message = in.readLine()) != null) {
logger.info("Received: " + message);
out.println("Echo: " + message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
clientSocket.close();
logger.info("Client disconnected.");
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void stop() {
running = false;
}
}

View File

@@ -1,11 +1,11 @@
package org.toop.server.backend.remote; package org.toop.server.backend.remote;
import org.toop.server.Server; import org.toop.server.ServerMessage;
import org.toop.server.backend.IBackend; import org.toop.server.backend.IBackend;
public class Remote implements IBackend { public class Remote implements IBackend {
@Override @Override
public Server.Message login(String username) { public ServerMessage login(String username) {
return null; return null;
} }

View File

@@ -24,9 +24,12 @@ public class TcpClient {
this.out = createOut(); this.out = createOut();
} }
public TcpClient(String serverIp, int serverPort) throws UnknownHostException { public TcpClient(String serverIp, int serverPort) throws IOException {
this.serverAddress = InetAddress.getByName(serverIp); this.serverAddress = InetAddress.getByName(serverIp);
this.serverPort = serverPort; this.serverPort = serverPort;
this.socket = createSocket();
this.in = createIn();
this.out = createOut();
} }
private Socket createSocket() throws IOException { private Socket createSocket() throws IOException {
@@ -42,7 +45,7 @@ public class TcpClient {
} }
public void sendMessage(String message) throws IOException { public void sendMessage(String message) throws IOException {
this.out.write(message); this.out.println(message);
} }
public String readLine() throws IOException { public String readLine() throws IOException {