开发者

Linux环境下实现多进程Socket通信功能

开发者 https://www.devze.com 2025-05-30 10:00 出处:网络 作者: 天天进步2015
目录需求分析系统架构实现细节客户端实现(client.cpp)服务器实现(server.cpp)编译和运行代码解析端口号传递机制信号处理多线程连接条件变量同步总结进一步改进需求分析
目录
  • 需求分析
  • 系统架构
  • 实现细节
    • 客户端实现(client.cpp)
    • 服务器实现(server.cpp)
    • 编译和运行
  • 代码解析
    • 端口号传递机制
    • 信号处理
    • 多线程连接
    • 条件变量同步
  • 总结
    • 进一步改进

      需求分析

      我们的目标是实现以下功能:

      1. 服务器能够启动多个客户端进程,客户端数量可配置
      2. 每个客户端在不同的端口上监听
      3. 服务器能够连接到所有客户端
      4. 服务器能够同时向所有客户端发送消息
      5. 客户端接收到消息后发送确认
      6. 程序能够优雅地处理终止信号

      系统架构

      整个系统由两个主要组件组成:

      1. 客户端程序(Client)

        • 在指定端口上监听连接
        • 接收服务器发送的消息
        • 发送确认消息给服务器
      2. 服务器程序(Server)

        • 从配置文件读取设置
        • 启动多个客户端进程
        • 连接到每个客户端
        • 向所有客户端发送消息
        • 接收客户端的确认消息

      实现细节

      客户端实现(client.cpp)

      客户端程序需要创建一个Socket,在指定端口上监听连接,接收服务器发送的消息,并发送确认。

      #include <IOStream>
      #include <cstring>
      #include <unistd.h>
      #include <sys/socket.h>
      #include <netinet/in.h>
      #include <arpa/inet.h>
      #include <signal.h>
      
      #define BUFFER_SIZE 1024
      #define DEFAULT_PORT 8888
      
      bool running = true;
      
      void signalHandler(int signum) {
          std::cout << "Interrupt signal (" << signum << ") received.\n";
          running = false;
      }
      
      int main(int argc, char* argv[]) {
          // 注册信号处理函数,用于优雅关闭
          signal(SIGINT, signalHandler);
          signal(SIGTERM, signalHandler);
          
          // 解析命令行参数
          int port = DEFAULT_PORT;
          if (argc > 1) {
              port = std::stoi(argv[1]);
          }
          
          // 客户端ID(进程ID)
          pid_t pid = getpid();
          std::cout << "Client started with PID: " << pid << std::endl;
          
          // 创建Socket
          int clientSocket = socket(AF_INET, SOCK_STREAM, 0);
          if (clientSocket < 0) {
              std::cerr << "Error creating socket" << std::endl;
              return 1;
          }
          
          // 设置服务器地址
          struct sockaddr_in serverAddr;
          memset(&serverAddr, 0, sizeof(serverAddr));
          serverAddr.sin_family = AF_INET;
          serverAddr.sin_port = htons(port);
          serverAddr.sin_addr.s_addr = INADDR_ANY;
          
          // 绑定Socket到地址
          if (bind(clientSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
              std::cerr << "Error binding socket to port " << port << std::endl;
              close(clientSocket);
              return 1;
          }
          
          // 监听连接
          if (listen(clientSocket, 5) < 0) {
              std::cerr << "Error listening on socket" << std::endl;
              close(clientSocket);
              return 1;
          }
          
          std::cout << "Client listening on port " << port << std::endl;
          
          // 接受服务器连接
          struct sockaddr_in serverConnAddr;
          socklen_t serverLen = sizeof(serverConnAddr);
          int serverConnection = accept(clientSocket, (struct sockaddr*)&serverConnAddr, &serverLen);
          
          if (serverConnection < 0) {
              std::cerr << "Error accepting connection" << std::endl;
              close(clientSocket);
              return 1;
          }
          
          std::cout << "Connected to server at " 
                    << inet_ntoa(serverConnAddr.sin_addr) 
                    << ":" << ntohs(serverConnAddr.sin_port) 
                    << std::endl;
          
          // 接收服务器消息
          char buffer[BUFFER_SIZE];
          while (running) {
              memset(buffer, 0, BUFFER_SIZE);
              int bytesReceived = recv(serverConnection, buffer, BUFFER_SIZE - 1, 0);
              
              if (bytesReceived > 0) {
                  std::cout << "Message received: " << buffer << std::endl;
                  
                  // 发送确认
                  std::string ack = "Client " + std::to_string(pid) + " received message";
                  send(serverConnection, ack.c_str(), ack.length(), 0);
              } 
              else if (bytesReceived == 0) {
                  std::cout << "Server disconnected" << std::endl;
                  break;
              }
              else {
                  if (errno != EINTR) { // 忽略信号中断
                      std::cerr << "Error receiving data: " << strerror(errno) << std::endl;
                      break;
                  }
              }
          }
          
          // 清理资源
          close(serverConnection);
          close(clientSocket);
          std::cout << "Client terminated" << std::endl;
          
          return 0;
      }
      

      服务器实现(server.cpp)

      服务器程序需要启动多个客户端进程,连接到每个客户端,并向所有客户端发送消息。

      #include <iostream>
      #include <vector>
      #include <string>
      #include <cstring>
      #include <unistd.h>
      #include <sys/socket.h>
      #include <netinet/in.h>
      #include <arpa/inet.h>
      #include <thread>
      #include <chrono>
      #include <fstream>
      #include <sstream>
      #include <signal.h>
      #include <sys/wait.h>
      #include <mutex>
      #include <condition_variable>
      
      #define BUFFER_SIZE 1024
      #define DEFAULT_BASE_PORT 8888
      #define DEFAULphpT_NUM_CLIENTS 3
      #define CONFIG_FILE "server_config.txt"
      
      std::mutex mtx;
      std::condition_variable cv;
      bool allClientsConnected = false;
      int connectedClients = 0;
      int totalClients = 0;
      std::vector<pid_t> clientPids;
      std::vector<int> clientSockets;
      
      // 从配置文件读取配置
      bool readConfig(int& numClients, int& basePort) {
          std::ifstream configFile(CONFIG_FILE);
          if (!configFile.is_open()) {
              std::cout << "Config file not found, using defaults" << std::endl;
              return false;
          }
          
          std::string line;
          while (std::getline(configFile, line)) {
              std::istringstream iss(line);
              std::string key;
              if (std::getline(iss, key, '=')) {
                  std::string value;
                  if (std::getline(iss, value)) {
                      if (key == "NUM_CLIENTS") {
                          numClients = std::stoi(value);
                      } else if (key == "BASE_PORT") {
                          basePort = std::stoi(value);
                      }
                  }
              }
          }
          
          configFile.close();
          return true;
      }
      
      // 写入默认配置到文件
      void writeDefaultConfig() {
          std::ofstream configFile(CONFIG_FIUJdYRLE);
          if (configFile.is_open()) {
              configFile << "NUM_CLIENTS=" << DEFAULT_NUM_CLIENTS << std::endl;
              configFile << "BASE_PORT=" << DEFAULT_BASE_PORT << std::endl;
              configFile.close();
              std::cout << "Created default configuration file" << std::endl;
          } else {
              std::cerr << "Unable to create configuration file" << std::endl;
          }
      }
      
      // 启动客户端进程
      pid_t launchClient(int port) {
          pid_t pid = fork();
          
          if (pid == 0) {
              // 子进程
              std::string portStr = std::to_string(port);
              execl("./Client", "Client", portStr.c_str(), nullptr);
              
              // 如果execl返回,说明出错了
              std::cerr << "Error launching client: " << strerror(errno) << std::endl;
              exit(1);
          } else if (pid < 0) {
              // fork失败
              std::cerr << "Fork failed: " << strerror(errno) << std::endl;
              return -1;
          }
          
          // 父进程
          return pid;
      }
      
      // 连接到客户端
      bool connectToClient(int& clientSocket, int port) {
          clientSocket = socket(AF_INET, SOCK_STREAM, 0);
          if (clientSocket < 0) {
              std::cerr << "Error creating socket for client on port " << port << std::endl;
              return false;
          }
          
          struct sockaddr_in clientAddr;
          memset(&clientAddr, 0, sizeof(clientAddr));
          clientAddr.sin_family = AF_INET;
          clientAddr.sin_port = htons(port);
          clientAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
          
          // 尝试连接,带重试
          int retries = 10;
          while (retries > 0) {
              if (connect(clientSocket, (struct sockaddr*)&clientAddr, sizeof(clientAddr)) == 0) {
                  std::cout << "Connected to client on port " << port << std::endl;
                  return true;
              }
              
              std::cout << "Connection attempt failed, retrying in 1 second..." << std::endl;
              std::this_thread::sleep_for(std::chrono::seconds(1));
              retries--;
          }
          
          std::cerr << "Failed to connect to client on port " << port << std::endl;
          close(clientSocket);
          return false;
      }
      
      // 连接客户端的线程函数
      void clientConnectionThread(int port, int clientIndex) {
          int clientSocket;
          if (connectToClient(clientSocket, port)) {
              std::lock_guard<std::mutex> lock(mtx);
              clientSockets[clientIndex] = clientSocket;
              connectedClients++;
              
              if (connectedClients == totalClients) {
                  allClientsConnected = true;
                  cv.notify_one();
              }
          }
      }
      
      // 向所有已连接的客户端发送消息
      void sendToAllClients(const std::string& message) {
          for (int socket : clientSockets) {
              if (socket > 0) {
                  send(socket, message.c_str(), message.length(), 0);
                  
                  // 接收确认
                  char buffer[BUFFER_SIZE];
                  memset(buffer, 0, BUFFER_SIZE);
                  int bytesReceived = recv(socket, buffer, BUFFER_SIZE - 1, 0);
                  if (bytesReceived > 0) {
                      std::cout << "Acknowledgment: " << buffer << std::endl;
                  }
              }
          }
      }
      
      // 清理资源
      void cleanup() {
          // 关闭所有客户端Socket
          for (int socket : clientSockets) {
              if (socket > 0) {
                  close(socket);
              }
          }
          
          // 终止所有客户端进程
          for (pid_t pid : clientPids) {
              if (pid > 0) {
                  kill(pid, SIGTERM);
                  waitpid(pid, nullptr, 0);
              }
          }
      }
      
      // 信号处理函数
      void signalHandler(int signum) {
          std::cout << "Interrupt signal (" << signum << ") received.\n";
          cleanup();
          exit(signum);
      }
      
      int main() {
          // 注册信号处理函数
          signal(SIGINT, signalHandler);
          signal(SIGTERM, signalHandler);
          
          // 读取配置
          int numClients = DEFAULT_NUM_CLIENTS;
          int basePort = DEFAULT_BASE_PORT;
          
          if (!readConfig(numClients, basePort)) {
              writeDefaultConfig();
          }
          
          std::cout << "Starting server with " << numClients << " clients, base port: " << basePort << std::endl;
          
          // 初始化客户端向量
          totalClients = numClients;
          clientPids.resize(numClients, -1);
          clientSockets.resize(numClients, -1);
          
          // 启动客户端进程
          for (int i = 0; i < numClients; i++) {
              int port = basePort + i;
              pid_jst pid = launchClient(port);
              
              if (pid > 0) {
                  clientPids[i] = pid;
                  std::cout << "Launched client " << i + 1 << " with PID " << pid << " on port " << port << std::endl;
              } else {
                  std::cerr << "Failed to launch client " << i + 1 << std::endl;
              }
          }
          
          // 给客户端启动的时间
          std::cout << "Waiting for clients to start..." << std::endl;
          std::this_thread::sleep_for(std::chrono::seconds(2));
          
          // 连接到客户端
          std::vector<std::thread> connectionThreads;
          for (int i = 0; i < numClients; i++) {
              if (clientPids[i] > 0) {
                  int port = basePort + i;
                  connectionThreads.push_back(std::thread(clientConnectionThread, port, i));
              }
          }
          
          // 等待所有连接建立
          {
              std::unique_lock<std::mutex> lock(mtx);
              if (!allClientsConnected) {
                  std::cout << "Waiting for all clients to connect..." << std::endl;
                  cv.wait(lock, []{ return allClientsConnected; });
              }
          }
          
          // 等待所有连接线程结束
          for (auto& thread : connectionThreads) {
              thread.join();
          }
          
          std::cout << "All clients connected. Ready to send messages." << std::endl;
          
          // 主循环
          std::string message;
          while (true) {
              std::cout << "Enter message to send to all clients (or 'exit' to quit): ";
              std::getline(std::cin, message);
              
              if (message == "exit") {
                  break;
              }
              
              std::cout << "Sending message to all clients..." << std::endl;
              sendToAllClients(message);
          }
          
          // 清理
          cleanup();
          std::cout << "Server terminated" << std::endl;
          
          return 0;
      }
      

      编译和运行

      为了方便编译,我们可以创建一个Makefile:

      CC = g++
      CFLAGS = -std=c++11 -Wall -pthread
      LDFLAGS = -pthread
      
      all: Client Server
      
      Client: client.cpp
      	$(CC) $(CFLAGS) -o Client client.cpp $(LDFLAGS)
      
      Server: server.cpp
      	$(CC) $(CFLAGS) -o Server server.cpp $(LDFLAGS)
      
      clean:
      	rm -f Client Server server_config.txt
      
      .PHONY: all clean
      

      编译和运行的步骤:

      # 编译
      make
      
      # 运行服务器
      ./Server
      

      代码解析

      端口号传递机制

      服务器如何将端口号传递给客户端是本系统的一个关键点。整个过程如下:

      1. 服务器为每个客户端分配一个唯一的端口号(基础端口号 + 索引)
      2. 服务器通过fork()创建子进程
      3. 子进程通过execl()执行客户端程序,将端口号作为命令行参数传递
      4. 客户端程序解析命令行参数获取端口号
      5. 客户端使用该端口号创建和绑定Socket
      6. 服务器知道每个客户端的端口号,并使用这些端口号连接到客户端

      信号处理

      程序使用信号处理机制来实现优雅关闭:

      signal(SIGINT, signalHandler);
      signal(SIGTERM, signalHandler);
      

      这两行代码注册了信号处理函数,当程序接收到SIGINT(通常是按Ctrl+C)或SIGTERM(通常是系统发送的终止信号)时,会调用signalHandler函数。在这个函数中,程序会清理资源并正常退出。

      多线程连接

      服务器使用多线程来并行连接到所有客户端:

      std::vector<std::thread> connectionThreads;
      for (int i = 0; i < numClients; 编程i++) {
          if (clientPids[i] > 0) {
              int port = basePort + i;
              connectionThreads.push_back(std::thread(clientConnectionThread, port, i));
          }
      }
      

      这样可以同时尝试连接到所有客户端,而不是一个接一个地连接,提高了效率。

      条件变量同步

      服务器使用条件变量来等待所有客户端连接完成:

      {
          std::unique_lock<std::mutex> lock(mtx);
          if (!allClientsConnected) {
          javascript    std::cout << "Waiting for all clients to connect..." << std::endl;
              cv.wait(lock, []{ return allClientsConnected; });
          }
      }
      

      当所有客户端都连接成功后,allClientsConnected变量会被设置为true,条件变量会通知主线程继续执行。

      总结

      本文介绍了如何在linux环境下实现一个服务器程序,该程序能够启动多个客户端进程,并通过Socket与这些客户端进行通信。主要特点包括:

      1. 可配置的客户端数量
      2. 动态端口分配
      3. 并行连接
      4. 广播消息
      5. 确认机制
      6. 优雅关闭

      这个示例展示了多进程、Socket通信、多线程和同步机制的综合应用,可以作为网络编程的参考实现。

      进一步改进

      这个示例还可以进一步改进,例如:

      1. 添加错误恢复机制
      2. 实现客户端自动重连
      3. 添加消息队列
      4. 实现更复杂的通信协议
      5. 添加安全机制(如TLS加密)

      以上就是Linux环境下实现多进程Socket通信功能的详细内容,更多关于Linux多进程Socket通信的资料请关注编程客栈(www.devze.com)其它相关文章!

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      关注公众号