目录
- 一、为什么要做数据迁移
- 二. 数据迁移实战
- 1. 需求分析
- 2. python脚本
一、为什么要做数据迁移
mysql数据库迁移是指将MySQL数据库中的数据和结构迁移到另一个MySQL实例,或者从一个MySQL实例迁移到另一个数据库系统(如从MySQL迁移到mariadb,或者从本地MySQL迁移到云数据库)。其作用通常包括以下几个方面:
1. 提升性能与扩展性
- 硬件升级:随着业务的发展,原来的MySQL数据库可能因为硬件或配置限制无法满足需求。迁移到更强大的服务器或云平台可以提高性能,支持更高的并发访问。
- 分布式架构:对于大型应用,将数据迁移到分布式数据库架构中,能提高系统的扩展性和负载均衡能力。
2. 降低成本
- 从本地迁移到云:将数据库从本地环境迁移到云平台(如AWS RDS、Google Cloud SQL、阿里云等),可以节省硬件和运维成本,且云平台提供自动备份、自动扩展等功能。
- 选择更具成本效益的数据库实例:迁移到性价比更高的MySQL版本或实例,帮助企业节省开支。
3. 数据恢复和灾难恢复
数据迁移有时作为灾难恢复的一部分,帮助确保数据在主数据库不可用时能迅速恢复。例如,将数据从主数据中心迁移到备用数据中心,确保业务不中断。
4. 技术或版本更新
- 随着MySQL数据库技术的演进,新的版本(例如MySQL 8.0)提供了更强大的功能、性能优化和安全性。迁移到新版本有助于利用这些改进,例如更高效的查询执行、更强的加密功能以及更灵活的配置。
- 兼容性问题:数据库迁移有时是由于原有的MySQL版本或特性与新系统或需求不兼容,迁移到支持更好兼容性的新版本可以解决此问题。
5. 高可用性和负载均衡
- 迁移到支持高可用性(如主从复制、Galera Cluster等)和负载均衡的架构,可以提高数据库的容错能力和并发处理能力,减少单点故障。
- 可以通过迁移到支持集群的MySQL系统,配置多个主节点或从节点,分摊数据库的访问压力,保障服务的稳定性。
6. 数据备份和清理
- 在进行数据库迁移的过程中,往往也会对数据进行清理和备份,去除冗余、无效或过期的数据,并进行数据库的优化(如表结构调整、索引优化等)。这不仅能提高迁移后的数据质量,也能提升新数据库的性能。
7. 业务整合与统一
- 多数据库整合:一些公司可能存在多个MySQL实例,迁移数据到统一的数据库平台可以整合数据,简化管理和运维工作,提高数据的一致性和完整性。
- 合并系统:例如,企业收购了其他公司,原有的系统中有不同的MySQL数据库,迁移合并到统一的数据库平台,方便统一管理和分析。
二. 数据迁移实战
1. 需求分析
只筛选,源表中S_SMSSendLogs的字段RobotTaskId = 源表T_RobotTasks字段Id且字段TenantId = 17409350669509才需要迁移
2. Python脚本
import mysql.connector import json # 数据库连接配置 source_db_config = { 'host': 'localhost', 'port': 3306, 'user': 'CallSystemProd', 'password': '******', 'database': ''******',' # 源数据库名称 } target_db_config = { 'host': 'localhost', 'port': 3306, 'user': 'CallSystemTest', 'password': ''******',', 'database': ''******',' # 目标数据库名称 } # 连接到源数据库 try: source_conn = mysql.connector.connect(**source_db_config) source_cursor = source_conn.cursor(dictionary=True) print("成功连接到源数据库") except mysql.connector.Error as err: print(f"连接源数据编程库失败: {err}") exit() # 连接到目标数据库 try: target_conn = mysql.connector.connect(**target_db_config) target_cursor = target_conn.cursor() print("成功连接到目标数据库") except mysql.connector.Error as err: print(f"连接目标数据库失败: {err}") exit() # 获取源表数据(S_SMSSendLogs) source_cursor.execute(""" SELECT Id, SMSId, Phone, Status, IsSucceed, Trade_id, RequestStr, ResponseStr, RobotTaskId, ExtendCode, CreateTime, UpdateTime, CreateUserId, UpdateUserId, IsDelete, TaskCallResultId, ProviderType FROM S_SMSSendLogs """) columns = source_cursor.fetchall() # 获取源表数据(S_SMSTemplates),用于RequestStr中的替换 source_cursor.execute(""" SELECT SMSId, Content, Id FROM S_SMSTemplates """) templates = {template['SMSId']: template for template in source_cursor.fetchall()} # 获取与TaskCallRhttp://www.devze.comesultId相关的数据(t_taskcallresults和t_repeattaskcallresults) source_cursor.execute(""" SELECT id, CreateUserId, UpdateUserId, ResidentsId, CustomerName FROM T_TaskCallResults """) taskcallresults = {result['id']: result for result in source_cursor.fetchall()} source_cursor.execute(""" SELECT id, CreateUserId, UpdateUserId, ResidentsId, Chttp://www.devze.comustomerName FROM T_RepeatTaskCallResults """) repeattaskcallresults = {result['id']: result for result in source_cursor.fetchall()} # 获取RobotTaskId和OrgId相关的表数据(t_taskcallresults, t_robotrepeattasks, sysuser) source_cursor.execute(""" SELECT id, CreateUserId FROM T_TaskCallResults """) taskcallresults_user_map = {result['id']: result['CreateUserId'] for result in source_cursor.fetchall()} source_cursor.execute(""" SELECT id, CreateUserId FROM T_RobotRepeatTasks """) robotrepeattasks_user_map = {result['id']: result['CreateUserId'] for result in source_cursor.fetchall()} source_cursor.execute(""" SELECT Id, OrgId FROM SysUser """) sysuser_map = {user['Id']: user['OrgId'] for user in source_cursor.fetchall()} # 构造批量插入语句 insert_query = """ INSERT INTO S_SMSSendLogs (Id, SMSId, Phone, Status, IsSucceed, Trade_id, RequestStr, ResponseStr, RobotTaskId, ExtendCode, CreateTime, UpdateTime, CreateUserId, UpdateUserId, IsDelete, TaskCallResultId, ProviderType, OrgId, Content, CharCount, BillingCount, ResidentsId, CustomerName, TemplateId) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ # 构造插入的值(批量数据),应用转换规则 insert_values = [] for column in columns: # Status 字段直接迁移,无需转换 status = column['Status'] # 将 IsSucceed 转换为 tinyint类型的 0 is_succeed = 0 if column['IsSucceed'] == 1 else column['IsSucceed'] # RequestStr 替换操作 request_str = column['RequestStr'] template = templates.get(column['SMSId']) if template: template_content = template['Content'] try: # 获取 RequestStr 中的变量并替换 request_values = json.loads(request_str).get('arguments', {}) for key, value in request_values.items(): template_content = template_content.replace(f"#{key}#", 编程客栈str(value) if value is not None else "") except json.JSONDecodeError: pass # 如果 JSON 格式不正确,跳过替换 request_str = template_content # 计算字符数 char_count = len(request_str) # 计算计费条数 billing_count = (char_count + 66) // 67 # 每条超过67个字算第二条 # 获取相关的 UserId 和 OrgId create_user_id = taskcallresults.get(column['TaskCallResultId'], {}).get('CreateUserId') or \ repeattaskcallresults.get(column['TaskCallResultId'], {}).get('CreateUserId') update_user_id = taskcallresults.get(column['TaskCallResultId'], {}).get('UpdateUserId') or \ repeattaskcallresults.get(column['TaskCallResultId'], {}).get('UpdateUserId') # ProviderType 转换 provider_type = column['ProviderType'] if provider_type == 1: provider_type = 0 elif provider_type == 2: provider_type = 3 # OrgId 通过 CreateUserId 获取 create_user_id_for_org = taskcallresults_user_map.get(column['RobotTaskId']) or \ robotrepeattasks_user_map.get(column['RobotTaskId']) org_id = sysuser_map.get(create_user_id_for_org, None) # 构造数据插入 insert_values.append(( column['Id'], column['SMSId'], column['Phone'], status, is_succeed, column['Trade_id'], request_str, column['ResponseStr'], column['RobotTaskId'], column['ExtendCode'], column['CreateTime'], column['UpdateTime'], create_user_id, update_user_id, column['IsDelete'], column['TaskCallResultId'], provider_type, org_id, request_str, char_count, billing_count, taskcallresults.get(column['TaskCallResultId'], {}).get('ResidentsId'), taskcallresults.get(column['TaskCallResultId'], {}).get('CustomerName'), template['Id'] if template else None )) # 执行批量插入 target_cursor.executemany(insert_query, insert_values) target_conn.androidcommit() print(f"成功迁移了 {len(insert_values)} 条数据") # 关闭数据库连接 source_cursor.close() source_conn.close() target_cursor.close() target_conn.close()
以上就是Python脚本实现Mysql数据迁移的详细内容,更多关于Mysql数据迁移的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论