开发者

Jmeter并发执行Python 脚本的完整流程

开发者 https://www.devze.com 2022-11-30 13:38 出处:网络 作者: AirPython
目录1.前言2.Python实现文件上传2-1获取文件信息及切片数目2-2切片及分段上传2-3合并文件2-4文件路径参数化3.Jmeter并发执行4.最后本篇文章以文件上传为例,聊聊Jme...
目录
  • 1. 前言
  • 2. python 实现文件上传
    • 2-1获取文件信息及切片数目
    • 2-2切片及分段上传
    • 2-3合并文件
    • 2-4文件路径参数化
  • 3. Jmeter 并发执行
    • 4. 最后

      本篇文章以文件上传为例,聊聊 Jmeter 并发执行 Python 脚本的完整流程

      1. 前言

      大家好,我是安果!

      最近有小伙伴后台给我留言,说自己用 Django 写了一个大文件上传的 Api 接口,现在想本地检验一下接口并发的稳定性,问我有没有好的方案

      本篇文章以文件上传为例,聊聊Jmeter 并发执行 Python 脚本的完整流程

      2. Python 实现文件上传

      大文件上传包含 3 个步骤,分别是:

      • 获取文件信息及切片数目
      • 分段切片,并上传- API
      • 文件合并- API
      • 文件路径参数化

      2-1获取文件信息及切片数目

      首先,获取文件的大小

      然后,利用预设的切片大小获取分段总数

      最后,获取文件名及 md5 值

      import os
      import math
      import hashlib
      
      def get_file_md5(self, file_path):
          """获取文件的md5值"""
          with open(file_path, 'rb') as f:
               data = f.read()
               return hashlib.md5(data).hexdigest()
      
      def get_filename(self, filepath):
          """获取文件原始名称"""
          # 文件名带后缀
          filename_with_suffix = os.path.basename(filepath)
          # 文件名
          filename = filename_with_suffix.split('.')[0]
          # 后缀名
          suffix = filename_with_suffix.split('.')[-1]
          return filename_with_suffix, filename, suffix
      
      def get_chunk_info(self, file_path):
          """获取分段信息"""
          # 获取文件总大小(字节)
          file_total_size = os.path.getsize(file_path)
          print(file_total_size)
      
          # 分段总数
          total_chunks_num = math.ceil(file_total_size / self.chunk_size)
          # 文件名(带后缀)
          filename = self.get_filename(file_path)[0]
          # 文件的md5值
          file_md5 = self.get_file_md5(file_path)
          return file_total_size, total_chunks_num, filename, file_md5

      2-2切片及分段上传

      利用分段总数和分段大小,对文件进行切片,调用分段文件上传接口

      import requests
      
      def do_chunk_and_upload(self, file_path):
      ​    """将文件分段处理,并上传"""
          file_total_size, total_chunks_num, filename, file_md5 = self.get_chunk_info(file_path)
      
          # 遍历
          for index in range(total_chunks_num):
              print('第{}次文件上传'.format(index + 1))
              if index + 1 == total_chunks_num:
                  partSize = file_total_size % chunk_size
              else:
                  partSize = chunk_size
      
              # 文件偏移量
              offset = index * chunk_size
      
              # 生成分片id,从1开始
              chunk_id = index + 1
      
              print('开始准备上传文件')
              print("分片id:", chunk_id, "文件偏移量:", offset, ",当前分片大小:", partSize, )
      
              # 分段上传文件
              self.__upload(offset, chunk_id, file_path, file_md5, filename, partSize, total_chunks_num)
      
      def __upload(self, offset, chunk_id, file_path, file_md5, filename, partSize, total):
          """分次上传文件"""
          url = 'http://**/file/brust/upload'
       www.cppcns.com   params = {'chunk': chunk_id,
                      'fileMD5': file_md5,
                      'fileName': filename,
                      'partSize': partSize,
                      'total': total
                      }
          # 根据文件路径及偏移量,读取文件二进制数据
          current_file = open(file_path, 'rb')
          current_file.seek(offset)
      
          files = {'filwww.cppcns.come': current_file.read(partSize)}
          resp = requests.post(url, params=params, files=files).text
          print(resp)

      2-3合并文件

      最后调用合并文件的接口,将分段小文件合成大文件

      def merge_file(self, filepath):
              """合并"""
              url = 'http://**/file/brust/merge'
              file_total_size, total_chunks_num, filename, file_md5 = self.get_chunk_info(filepath)
      ​        payload = json.dumps(
                  {
                      "fileMD5": file_md5,
                      "chunkTotal": total_chunks_num,
                      "fileName": filename
                  }
              )
              print(payload)
              headers = {
                  "Content-Type": "application/json"
              }
              resp = requests.post(url, headers=headers, data=payload).text
              print(resp)

      2-4文件路径参数化

      为了并发执行,将文件上传路径参数化

      # fileupload.py
      ...
      if __name__ == '__main__':
          filSAXPoepath = sys.argv[1]
      
          # 每一段切片的大小(MB)
          chunk_size = 2 * 1024 * 1024
      
      编程客栈    fileApi = FileApi(chunk_size)
          # 分段上传
          fileApi.do_chunk_and_upload(filepath)
      
          # 合并
          fileApi.merge_file(filepath)

      3. Jmeter 并发执行

      在使用 Jmeter 创建并发流程前,我们需要编写批处理脚本

      其中,执行批处理脚本时,需要跟上文件路径一起执行

      # cmd.bat
      
      @echo off
      set filepath=%1
      
      python  C:\Users\xingag\Desktop\rpc_demo\fileupload.py %*

      然后,在本地新建一个 CSV 文件,写入多个文件路径

      # 准备多个文件路径(csv)

      C:\\Users\\xingag\\Desktop\\charles-proxy-4.6.1-win64.msi

      C:\\Users\\xingag\\Desktop\\V2.0.pdf

      C:\\Users\\xingag\\Desktop\\HBuilder1.zip

      C:\\Users\\xingag\\Desktop\\HBuilder2.zip

      接着,就可以使用 Jmeter 创建并发流程了

      完整步骤如下:

      • 创建一个测试计划,下面添加一个线程组

      这里线程组数目与上面文件数目保持一致即可

      • 线程组下,添加「同步定时器」

      同步定时器中的「模拟用户组的数量」和上面参数数量保持一致

      • 添加 CSV 数据文件设置

      指向上面准备的 csv 数据文件,设置文件格式为 UTF-8,变量名称设置为file_path,最后将线程共享模式设置为「当前线程组」

      • 添加调试取样器,方便调试
      • 添加 OS 进程取样器

      选择上面创建的批处理文件,命令行参数设置为「${file_path}」

      • 添加查看结果数

      4. 最后

      运行上面创建的 Jmeter 并发流程,在结果数中可以查看并发上传文件的结果

      当然,我们可以增加并发数量去模拟真实的使用场景,只需要修改 CSV 数据源及 Jmeter 参数即可

      到此这篇关于Jmeter并发执行 PythonSAXPo 脚本的问题详解的文章就介绍到这了,更多相关Jmeter并发执行 Python 脚本内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消