目录
- 引言
- 使用pathlib库操作文件
- 基本路径操作
- 文件重命名与删除
- 文件查找与遍历
- 文件读写操作
- 使用zipfile、tarfile压缩解压文件
- ZIP文件操作
- TAR文件操作
- 实际应用场景
- 场景一:日志文件自动归档
- 场景二:批量文件格式转换
引言
在日常工作中,文件操作是最基础也是最常见的任务之一。无论是批量处理数据文件,还是整理工作文档,掌握高效的文件读写技巧都能极大提升工作效率。本文将介绍几种实用的python文件操作方法,帮助你轻松应对各种文件处理需求。
使用pathlib库操作文件
传统的文件路径处理往往依赖于os和os.path模块,代码繁琐且平台兼容性差。而Python 3.4引入的pathlib库提供了面向对象的文件系统路径处理方式,使代码更简洁、更易读。
基本路径操作
from pathlib import Path # 创建路径对象 file_path = Path('工作报告.docx') project_dir = Path('/Users/sunlei/projects') # 路径拼接(无需担心斜杠问题) doc_path = project_dir / 'documents' / file_path print(doc_path) # 输出: /Users/sunlei/projects/documents/工作报告.docx # 获取路径信息 print(doc_path.name) # 输出: 工作报告.docx print(doc_path.suffix) # 输出: .docx print(doc_path.stem) # 输出: 工作报告 print(doc_path.parent) # 输出: /Users/sunlei/projects/documents
文件重命名与删除
对文件进行重命名是常见的需求,通过pathlib库的Path.rename方法可以轻松实现对某文件的重命名操作。
from pathlib import Path import datetime # 获取当前日期 today = datetime.date.today().strftime('%Y%m%d') # 批量重命名文件(添加日期前缀) def rename_with_date(directory, pattern='*.txt'): dir_path = Path(directory) for file_path in dir_path.glob(pattern): # 构建新文件名 new_name = f"{today}_{file_path.name}" new_path = file_path.with_name(new_name) # 执行重命名 file_path.rename(new_path) print(f"已重命名: {file_path.name} -> {new_name}") # 使用示例 rename_with_date('./reports')
Path.unlink方法等价于os.remove方法,用于删除已存在的文件;Path.rmdir方法等价于os.rmdir方法,用于删除空的目录,如果目录非空,该方法会抛出异常。
# 安全删除文件 def safe_delete(file_path): path = Path(file_path) if path.exists(): if path.is_file(): path.unlink() print(f"已删除文件: {path}") elif path.is_dir() and not any(path.iterdir()): path.rmdir() print(f"已删除空目录: {path}") else: print(f"目录非空,无法删除: {path}") else: print(f"路径不存在: {path}")
文件查找与遍历
使用的listdir()函数返回的只是文件和子文件夹的名称,而pathlib的glob()函数返回的则是文件和子文件夹的完整路径对象,更加方便操作。
# 查找所有Excel文件并按修改时间排序 def find_excel_files(directory): dir_path = Path(directory) excel_files = list(dir_path.glob('**/*.xlsx')) + list(dir_path.glob('**/*.xls')) # 按修改时间排序 excel_files.sort(key=lambda x: x.stat().st_mtime, reverse=True) print(f"找到 {len(excel_files)} 个Excel文件:") for file in excel_files[:5]: # 只显示前5个 mod_time = datetime.datetime.fromtimestamp(file.stat().st_mtime) print(f"{file.name} - 修改时间: {mod_time.strftime('%Y-%m-%d %H:%M')}") return excel_files
文件读写操作
pathlib还提供了简便的文件读写方法,无需传统的open()函数:
# 读取文本文件 def read_log_file(log_path): path = Path(log_path) if path.exists() and path.is_file(): # 直接读取文本内容 content = path.read_text(encoding='utf-8') lines = content.split('\n') print(f"日志共 {len(lines)} 行") # 查找错误信息 error_lines = [line for line in lines if 'ERROR' in line] if error_lines: javascript print(f"发现 {len(error_lines)} 条错误记录:") 编程客栈 for line in error_lines[:3]: # 只显示前3条 print(f"- {line}") else: print(f"日志文件不存在: {path}") # 写入文本文件 def append_to_log(log_path, message): path = Path(log_path) timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') log_entry = f"[{timestamp}] {message}\n" # 追加内容到文件 path.write_text(log_entry, encoding='utf-8') if not path.exists() else path.open('a', encoding='utf-8').write(log_entry) print(f"日志已更新: {path}")
使用zipfile、tarfile压缩解压文件
在处理大量文件时,压缩和解压是常见的需求。Python中提供了zipfile与tarfile内置库来分别实现对两种常见压缩文件格式的操作。
ZIP文件操作
import zipfile from pathlib import Path import os # 创建ZIP压缩文件 def create_zip_archive(directory, zip_name=None): dir_path = Path(directory) # 如果没有指定压缩包名称,使用目录名 if zip_name is None: zip_name = f"{dir_path.name}.zip" zip_path = dir_path.parent / zip_name # 创建压缩文件 with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: # 遍历目录下所有文件 for file_path in dir_path.rglob('*'): if file_path.is_file(): # 计算相对路径作为压缩包内路径 rel_path = file_path.relative_to(dir_path.parent) zipf.write(file_path, rel_path) print(f"已添加: {rel_path}") print(f"压缩完成: {zip_path},大小: {zip_path.stat().st_size / 1024:.2f} KB") return zip_path # 解压ZIP文件 def extract_zip_archive(zip_path, extract_to=None): zip_path = Path(zip_path) # 如果没有指定解压目录,使用当前目录 if extract_to is None: extract_to = zip_path.parent / zip_path.stem extract_path = Path(extract_to) extract_path.mkdir(exist_ok=True) with zipfile.ZipFile(zip_path, 'r') as zipf: # 获取压缩包内文件列表 file_list = zipf.namelist() print(f"压缩包内共 {len(file_list)} 个文件") # 解压所有文件 zipf.extractall(extract_path) print(f"解压完成: {extract_path}") return extract_path
TAR文件操作
import tarfile from pathlib import Path # 创建TAR压缩文件 def create_tar_archive(directory, tar_name=None, compression='gz'): dir_path = Path(directory) # 如果没有指定压缩包名称,使用目录名 if tar_name is None: tar_name = f"{dir_path.name}.tar.{compression}" tar_path = dir_path.parent / tar_name # 设置压缩模式 mode = f"w:{compression}" if compression else "w" # 创建压缩文件 with tarfile.open(tar_path, mode) as tarf: www.devze.com# 添加整个目录 tarf.add(dir_path, arcname=dir_path.name) print(f"已添加目录: {dir_path}") print(f"压缩完成: {tar_path},大小: {tar_path.stat().st_size / 1024:.2f} KB") return tar_path # 解压TAR文件 def extract_tar_archive(tar_path, extract_to=None): tar_path = Path(tar_path) # 如果没有指定解压目录,使用当前目录 if extract_to is None: extract_to = tar_path.parent extract_path = Path(extract_to) extract_path.mkdir(exist_ok=True) # 自动检测压缩格式 with tarfile.open(tar_path, 'r:*') as tarf: # 获取压缩包内文件列表 file_list = tarf.getnames() print(f"压缩包内共 {len(file_list)} 个文件/目录") # 解压所有文件 tarf.extractall(extract_path) print(f"解压完成: {extract_path}") return extract_path
实际应用场景
场景一:日志文件自动归档
from pathlib import Path import zipfile import datetime import shutil def archive_logs(log_dir, days_to_keep=30): """自动归档超过指定天数的日志文件""" log_path = Path(log_dir) today = datetime.datetime.now() archive_dir = log_path / 'archives' archive_dir.mkdir(exist_ok=True) # 获取所有日志文件 log_files = list(log_path.glob('*.log')) archived_count = 0 for log_file in log_files: # 获取文件修改时间 mtime = datetime.datetime.fromtimestamp(log_file.stat().st_mtime) days_old = (today - mtime).days # 如果文件超过保留天数,进行归档 if days_old > days_to_keep: # 创建年月子目录 year_month = mtime.sthttp://www.devze.comrftime('%Y-%m') month_dir = archive_dir / year_month month_dir.mkdir(exist_ok=True) # 创建压缩文件 zip_name = f"{log_file.stem}_{mhttp://www.devze.comtime.strftime('%Y%m%d')}.zip" zip_path = month_dir / zip_name with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: zipf.write(log_file, log_file.name) # 删除原日志文件 log_file.unlink() archived_count += 1 print(f"已归档: {log_file.name} -> {zip_path}") print(f"归档完成,共处理 {archived_count} 个日志文件") return archived_count
场景二:批量文件格式转换
from pathlib import Path import csv import json def convert_csv_to_json(csv_dir, output_dir=None): """批量将CSV文件转换为JSON格式""" csv_path = Path(csv_dir) # 如果没有指定输出目录,在原目录创建json子目录 if output_dir is None: output_dir = csv_path / 'json_output' output_path = Path(output_dir) output_path.mkdir(exist_ok=True) # 获取所有CSV文件 csv_files = list(csv_path.glob('*.csv')) converted_count = 0 for csv_file in csv_files: # 读取CSV文件 data = [] try: with open(csv_file, 'r', encoding='utf-8', newline='') as f: reader = csv.DictReader(f) for row in reader: data.append(row) # 创建对应的JSON文件 json_file = output_path / f"{csv_file.stem}.json" with open(json_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) converted_count += 1 print(f"已转换: {csv_file.name} -> {json_file.name}") except Exception as e: print(f"转换失败: {csv_file.name} - {str(e)}") print(f"转换完成,共处理 {converted_count} 个CSV文件") return converted_count
通过这些实用的代码示例,你可以轻松实现各种文件操作自动化,大幅提高工作效率。无论是日常的文件整理,还是批量的数据处理,这些技巧都能帮你节省大量时间。
以上就是使用Python实现读写文件自动化的几种方法的详细内容,更多关于Python读写文件自动化的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论