// TcpServer.java package home; import java.io.*; import java.net.BindException; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; public class TcpServer extends NetworkBase { private ServerSocket serverSocket; private ExecutorService acceptorThread; private ConcurrentHashMap clients = new ConcurrentHashMap<>(); private ClientListListener clientListListener; public interface ClientListListener { void onClientListChanged(java.util.List clients); } public void setClientListListener(ClientListListener listener) { this.clientListListener = listener; } public TcpServer(String host, int port) { this.host = host; this.port = port; } @Override public boolean start() throws Exception { try { InetAddress localAddress = InetAddress.getByName(host); serverSocket = new ServerSocket(port, 50, localAddress); isRunning.set(true); startAcceptor(); notifyStatus("TCP服务器已启动 - " + host + ":" + port, true); return true; } catch (BindException e) { notifyError("端口 " + port + " 被占用,请选择其他端口"); return false; } catch (Exception e) { notifyError("启动TCP服务器失败: " + e.getMessage()); return false; } } @Override public void stop() { isRunning.set(false); try { for (ClientHandler client : clients.values()) { client.stop(); } clients.clear(); updateClientList(); if (serverSocket != null && !serverSocket.isClosed()) { serverSocket.close(); } } catch (Exception e) { // 忽略关闭时的异常 } if (acceptorThread != null) { acceptorThread.shutdownNow(); } notifyStatus("TCP服务器已停止", false); } @Override public boolean sendData(String data) throws Exception { // TCP服务器可以向所有客户端或特定客户端发送数据 // 这里简单实现向所有客户端发送 boolean sent = false; for (ClientHandler client : clients.values()) { if (client.sendData(data)) { sent = true; } } return sent; } /** * 发送字节数据到所有客户端 */ public boolean sendData(byte[] data) throws Exception { boolean sent = false; for (ClientHandler client : clients.values()) { if (client.sendData(data)) { sent = true; } } return sent; } /** * 发送字节数据到特定客户端 */ public boolean sendDataToClient(String clientKey, byte[] data) throws Exception { ClientHandler client = clients.get(clientKey); if (client != null) { return client.sendData(data); } return false; } @Override public boolean isConnected() { return isRunning.get() && serverSocket != null && !serverSocket.isClosed(); } private void startAcceptor() { acceptorThread = Executors.newSingleThreadExecutor(); acceptorThread.execute(() -> { while (isRunning.get() && serverSocket != null && !serverSocket.isClosed()) { try { Socket clientSocket = serverSocket.accept(); ClientHandler clientHandler = new ClientHandler(clientSocket); String clientKey = clientSocket.getInetAddress().getHostAddress() + ":" + clientSocket.getPort(); clients.put(clientKey, clientHandler); updateClientList(); notifyStatus("客户端连接: " + clientKey, true); } catch (Exception e) { if (isRunning.get()) { notifyError("接受客户端连接错误: " + e.getMessage()); } } } }); } private void updateClientList() { if (clientListListener != null) { clientListListener.onClientListChanged(new java.util.ArrayList<>(clients.keySet())); } } private class ClientHandler { private Socket socket; private OutputStream outputStream; private InputStream inputStream; private ExecutorService receiverThread; private AtomicBoolean running = new AtomicBoolean(true); public ClientHandler(Socket socket) throws Exception { this.socket = socket; this.outputStream = socket.getOutputStream(); this.inputStream = socket.getInputStream(); startReceiver(); } public boolean sendData(String data) { if (outputStream != null && !socket.isClosed()) { try { outputStream.write(data.getBytes()); outputStream.flush(); return true; } catch (IOException e) { return false; } } return false; } public boolean sendData(byte[] data) { if (outputStream != null && !socket.isClosed()) { try { outputStream.write(data); outputStream.flush(); return true; } catch (IOException e) { return false; } } return false; } public void stop() { running.set(false); try { if (outputStream != null) outputStream.close(); if (inputStream != null) inputStream.close(); if (socket != null) socket.close(); } catch (Exception e) { // 忽略关闭时的异常 } if (receiverThread != null) { receiverThread.shutdownNow(); } } private void startReceiver() { receiverThread = Executors.newSingleThreadExecutor(); receiverThread.execute(() -> { byte[] buffer = new byte[1024]; while (running.get() && socket != null && !socket.isClosed()) { try { int bytesRead = inputStream.read(buffer); if (bytesRead > 0) { byte[] receivedData = new byte[bytesRead]; System.arraycopy(buffer, 0, receivedData, 0, bytesRead); String fromAddress = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); // 调用新的字节数组版本的notifyData方法 notifyData(receivedData, fromAddress); } else if (bytesRead == -1) { // 客户端断开连接 break; } } catch (IOException e) { if (running.get()) { // 客户端连接错误 break; } } } // 清理客户端 String clientKey = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); clients.remove(clientKey); updateClientList(); notifyStatus("客户端断开: " + clientKey, false); }); } } /** * 通知数据接收(字节数组版本) */ protected void notifyData(byte[] data, String fromAddress) { if (dataListener != null) { dataListener.onDataReceived(data, fromAddress); } } }