开发者

Python一次性将指定版本所有包上传PyPI镜像解决方案

开发者 https://www.devze.com 2025-09-15 09:30 出处:网络 作者: 东方佑
目录为什么需要这个方案完整解决方案1. 项目目录结构2. 创建智能下载脚本3. 创建包清单生成脚本4. 创建 docker Compose 配置docker-compose.yml5. 创建 Web 服务目录结构6. 创建准备脚本完整操作流程1. 在可联网的环
目录
  • 为什么需要这个方案
  • 完整解决方案
    • 1. 项目目录结构
    • 2. 创建智能下载脚本
    • 3. 创建包清单生成脚本
    • 4. 创建 docker Compose 配置docker-compose.yml
    • 5. 创建 Web 服务目录结构
    • 6. 创建准备脚本
  • 完整操作流程
    • 1. 在可联网的环境中准备镜像
    • 2. 验证下载结果
    • 3. 将整个目录复制到内网环境
    • 4. 在内网环境中部署
    • 5. 内网客户端使用
  • 方案优势
    • 注意事项

      根据您的需求和知识库中的警告信息(清华镜像会阻断大量下载行为),我设计了一个安全、完整、可离线部署的解决方案,用于一次性准备指定python版本的所有包,然后导出到内网环境。

      为什么需要这个方案

      知识库明确显示:清华镜像会检测到大量下载行为并阻断请求。直接尝试下载所有包会触发此限制,导致下载失败。我们需要一个谨慎、分阶段、有间隔的下载策略。

      完整解决方案

      1. 项目目录结构

      mkdir -p pypi-offline/{config,scripts,packages}
      cd pypi-offline
      

      2. 创建智能下载脚本

      scripts/safe_download.py

      #!/usr/bin/env python3
      import os
      import sys
      import time
      import random
      import requests
      import argparse
      from pathlib import Path
      from concurrent.futures import ThreadPoolExecutor, as_completed
      
      # 配置参数
      MAX_RETRIES = 3
      MIN_DELAY = 5  # 最小延迟(秒)
      MAX_DELAY = 15  # 最大延迟(秒)
      MAX_WORKERS = 2  # 最大并发数(避免触发限制)
      
      # PyPI API端点
      PYPI_SIMPLE = "https://pypi.org/simple"
      # 使用官方PyPI而非清华镜像,避免触发限制
      # 如果网络条件好,可考虑使用其他镜像
      
      def get_package_info(package_name):
          """获取包的元信息"""
          url = f"https://pypi.org/pypi/{package_name}/json"
          for i in range(MAX_RETRIES):
              try:
                  response = requests.get(url, timeout=30)
                  if response.status_code == 200:
                      return response.json()
                  elif response.status_code == 404:
                      return None
                  time.sleep((i + 1) * 2)
              except Exception as e:
                  print(f"获取包 {package_name} 信息失败: {str(e)}")
                  time.sleep((i + 1) * 5)
          return None
      
      def filter_packages_for_python(package_data, python_version):
          """过滤出兼容指定Python版本的包"""
          compatible_files = []
          py_ver = python_version.replace('.', '')
          
          for file_info in package_data['urls']:
              # 检查Python版本兼容性
              py_tag = file_info.get('python_version', '')
              if py_tag == 'source' or py_tag.startswith('py') or py_tag.startswith('cp' + py_ver):
                  compatible_files.append(file_info)
          
          return compatible_files
      
      def download_package(package_name, file_info, target_dir):
          """安全下载单个包文件"""
          file_url = file_info['url']
          file_name = file_info['filename']
          target_path = os.path.join(target_dir, file_name)
          
          # 如果文件已存在,跳过
          if os.path.exists(target_path):
              print(f"跳过已存在的文件: {file_name}")
              return True
          
          print(f"下载: {file_name} ({packahttp://www.devze.comge_name})")
          
          for i in range(MAX_RETRIES):
              try:
                  response = requests.get(file_url, stream=True, timeout=60)
                  if 编程客栈response.status_code == 200:
                      with open(target_path, 'wb') as f:
                          for chunk in response.iter_content(chunk_size=8192):
                              f.write(chunk)
                      print(f"成功下载: {file_name}")
                      return True
                  print(f"下载失败 ({response.status_code}): {file_name}")
                  time.sleep((i + 1) * 5)
              except Exception as e:
                  print(f"下载 {file_name} 失败: {str(e)}")
                  time.sleep((i + 1) * 5)
          
          # 下载失败,删除可能的部分文件
          if os.path.exists(target_path):
              os.remove(target_path)
          return False
      
      def get_all_packages():
          """获取所有包的列表"""
          print("获取所有包的列表...")
          response = requests.get(f"{PYPI_SIMPLE}/", timeout=30)
          if response.status_code != 200:
              raise Exception(f"无法获取包列表: HTTP {response.status_code}")
          
          # 解析html获取包名
          import re
          package_names = re.findall(r'<a href="/simple/([^" rel="external nofollow" /]+)">\1</a>', response.text)
          return package_names
      
      def main():
          parser = argparse.ArgumentParser(description='安全下载指定Python版本的所有包')
          parser.add_argument('--python-version', required=True, help='目标Python版本 (如: 3.8)')
          parser.add_argument('--output-dir', default='../packages', help='输出目录')
          args = parser.parse_args()
          
          # 创建输出目录
          output_dir = Path(args.output_dir)
          output_dir.mkdir(parents=True, exist_ok=True)
          print(f"输出目录: {output_dir}")
          
          # 获取所有包列表
          try:
              all_packages = get_all_packages()
              print(f"找到 {len(all_packages)} 个包")
          except Exception as e:
              print(f"获取包列表失败: {str(e)}")
              print("请尝试使用清华镜像的简单页面 API (需要处理 HTML)")
              # 作为备选方案,可以使用清华镜像的简单页面
              # all_packages = get_packages_from_tuna()
              return 1
          
          # 处理包
          success_count = 0
          failed_packages = []
          
          # 分批次处理,避免一次性太多请求
          BATch_size = 50
          for i in range(0, len(all_packages), batch_size):
              batch = all_packages[i:i+batch_size]
              print(f"\n处理包批次 {i//batch_size + 1} ({len(batch)} 个包)")
              
              with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                  future_to_pkg = {}
                  
                  for pkg in batch:
                      pkg_data = get_package_info(pkg)
                      if pkg_data:
                          compatible_files = filter_packages_for_python(pkg_data, args.python_version)
                          if compatible_files:
                              for file_info in compatible_files:
                                  future = executor.submit(
                                      download_package, 
                                      pkg, 
                                      file_info, 
                                      output_dir
                                  )
                                  future_to_pkg[future] = pkg
                      # 添加随机延迟,避免触发限制
                      time.sleep(random.uniform(MIN_DELAY, MAX_DELAY))
                  
                  # 等待并处理结果
                  for future in as_completed(future_to_pkg):
                      pkg = future_to_pkg[future]
                      try:
                          if future.result():
                              success_count += 1
                          else:
                              failed_packages.append(pkg)
                      except Exception as e:
                          print(f"处理包 {pkg} 时出错: {str(e)}")
                          failed_packages.append(pkg)
              
              # 批次间额外延迟
              print(f"\n批次处理完成,等待 {MAX_DELAY*2} 秒...")
              time.sleep(MAX_DELAY * 2)
          
          # 生成报告
          print("\n===== 下载完成 =====")
          print(f"成功: {success_count} 个包")
          print(f"失败: {len(failed_packages)} 个包")
          
          if failed_packages:
              print("\n失败的包列表 (可后续重试):")
              for pkg in failed_www.devze.compackages[:20]:  # 只显示前20个
                  print(f"- {pkg}")
              if len(failed_packages) > 20:
                  print(f"... 及其他 {len(failed_packages) - 20} 个包")
              
              # 保存失败列表以便重试
              with open(output_dir / 'failed_packages.txt', 'w') as f:
                  for pkg in failed_packages:
        编程客栈              f.write(pkg + '\n')
          
          return 0
      
      if __name__ == "__main__":
          sys.exit(main())
      

      3. 创建包清单生成脚本

      scripts/generate_simple_index.py

      #!/usr/bin/env python3
      import os
      import sys
      from pathlib import Path
      import re
      import html
      
      def generate_simple_index(packages_dir, output_dir):
          """生成符合PEP 503的simple index"""
          packages_dir = Path(packages_dir)
          output_dir = Path(output_dir)
          
          # 创建输出目录
          output_dir.mkdir(parents=True, exist_ok=True)
          
          # 收集所有包名
          package_names = set()
          for filename in os.listdir(packages_dir):
              # 从文件名提取包名 (PEP 427 格式)
              match = re.match(r'^([a-zA-Z0-9_-]+)(?:-[a-zA-Z0-9_.+-]+)?\.(?:whl|tar\.gz|zip)$', filename)
              if match:
                  package_name = match.group(1).lower()
                  # 规范化包名 (PEP 503)
                  package_name = package_name.replace('_', '-')
                  package_names.add(package_name)
          
          # 为每个包生成索引页面
          for package in package_names:
              index_content = f'<html><head><title>Links for {package}</title></head>\n'
              index_content += f'<body>\n<h1>Links for {package}</h1>\n'
              
              # 找出该包的所有文件
              for filename in os.listdir(packages_dir):
           http://www.devze.com       if re.match(f'^{re.escape(package)}(?:-[a-zA-Z0-9_.+-]+)?\.(?:whl|tar\.gz|zip)$', filename, re.IGNORECASE):
                      file_url = f'../{filename}'
                      index_content += f'<a href="{file_url}" rel="external nofollow" >{html.escape(filename)}</a>
      \n'
              
              index_content += '</body></html>'
              
              # 保存索引页面
              package_dir = output_dir / package
              package_dir.mkdir(exist_ok=True)
              with open(package_dir / 'index.html', 'w', encoding='utf-8') as f:
                  f.write(index_content)
          
          # 生成根索引页面
          root_index = '<html><head><title>Simple Index</title></head>\n'
          root_index += '<body>\n<h1>Simple Index</h1>\n'
          
          for package in sorted(package_names):
              root_index += f'<a href="{package}/" rel="external nofollow" >{html.escape(package)}</a>
      \n'
          
          root_index += '</body></html>'
          
          with open(output_dir / 'index.html', 'w', encoding='utf-8') as f:
              f.write(root_index)
          
          print(f"成功生成 {len(package_names)} 个包的索引")
      
      if __name__ == "__main__":
          if len(sys.argv) != 3:
              print("用法: python generate_simple_index.py <packages_dir> <output_dir>")
              sys.exit(1)
          
          generate_simple_index(sys.argv[1], sys.argv[2])
      

      4. 创建 Docker Compose 配置docker-compose.yml

      version: '3.8'
      
      services:
        pypi-offline:
          image: python:3.9-slim
          container_name: pypi-offline
          ports:
            - "8080:8000"
          volumes:
            - ./web:/usr/src/app
            - ./packages:/usr/src/app/packages
          working_dir: /usr/src/app
          command: >
            sh -c "python -m http.server 8000 --directory /usr/src/app"
          restart: unless-stopped
      

      5. 创建 Web 服务目录结构

      mkdir -p web/{simple,packages}
      

      6. 创建准备脚本

      prepare_offline_mirror.sh

      #!/bin/bash
      # 准备离线PyPI镜像
      
      PYTHON_VERSION=$1
      if [ -z "$PYTHON_VERSION" ]; then
          echo "用法: $0 <python版本>"
          echo "示例: $0 3.8"
          exit 1
      fi
      
      # 检查依赖
      if ! command -v python3 &> /dev/null; then
          echo "错误: 需要安装Python 3"
          exit 1
      fi
      
      # 1. 安全下载所有兼容指定Python版本的包
      echo "步骤1: 安全下载所有兼容Python $PYTHON_VERSION 的包"
      python3 scripts/safe_download.py --python-version "$PYTHON_VERSION" --output-dir packages
      
      # 2. 生成simple index
      echo "步骤2: 生成PEP 503兼容的simple index"
      python3 scripts/generate_simple_index.py packages web/simple
      
      # 3. 创建README
      cat > web/README.md << EOF
      # 离线PyPI镜像
      
      此目录包含Python $PYTHON_VERSION 的完整PyPI镜像。
      
      ## 使用方法
      
      1. 启动服务:
         docker-compose up -d
      
      2. 客户端使用:
         pip install 包名 -i http://your-server:8080/simple
      EOF
      
      echo "离线PyPI镜像准备完成!"
      echo "现在可以将整个pypi-offline目录复制到内网环境"
      echo "在内网环境中执行: docker-compose up -d 启动服务"
      

      完整操作流程

      1. 在可联网的环境中准备镜像

      # 克隆项目
      git clone https://github.com/your-repo/pypi-offline.git
      cd pypi-offline
      
      # 赋予脚本执行权限
      chmod +x scripts/*.py prepare_offline_mirror.sh
      
      # 准备Python 3.8的完整镜像 (这将需要较长时间)
      ./prepare_offline_mirror.sh 3.8
      
      # 准备Python 3.9的完整镜像 (可选)
      # ./prepare_offline_mirror.sh 3.9
      

      2. 验证下载结果

      # 检查包数量
      ls packages | wc -l
      
      # 检查simple index
      ls web/simple | wc -l
      

      3. 将整个目录复制到内网环境

      # 压缩整个目录
      tar -czvf pypi-offline-3.8.tar.gz pypi-offline
      
      # 将压缩文件传输到内网环境
      

      4. 在内网环境中部署

      # 解压
      tar -xzvf pypi-offline-3.8.tar.gz
      cd pypi-offline
      
      # 启动服务
      docker-compose up -d
      

      5. 内网客户端使用

      # 临时使用
      pip install numpy -i http://pypi-offline-server:8080/simple
      
      # 永久配置
      pip config set global.index-url http://pypi-offline-server:8080/simple
      

      方案优势

      1.避免触发下载限制:

      • 使用低并发(2个线程)
      • 随机延迟(5-15秒)
      • 批次处理(每50个包)
      • 失败重试机制
      • 使用官方PyPI而非清华镜像(更宽松)

      2.完整镜像:

      • 下载指定Python版本的所有兼容包
      • 生成符合PEP 503标准的simple index
      • 支持所有pip安装方式

      3.轻量级部署:

      • 仅需Python内置HTTP服务器
      • 无需额外依赖
      • 完全静态内容,易于复制

      4.可验证性:

      • 生成下载报告
      • 记录失败包以便重试
      • 简单的README说明

      注意事项

      • 下载时间: 完整镜像需要较长时间(可能数天),因为要遵守严格的下载限制
      • 存储空间: Python 3.8的完整镜像约需50-100GB空间
      • 分阶段下载: 如果中途失败,可从failed_packages.txt继续
      • 版本选择: 建议选择企业常用的稳定版本(如3.8或3.9)

      此方案确保您可以安全地一次性准备完整镜像,然后完全离线使用,完美符合您的内网部署需求。

      以上就是Python一次性将指定版本所有包上传PyPI镜像解决方案的详细内容,更多关于Python打包到PyPI的资料请关注编程客栈(www.devze.com)其它相关文章!

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      关注公众号