目录
- 需求分析
- 系统架构
- 实现细节
- 客户端实现(client.cpp)
- 服务器实现(server.cpp)
- 编译和运行
- 代码解析
- 端口号传递机制
- 信号处理
- 多线程连接
- 条件变量同步
- 总结
- 进一步改进
需求分析
我们的目标是实现以下功能:
- 服务器能够启动多个客户端进程,客户端数量可配置
- 每个客户端在不同的端口上监听
- 服务器能够连接到所有客户端
- 服务器能够同时向所有客户端发送消息
- 客户端接收到消息后发送确认
- 程序能够优雅地处理终止信号
系统架构
整个系统由两个主要组件组成:
客户端程序(Client):
- 在指定端口上监听连接
- 接收服务器发送的消息
- 发送确认消息给服务器
服务器程序(Server):
- 从配置文件读取设置
- 启动多个客户端进程
- 连接到每个客户端
- 向所有客户端发送消息
- 接收客户端的确认消息
实现细节
客户端实现(client.cpp)
客户端程序需要创建一个Socket,在指定端口上监听连接,接收服务器发送的消息,并发送确认。
#include <IOStream> #include <cstring> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <signal.h> #define BUFFER_SIZE 1024 #define DEFAULT_PORT 8888 bool running = true; void signalHandler(int signum) { std::cout << "Interrupt signal (" << signum << ") received.\n"; running = false; } int main(int argc, char* argv[]) { // 注册信号处理函数,用于优雅关闭 signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler); // 解析命令行参数 int port = DEFAULT_PORT; if (argc > 1) { port = std::stoi(argv[1]); } // 客户端ID(进程ID) pid_t pid = getpid(); std::cout << "Client started with PID: " << pid << std::endl; // 创建Socket int clientSocket = socket(AF_INET, SOCK_STREAM, 0); if (clientSocket < 0) { std::cerr << "Error creating socket" << std::endl; return 1; } // 设置服务器地址 struct sockaddr_in serverAddr; memset(&serverAddr, 0, sizeof(serverAddr)); serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(port); serverAddr.sin_addr.s_addr = INADDR_ANY; // 绑定Socket到地址 if (bind(clientSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) { std::cerr << "Error binding socket to port " << port << std::endl; close(clientSocket); return 1; } // 监听连接 if (listen(clientSocket, 5) < 0) { std::cerr << "Error listening on socket" << std::endl; close(clientSocket); return 1; } std::cout << "Client listening on port " << port << std::endl; // 接受服务器连接 struct sockaddr_in serverConnAddr; socklen_t serverLen = sizeof(serverConnAddr); int serverConnection = accept(clientSocket, (struct sockaddr*)&serverConnAddr, &serverLen); if (serverConnection < 0) { std::cerr << "Error accepting connection" << std::endl; close(clientSocket); return 1; } std::cout << "Connected to server at " << inet_ntoa(serverConnAddr.sin_addr) << ":" << ntohs(serverConnAddr.sin_port) << std::endl; // 接收服务器消息 char buffer[BUFFER_SIZE]; while (running) { memset(buffer, 0, BUFFER_SIZE); int bytesReceived = recv(serverConnection, buffer, BUFFER_SIZE - 1, 0); if (bytesReceived > 0) { std::cout << "Message received: " << buffer << std::endl; // 发送确认 std::string ack = "Client " + std::to_string(pid) + " received message"; send(serverConnection, ack.c_str(), ack.length(), 0); } else if (bytesReceived == 0) { std::cout << "Server disconnected" << std::endl; break; } else { if (errno != EINTR) { // 忽略信号中断 std::cerr << "Error receiving data: " << strerror(errno) << std::endl; break; } } } // 清理资源 close(serverConnection); close(clientSocket); std::cout << "Client terminated" << std::endl; return 0; }
服务器实现(server.cpp)
服务器程序需要启动多个客户端进程,连接到每个客户端,并向所有客户端发送消息。
#include <iostream> #include <vector> #include <string> #include <cstring> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <thread> #include <chrono> #include <fstream> #include <sstream> #include <signal.h> #include <sys/wait.h> #include <mutex> #include <condition_variable> #define BUFFER_SIZE 1024 #define DEFAULT_BASE_PORT 8888 #define DEFAULphpT_NUM_CLIENTS 3 #define CONFIG_FILE "server_config.txt" std::mutex mtx; std::condition_variable cv; bool allClientsConnected = false; int connectedClients = 0; int totalClients = 0; std::vector<pid_t> clientPids; std::vector<int> clientSockets; // 从配置文件读取配置 bool readConfig(int& numClients, int& basePort) { std::ifstream configFile(CONFIG_FILE); if (!configFile.is_open()) { std::cout << "Config file not found, using defaults" << std::endl; return false; } std::string line; while (std::getline(configFile, line)) { std::istringstream iss(line); std::string key; if (std::getline(iss, key, '=')) { std::string value; if (std::getline(iss, value)) { if (key == "NUM_CLIENTS") { numClients = std::stoi(value); } else if (key == "BASE_PORT") { basePort = std::stoi(value); } } } } configFile.close(); return true; } // 写入默认配置到文件 void writeDefaultConfig() { std::ofstream configFile(CONFIG_FIUJdYRLE); if (configFile.is_open()) { configFile << "NUM_CLIENTS=" << DEFAULT_NUM_CLIENTS << std::endl; configFile << "BASE_PORT=" << DEFAULT_BASE_PORT << std::endl; configFile.close(); std::cout << "Created default configuration file" << std::endl; } else { std::cerr << "Unable to create configuration file" << std::endl; } } // 启动客户端进程 pid_t launchClient(int port) { pid_t pid = fork(); if (pid == 0) { // 子进程 std::string portStr = std::to_string(port); execl("./Client", "Client", portStr.c_str(), nullptr); // 如果execl返回,说明出错了 std::cerr << "Error launching client: " << strerror(errno) << std::endl; exit(1); } else if (pid < 0) { // fork失败 std::cerr << "Fork failed: " << strerror(errno) << std::endl; return -1; } // 父进程 return pid; } // 连接到客户端 bool connectToClient(int& clientSocket, int port) { clientSocket = socket(AF_INET, SOCK_STREAM, 0); if (clientSocket < 0) { std::cerr << "Error creating socket for client on port " << port << std::endl; return false; } struct sockaddr_in clientAddr; memset(&clientAddr, 0, sizeof(clientAddr)); clientAddr.sin_family = AF_INET; clientAddr.sin_port = htons(port); clientAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); // 尝试连接,带重试 int retries = 10; while (retries > 0) { if (connect(clientSocket, (struct sockaddr*)&clientAddr, sizeof(clientAddr)) == 0) { std::cout << "Connected to client on port " << port << std::endl; return true; } std::cout << "Connection attempt failed, retrying in 1 second..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); retries--; } std::cerr << "Failed to connect to client on port " << port << std::endl; close(clientSocket); return false; } // 连接客户端的线程函数 void clientConnectionThread(int port, int clientIndex) { int clientSocket; if (connectToClient(clientSocket, port)) { std::lock_guard<std::mutex> lock(mtx); clientSockets[clientIndex] = clientSocket; connectedClients++; if (connectedClients == totalClients) { allClientsConnected = true; cv.notify_one(); } } } // 向所有已连接的客户端发送消息 void sendToAllClients(const std::string& message) { for (int socket : clientSockets) { if (socket > 0) { send(socket, message.c_str(), message.length(), 0); // 接收确认 char buffer[BUFFER_SIZE]; memset(buffer, 0, BUFFER_SIZE); int bytesReceived = recv(socket, buffer, BUFFER_SIZE - 1, 0); if (bytesReceived > 0) { std::cout << "Acknowledgment: " << buffer << std::endl; } } } } // 清理资源 void cleanup() { // 关闭所有客户端Socket for (int socket : clientSockets) { if (socket > 0) { close(socket); } } // 终止所有客户端进程 for (pid_t pid : clientPids) { if (pid > 0) { kill(pid, SIGTERM); waitpid(pid, nullptr, 0); } } } // 信号处理函数 void signalHandler(int signum) { std::cout << "Interrupt signal (" << signum << ") received.\n"; cleanup(); exit(signum); } int main() { // 注册信号处理函数 signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler); // 读取配置 int numClients = DEFAULT_NUM_CLIENTS; int basePort = DEFAULT_BASE_PORT; if (!readConfig(numClients, basePort)) { writeDefaultConfig(); } std::cout << "Starting server with " << numClients << " clients, base port: " << basePort << std::endl; // 初始化客户端向量 totalClients = numClients; clientPids.resize(numClients, -1); clientSockets.resize(numClients, -1); // 启动客户端进程 for (int i = 0; i < numClients; i++) { int port = basePort + i; pid_jst pid = launchClient(port); if (pid > 0) { clientPids[i] = pid; std::cout << "Launched client " << i + 1 << " with PID " << pid << " on port " << port << std::endl; } else { std::cerr << "Failed to launch client " << i + 1 << std::endl; } } // 给客户端启动的时间 std::cout << "Waiting for clients to start..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(2)); // 连接到客户端 std::vector<std::thread> connectionThreads; for (int i = 0; i < numClients; i++) { if (clientPids[i] > 0) { int port = basePort + i; connectionThreads.push_back(std::thread(clientConnectionThread, port, i)); } } // 等待所有连接建立 { std::unique_lock<std::mutex> lock(mtx); if (!allClientsConnected) { std::cout << "Waiting for all clients to connect..." << std::endl; cv.wait(lock, []{ return allClientsConnected; }); } } // 等待所有连接线程结束 for (auto& thread : connectionThreads) { thread.join(); } std::cout << "All clients connected. Ready to send messages." << std::endl; // 主循环 std::string message; while (true) { std::cout << "Enter message to send to all clients (or 'exit' to quit): "; std::getline(std::cin, message); if (message == "exit") { break; } std::cout << "Sending message to all clients..." << std::endl; sendToAllClients(message); } // 清理 cleanup(); std::cout << "Server terminated" << std::endl; return 0; }
编译和运行
为了方便编译,我们可以创建一个Makefile:
CC = g++ CFLAGS = -std=c++11 -Wall -pthread LDFLAGS = -pthread all: Client Server Client: client.cpp $(CC) $(CFLAGS) -o Client client.cpp $(LDFLAGS) Server: server.cpp $(CC) $(CFLAGS) -o Server server.cpp $(LDFLAGS) clean: rm -f Client Server server_config.txt .PHONY: all clean
编译和运行的步骤:
# 编译 make # 运行服务器 ./Server
代码解析
端口号传递机制
服务器如何将端口号传递给客户端是本系统的一个关键点。整个过程如下:
- 服务器为每个客户端分配一个唯一的端口号(基础端口号 + 索引)
- 服务器通过
fork()
创建子进程 - 子进程通过
execl()
执行客户端程序,将端口号作为命令行参数传递 - 客户端程序解析命令行参数获取端口号
- 客户端使用该端口号创建和绑定Socket
- 服务器知道每个客户端的端口号,并使用这些端口号连接到客户端
信号处理
程序使用信号处理机制来实现优雅关闭:
signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler);
这两行代码注册了信号处理函数,当程序接收到SIGINT(通常是按Ctrl+C)或SIGTERM(通常是系统发送的终止信号)时,会调用signalHandler
函数。在这个函数中,程序会清理资源并正常退出。
多线程连接
服务器使用多线程来并行连接到所有客户端:
std::vector<std::thread> connectionThreads; for (int i = 0; i < numClients; 编程i++) { if (clientPids[i] > 0) { int port = basePort + i; connectionThreads.push_back(std::thread(clientConnectionThread, port, i)); } }
这样可以同时尝试连接到所有客户端,而不是一个接一个地连接,提高了效率。
条件变量同步
服务器使用条件变量来等待所有客户端连接完成:
{ std::unique_lock<std::mutex> lock(mtx); if (!allClientsConnected) { javascript std::cout << "Waiting for all clients to connect..." << std::endl; cv.wait(lock, []{ return allClientsConnected; }); } }
当所有客户端都连接成功后,allClientsConnected
变量会被设置为true
,条件变量会通知主线程继续执行。
总结
本文介绍了如何在linux环境下实现一个服务器程序,该程序能够启动多个客户端进程,并通过Socket与这些客户端进行通信。主要特点包括:
- 可配置的客户端数量
- 动态端口分配
- 并行连接
- 广播消息
- 确认机制
- 优雅关闭
这个示例展示了多进程、Socket通信、多线程和同步机制的综合应用,可以作为网络编程的参考实现。
进一步改进
这个示例还可以进一步改进,例如:
- 添加错误恢复机制
- 实现客户端自动重连
- 添加消息队列
- 实现更复杂的通信协议
- 添加安全机制(如TLS加密)
以上就是Linux环境下实现多进程Socket通信功能的详细内容,更多关于Linux多进程Socket通信的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论