Biggest update in Tic Tac Toe kind

This commit is contained in:
lieght
2025-09-17 15:08:44 +02:00
parent b070906386
commit 6c0dd220a4
8 changed files with 364 additions and 187 deletions

View File

@@ -23,16 +23,46 @@ public class Main {
initSystems();
ConsoleGui console = new ConsoleGui();
GameBase.State state = GameBase.State.INVALID;
CompletableFuture<String> serverIdFuture = new CompletableFuture<>();
GlobalEventBus.post(new Events.ServerEvents.StartServerRequest("5001", "tictactoe", serverIdFuture));
String serverId = serverIdFuture.get();
console.print();
CompletableFuture<String> connectionIdFuture = new CompletableFuture<>();
GlobalEventBus.post(new Events.ServerEvents.StartConnectionRequest("127.0.0.1", "5001", connectionIdFuture));
String connectionId = connectionIdFuture.get();
do {
console.print();
} while (console.next());
CompletableFuture<String> ticTacToeGame = new CompletableFuture<>();
GlobalEventBus.post(new Events.ServerEvents.CreateTicTacToeGameRequest(serverId, "J", "P", ticTacToeGame));
String ticTacToeGameId = ticTacToeGame.get();
GlobalEventBus.post(new Events.ServerEvents.RunTicTacToeGame(serverId, ticTacToeGameId));
console.print();
GlobalEventBus.post(new Events.ServerEvents.Command(
connectionId,
"gameid " + ticTacToeGameId, "player J", "MOVE", "0"
));
GlobalEventBus.post(new Events.ServerEvents.Command(
connectionId,
"gameid " + ticTacToeGameId, "player P", "MOVE", "1"
));
// for (int x = 0; x < 20000000; x++) {
// CompletableFuture<String> ticTacToeGame = new CompletableFuture<>();
// GlobalEventBus.post(new Events.ServerEvents.CreateTicTacToeGameRequest(serverId, "J"+x, "P"+x, ticTacToeGame));
// String ticTacToeGameId = ticTacToeGame.get();
// GlobalEventBus.post(new Events.ServerEvents.RunTicTacToeGame(serverId, ticTacToeGameId));
// GlobalEventBus.post(new Events.ServerEvents.Command(connectionId, "MOVE", "" + x));
// }
// ConsoleGui console = new ConsoleGui();
// GameBase.State state = GameBase.State.INVALID;
//
// console.print();
//
// do {
// console.print();
// } while (console.next());
//
// console.print();
}
public static void initSystems() {

View File

@@ -129,7 +129,7 @@ public class Events implements IEvents {
/**
* Triggers sending a command to a server.
*/
public record Command(String connectionId, String command, String... args) { }
public record Command(String connectionId, String... args) { }
/**
* Triggers when a command is sent to a server.

View File

@@ -1,5 +1,7 @@
package org.toop.game.tictactoe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.game.*;
import org.toop.server.backend.tictactoe.*;
@@ -7,7 +9,11 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TicTacToe extends GameBase implements Runnable {
protected static final Logger logger = LogManager.getLogger(TicTacToe.class);
public Thread gameThread;
public String gameId;
public BlockingQueue<ParsedCommand> commandQueue = new LinkedBlockingQueue<>();
public BlockingQueue<String> sendQueue = new LinkedBlockingQueue<>();
@@ -18,10 +24,28 @@ public class TicTacToe extends GameBase implements Runnable {
movesLeft = size * size;
}
/**
*
* Used for the server.
*
* @param player1
* @param player2
* @param gameId
*/
public TicTacToe(String player1, String player2, String gameId) {
super(3, new Player(player1, 'X'), new Player(player2, 'O'));
movesLeft = size * size;
}
public void addCommandToQueue(ParsedCommand command) {
commandQueue.add(command);
}
private void addSendToQueue(String send) throws InterruptedException {
sendQueue.put(send);
}
@Override
public void run() {
this.gameThread = new Thread(this::gameThread);
@@ -33,12 +57,16 @@ public class TicTacToe extends GameBase implements Runnable {
// String command = getNewestCommand();
// command = this.parseCommand(command).toString();
// if (command == null) { continue; }
try {
ParsedCommand cmd = this.commandQueue.take();
logger.info("Game {}, took command: {}", this.gameId, cmd.originalCommand); // TODO: Fix null gameid
this.addSendToQueue("OK");
} catch (InterruptedException e) {
logger.error("Game {} has crashed.", this.gameId);
throw new RuntimeException(e);
}
if (commandQueue.poll() == null) {
continue;
}
// TODO: Game use the commandQueue to get the commands.
// TODO: Game use the commandQueue to get the commands.
}
}

View File

@@ -4,28 +4,50 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.server.backend.tictactoe.ParsedCommand;
import java.io.*;
import java.net.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
/**
* Lightweight, thread-pool based TCP server base class.
*
* Responsibilities:
* - accept sockets
* - hand off socket I/O to connectionExecutor (pooled threads)
* - provide thread-safe queues (receivedQueue / sendQueue) to subclasses
*
* Notes:
* - Subclasses should consume receivedQueue (or call getNewestCommand()) and
* use sendQueue to send messages to all clients (or per-client, if implemented).
*/
public class TcpServer implements Runnable {
protected static final Logger logger = LogManager.getLogger(TcpServer.class);
private final ExecutorService executor = Executors.newFixedThreadPool(2);
public final BlockingQueue<String> receivedQueue = new LinkedBlockingQueue<>();
public final BlockingQueue<ParsedCommand> commandQueue = new LinkedBlockingQueue<>();
// Executor used for per-connection I/O tasks (reading/writing)
protected final ExecutorService connectionExecutor = Executors.newCachedThreadPool();
// Shared queues for subclasses / consumers
public final BlockingQueue<String> receivedQueue = new LinkedBlockingQueue<>(); // unbounded; you may choose bounded
public final BlockingQueue<String> sendQueue = new LinkedBlockingQueue<>();
public final Map<Socket, String> knownPlayers = new HashMap<>();
public final Map<String, String> playersGames = new HashMap<>();
public final int WAIT_TIME = 500; // MS
// (Optional) if you want to associate sockets -> player ids
public final Map<Socket, String> knownPlayers = new ConcurrentHashMap<>();
public final Map<String, String> playersGames = new ConcurrentHashMap<>();
// tunables
public final int WAIT_TIME = 500; // ms used by poll-based methods
public final int RETRY_ATTEMPTS = 3;
protected int port;
protected ServerSocket serverSocket = null;
private boolean running = true;
protected final int port;
protected final ServerSocket serverSocket;
private volatile boolean running = true;
public TcpServer(int port) throws IOException {
this.port = port;
@@ -33,125 +55,148 @@ public class TcpServer implements Runnable {
}
public boolean isRunning() {
return this.running;
return running;
}
/**
* Default run: accept connections and hand off to connectionExecutor.
* Subclasses overriding run() should still call startWorkers(Socket) for each accepted socket.
*/
@Override
public void run() {
logger.info("Server listening on port {}", port);
try {
logger.info("Server listening on port {}", port);
while (running) {
Socket clientSocket = this.serverSocket.accept();
logger.info("Connected to client: {}", clientSocket.getInetAddress());
new Thread(() -> this.startWorkers(clientSocket)).start();
Socket clientSocket = serverSocket.accept();
logger.info("Accepted connection from {}", clientSocket.getRemoteSocketAddress());
// hand off to pool to manage I/O for this socket
connectionExecutor.submit(() -> startWorkers(clientSocket));
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void runGame() {}
public void endGame() {}
public void newGame() {}
protected String sendServerMessage() {
try { return sendQueue.poll(this.WAIT_TIME, TimeUnit.MILLISECONDS); }
catch (InterruptedException e) {
logger.error("Interrupted", e);
return null;
}
}
protected ParsedCommand getNewestCommand() {
try {
String rec = receivedQueue.poll(this.WAIT_TIME, TimeUnit.MILLISECONDS);
if (rec != null) {
return new ParsedCommand(rec);
if (running) {
logger.error("Accept failed", e);
} else {
logger.info("Server socket closed, stopping acceptor");
}
}
catch (InterruptedException e) {
logger.error("Interrupted", e);
return null;
}
return null;
}
//
// protected void sendMessage(String message) throws InterruptedException {
// sendQueue.put(message);
// }
/**
* Listen/Write workers for an accepted client socket.
* This method submits two tasks to the connectionExecutor:
* - inputLoop: reads lines and enqueues them to receivedQueue
* - outputLoop: polls sendQueue and writes messages to the client
*
* Note: This is a simple model where sendQueue is global; if you need per-client
* send-queues, adapt this method to use one per socket.
*/
protected void startWorkers(Socket clientSocket) {
running = true;
try {
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
this.executor.submit(() -> this.inputLoop(in));
this.executor.submit(() -> this.outputLoop(out));
} catch (Exception e) {
logger.error("Server could not start, {}", e);
}
// Input task: read lines and put them on receivedQueue
Runnable inputTask = () -> {
logger.info("Starting read loop for {}", clientSocket.getRemoteSocketAddress());
try {
String line;
while (running && (line = in.readLine()) != null) {
if (line.isEmpty()) continue;
logger.debug("Received from {}: {}", clientSocket.getRemoteSocketAddress(), line);
}
private void stopWorkers() {
this.running = false;
this.receivedQueue.clear();
this.sendQueue.clear();
this.executor.shutdownNow();
}
private void inputLoop(BufferedReader in) {
logger.info("Starting {} connection read", this.port);
try {
String message;
while (running && (message = in.readLine()) != null) {
logger.info("Received: '{}'", message);
if (!message.isEmpty()) {
String finalMessage = message;
new Thread(() -> {
for (int i = 0; i < this.RETRY_ATTEMPTS; i++) {
if (this.receivedQueue.offer(finalMessage)) break;
boolean offered = false;
for (int i = 0; i < RETRY_ATTEMPTS && !offered; i++) {
try {
// Use offer to avoid blocking indefinitely; adapt timeout/policy as needed
offered = this.receivedQueue.offer(line, 200, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
if (!offered) {
logger.warn("Backpressure: dropping line from {}: {}", clientSocket.getRemoteSocketAddress(), line);
// Policy choice: drop, notify, or close connection. We drop here.
}
}
} catch (IOException e) {
logger.info("Connection closed by remote: {}", clientSocket.getRemoteSocketAddress());
} finally {
try {
clientSocket.close();
} catch (IOException ignored) {}
logger.info("Stopped read loop for {}", clientSocket.getRemoteSocketAddress());
}
}
};
// Output task: poll global sendQueue and write to this specific client.
// NOTE: With a single global sendQueue, every message is sent to every connected client.
// If you want per-client sends, change this to use per-client queue map.
Runnable outputTask = () -> {
logger.info("Starting write loop for {}", clientSocket.getRemoteSocketAddress());
try {
while (running && !clientSocket.isClosed()) {
String msg = sendQueue.poll(WAIT_TIME, TimeUnit.MILLISECONDS);
if (msg != null) {
out.println(msg);
out.flush();
logger.debug("Sent to {}: {}", clientSocket.getRemoteSocketAddress(), msg);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("Writer interrupted for {}", clientSocket.getRemoteSocketAddress());
} catch (Exception e) {
logger.error("Writer error for {}: {}", clientSocket.getRemoteSocketAddress(), e.toString());
} finally {
try {
clientSocket.close();
} catch (IOException ignored) {}
logger.info("Stopped write loop for {}", clientSocket.getRemoteSocketAddress());
}
};
// Submit tasks - they will run on the shared connectionExecutor
connectionExecutor.submit(inputTask);
connectionExecutor.submit(outputTask);
} catch (IOException e) {
logger.error("Error reading from server", e);
} finally {
logger.error("Could not start workers for client: {}", e.toString());
try {
this.serverSocket.close();
logger.info("Client disconnected. {}", this.port);
} catch (IOException e) {
e.printStackTrace();
}
clientSocket.close();
} catch (IOException ignored) {}
}
}
private void outputLoop(PrintWriter out) {
logger.info("Starting {} connection write", this.port);
/**
* Convenience: wrapper to obtain the latest command (non-blocking poll).
* Subclasses can use this, but for blocking behavior consider using receivedQueue.take()
*/
protected ParsedCommand getNewestCommand() {
try {
while (this.running) {
String send = this.sendQueue.poll(this.WAIT_TIME, TimeUnit.MILLISECONDS);
if (send != null) {
out.println(send);
logger.info("Sent message from server {}: '{}'", this.port, send);
}
}
String rec = receivedQueue.poll(WAIT_TIME, TimeUnit.MILLISECONDS);
if (rec != null) return new ParsedCommand(rec);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while polling receivedQueue", e);
}
return null;
}
/**
* Stop server and cleanup executors/sockets.
*/
public void stop() {
stopWorkers();
logger.info("sendQueue: {}", this.sendQueue.toString());
logger.info("receivedQueue: {}", this.receivedQueue.toString());
running = false;
try {
serverSocket.close();
} catch (IOException ignored) {}
connectionExecutor.shutdownNow();
logger.info("TcpServer stopped. receivedQueue size={}, sendQueue size={}",
receivedQueue.size(), sendQueue.size());
}
}

View File

@@ -1,8 +1,16 @@
package org.toop.server.backend.tictactoe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.Main;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ParsedCommand {
private static final Logger logger = LogManager.getLogger(ParsedCommand.class);
public TicTacToeServerCommand command;
public ArrayList<Object> arguments;
public boolean isValidCommand;
@@ -10,10 +18,15 @@ public class ParsedCommand {
public TicTacToeServerMessage returnMessage;
public String errorMessage;
public String originalCommand;
public String gameId;
public String player;
public ParsedCommand(String receivedCommand) {
if (receivedCommand.isEmpty()) {
logger.info("Received empty command");
this.gameId = null;
this.player = null;
this.command = null;
this.arguments = null;
this.isValidCommand = false;
@@ -24,6 +37,30 @@ public class ParsedCommand {
return;
}
// Case-insensitive regex to match: game_id {id} player {name}
Pattern pattern = Pattern.compile(
"(?i)\\bgame[_]?id\\s+(\\S+)\\s+player\\s+(\\S+)", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(receivedCommand);
String tempGameId = null;
String tempPlayer = null;
String tempPayload = receivedCommand;
if (matcher.find()) {
tempGameId = matcher.group(1); // first capture group → game_id
tempPlayer = matcher.group(2); // second capture group → player
// Remove the matched part from the original command
tempPayload = matcher.replaceFirst("").trim();
}
this.gameId = tempGameId;
this.player = tempPlayer;
receivedCommand = tempPayload;
logger.info("Received gameId: {}", gameId);
logger.info("Received player: {}", player);
logger.info("Received command: {}", receivedCommand);
String[] segments = receivedCommand.split(" ");
if (segments[0].isEmpty()) {
this.command = null;

View File

@@ -2,106 +2,154 @@ package org.toop.server.backend.tictactoe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.toop.game.tictactoe.*;
import org.toop.game.tictactoe.TicTacToe;
import org.toop.server.backend.TcpServer;
import java.io.IOException;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
public class TicTacToeServer extends TcpServer {
protected static final Logger logger = LogManager.getLogger(TicTacToeServer.class);
/**
* Map of gameId -> Game instances
*/
private final ExecutorService connectionExecutor = Executors.newCachedThreadPool(); // socket I/O
private final ExecutorService dispatcherExecutor;
private final ExecutorService forwarderExecutor = Executors.newSingleThreadExecutor();
private final BlockingQueue<ParsedCommand> incomingCommands;
private final Map<String, TicTacToe> games = new ConcurrentHashMap<>();
public TicTacToeServer(int port) throws IOException {
super(port);
int dispatchers = Math.max(2, Runtime.getRuntime().availableProcessors());
this.dispatcherExecutor = Executors.newFixedThreadPool(dispatchers + 1); // TODO: Magic number for forwardMessages
this.incomingCommands = new LinkedBlockingQueue<>(5_000);
forwarderExecutor.submit(this::forwardLoop);
for (int i = 0; i < dispatchers; i++) {
dispatcherExecutor.submit(this::dispatchLoop);
}
}
@Override
public void run() {
try {
logger.info("Tic tac toe server listening on port {}", this.port);
logger.info("TicTacToe server listening on port {}", this.port);
while (isRunning()) {
Socket clientSocket = this.serverSocket.accept();
logger.info("Connected to client: {}", clientSocket.getInetAddress());
new Thread(() -> this.startWorkers(clientSocket)).start();
new Thread(this::gameManagerThread).start();
connectionExecutor.submit(() -> this.startWorkers(clientSocket));
}
} catch (IOException e) {
e.printStackTrace();
logger.error("I/O error in server run loop", e);
}
}
@Override
protected ParsedCommand getNewestCommand() {
/**
* Forwards raw messages from TcpServer.receivedQueue into ParsedCommand objects.
*/
private void forwardLoop() {
logger.info("Forwarder loop started");
try {
String rec = receivedQueue.poll(this.WAIT_TIME, TimeUnit.MILLISECONDS);
if (rec != null) {
return new ParsedCommand(rec);
while (isRunning()) {
String raw = this.receivedQueue.take(); // blocks
logger.info("Received command: {}", raw);
try {
ParsedCommand pc = new ParsedCommand(raw);
this.incomingCommands.put(pc); // blocks if full
} catch (Exception e) {
logger.warn("Invalid message ignored: {}", raw, e);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("Forwarder loop interrupted");
}
catch (InterruptedException e) {
logger.error("Interrupted", e);
return null;
}
return null;
}
public void gameManagerThread() {
while (true) { // TODO: Very heavy on thread
try {
synchronized (this) {
wait(250);
} // Fixes current thread is not owner.
} catch (InterruptedException e) {
logger.error("Interrupted", e);
}
ParsedCommand command = getNewestCommand();
if (command != null && !command.isServerCommand) {
TicTacToe testGame = games.values().iterator().next(); // TODO: Is to get first for testing, must be done a different way later.
testGame.addCommandToQueue(command);
logger.info("Added command to the game queue: {}", command);
return;
/**
* Dispatches parsed commands into the game logic.
*/
private void dispatchLoop() {
logger.info("Dispatcher thread started");
try {
while (isRunning()) {
ParsedCommand command = this.incomingCommands.take(); // blocks
if (command.isServerCommand) {
handleServerCommand(command);
continue;
}
// Find game by ID
TicTacToe game = this.games.get(command.gameId);
if (game != null) {
game.addCommandToQueue(command);
logger.info("Dispatched command {} to game {}", command.toString(), command.gameId);
} else {
logger.warn("No active game with ID {} for command {}", command.gameId, command.toString());
// TODO: reply back
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("Dispatcher interrupted");
}
}
private void handleServerCommand(ParsedCommand command) {
// TODO
}
public void forwardGameMessages(TicTacToe game) {
dispatcherExecutor.submit(() -> {
try {
while (isRunning()) {
String msg = game.sendQueue.take(); // blocks until a message is added to the queue
logger.info("Adding: {} to the send queue", msg);
this.sendQueue.put(msg); // push to network layer
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
public String newGame(String playerA, String playerB) {
logger.info("Creating a new game: {} vs {}", playerA, playerB);
String gameId = UUID.randomUUID().toString();
TicTacToe game = new TicTacToe(playerA, playerB);
TicTacToe game = new TicTacToe(playerA, playerB, gameId);
this.games.put(gameId, game);
// this.knownPlayers.put(sockA, playerA); // TODO: For remembering players and validation.
// this.knownPlayers.put(sockB, playerB);
// this.playersGames.put(playerA, gameId);
// this.playersGames.put(playerB, gameId);
logger.info("Created a new game: {}. {} vs {}", gameId, playerA, playerB);
forwardGameMessages(game);
logger.info("Created new game: {}. {} vs {}", gameId, playerA, playerB);
return gameId;
}
public void runGame(String gameId) {
TicTacToe game = this.games.get(gameId);
game.run();
logger.info("Running game: {}, players: {}", gameId, game.getPlayers());
if (game != null) {
game.run();
logger.info("Running game: {}, players: {}", gameId, game.getPlayers());
} else {
logger.warn("Tried to run unknown game {}", gameId);
}
}
public void endGame(String gameId) {
TicTacToe game = this.games.get(gameId);
this.games.remove(gameId);
// this.knownPlayers.put(sockA, playerA); // TODO: Remove players when game is done.
// this.knownPlayers.put(sockB, playerB);
// this.playersGames.put(playerA, gameId);
// this.playersGames.put(playerB, gameId);
logger.info("Ended game: {}", gameId);
// TODO: Multithreading, close game in a graceful matter, etc.
TicTacToe game = this.games.remove(gameId);
if (game != null) {
logger.info("Ended game: {}", gameId);
// TODO: gracefully stop game thread
} else {
logger.warn("Tried to end unknown game {}", gameId);
}
}
}

View File

@@ -55,12 +55,12 @@ public class ConnectionManager {
));
}
private void handleCommand(Events.ServerEvents.Command event) {
private void handleCommand(Events.ServerEvents.Command event) { // TODO: Move this to ServerConnection class, keep it internal.
ServerConnection serverConnection = this.serverConnections.get(event.connectionId());
if (serverConnection != null) {
serverConnection.sendCommandByString(event.command(), event.args());
serverConnection.sendCommandByString(event.args());
} else {
logger.warn("Server {} not found for command '{}'", event.connectionId(), event.command());
logger.warn("Server {} not found for command '{}'", event.connectionId(), event.args());
}
}

View File

@@ -25,6 +25,7 @@ public final class ServerConnection implements Runnable {
volatile boolean running = false;
public ServerConnection(String uuid, String ip, String port) {
this.uuid = uuid;
this.ip = ip;
this.port = port;
this.initEvents();
@@ -45,30 +46,18 @@ public final class ServerConnection implements Runnable {
* @param command The command to send to the server.
* @param args The arguments for the command.
*/
public void sendCommandByString(String command, String... args) {
if (!TicTacToeServerCommand.isValid(command)) {
logger.error("Invalid command: {}", command);
return;
}
System.out.println();
public void sendCommandByString(String... args) {
// if (!TicTacToeServerCommand.isValid(command)) {
// logger.error("Invalid command: {}", command);
// return;
// } // TODO: DO I CARE?
if (!this.running) {
logger.warn("Server has been stopped");
return;
}
for (int i = 0; i < args.length; i++) {
args[i] = args[i].trim();
if (args[i].isEmpty()) {
throw new IllegalArgumentException("Empty argument"); // TODO: Error handling, just crashes atm.
}
}
String[] fullCommand = new String[args.length + 1];
fullCommand[0] = command;
System.arraycopy(args, 0, fullCommand, 1, args.length);
command = String.join(" ", fullCommand);
String command = String.join(" ", args);
this.commandQueue.add(command);
logger.info("Command '{}' added to the queue", command);