开发者

MySQL和Elasticsearch数据同步方案详解

开发者 https://www.devze.com 2025-10-25 09:03 出处:网络 作者: 匆匆忙忙游刃有余
目录一、同步方案对比二、详细实现方案1. 同步双写2. 异步双写3. Canal + 消息队列 方案3.1 环境准备3.2 自定义消费者实现三、数据一致性保障策略1. 幂等性设计2. 重试机制3. 全量校验与修复四、性能优化策略1. ES 批
目录
  • 一、同步方案对比
  • 二、详细实现方案
    • 1. 同步双写
    • 2. 异步双写
  • 3. Canal + 消息队列 方案
    • 3.1 环境准备
    • 3.2 自定义消费者实现
  • 三、数据一致性保障策略
    • 1. 幂等性设计
    • 2. 重试机制
    • 3. 全量校验与修复
  • 四、性能优化策略
    • 1. ES 批量写入
    • 2. 优化 Canal 配置
    • 3. mysql binlog 优化
  • 五、最佳实践建议
    • 1. 方案选型建议
    • 2. 监控与告警
    • 3. 数据同步异常处理流程
  • 六、总结

    在现代电商系统中,MySQL 作为关系型数据库负责数据的持久化存储,而 Elasticsearch 则作为搜索引擎提供高效的全文检索能力。保证两者之间的数据一致性是系统设计的关键挑战。本文将详细介绍主流的同步方案、实现方式及其优缺点。

    一、同步方案对比

    同步方案实时性实现复杂度一致性保证性能影响适用场景
    同步双写极高简单强一致性金融交易、核心订单
    异步双写较高中等最终一致性电商商品、用户信息
    Canal + MQ较高最终一致性大规模数据同步
    Logstash 定时简单弱一致性报表、分析数据
    Debezium最终一致性复杂数据同步

    二、详细实现方案

    1. 同步双写

    原理:在业务代码中同时写入 MySQL 和 Elasticsearch,确保两者数据同步更新。

    实现代码

    @Service
    @Transactional
    public class ProductServiceImpl implements ProductService {
        
        @Autowired
        private ProductMapper productMapper;
        
        @Autowired
        private ElasticsearchClient esClient;
        
        @Override
        public void createProduct(Product product) {
            // 1. 写入 MySQL
            productMapper.insert(product);
            
            // 2. 同步写入 ES
            try {
                ProductIndex productIndex = convertToIndex(product);
                IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
                    .index("product_index")
                    .id(product.getId().toString())
                    .document(productIndex)
                );
                esClient.index(request);
            } catch (Exception e) {
                // 处理 ES 写入失败的情况
                log.error("ES同步失败: {}", e.getMessage());
                // 可以选择抛出异常回滚事务,或者记录失败日志后续补偿
                throw new RuntimeException("数据同步失败", e);
            }
        }
        
        @Override
        public void updateProduct(Product product) {
            // 类似 createProduct,同时更新 MySQL 和 ES
            productMapper.updateById(product);
            // ES 更新逻辑...
        }
        
        private ProductIndex convertToIndex(Product product) {
            // 实体转换逻辑
            ProductIndex index = new ProductIndex();
            index.setId(product.getId());
            index.setTitle(product.getTitle());
            index.setPrice(product.getPrice());
            // 其他字段转换...
            return index;
        }
    }
    

    优缺点

    • 优点:实现简单,数据一致性强,实时性最高
    • 缺点
      • 代码耦合度高,业务逻辑与数据同步混合
      • ES 写入延迟影响主业务性能
      • 故障处理复杂,需考虑回滚机制

    2. 异步双写

    原理:通过消息队列解耦,业务代码只负责写 MySQL,然后发送消息到 MQ,由消费者异步更新 ES。

    实现代码

    // 生产者端
    @Service
    public class ProductServiceImpl implements ProductService {
        
        @Autowired
        private ProductMapper productMapper;
        
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        @Override
        @Transactional
        public void createProduct(Product product) {
            // 1. 写入 MySQL
            productMapper.insert(product);
            
            // 2. 发送消息到队列
            ProductEvent event = new ProductEvent();
            event.setType("CREATE");
            event.setProductId(product.getId());
            rabbitTemplate.convertAndSend("product-event-exchange", "product.create", event);
        }
    }
    
    // 消费者端
    @Component
    public class ProductSyncConsumer {
        
        @Autowired
        private ProductMapper productMapper;
        
        @Autowired
        private ElasticsearchClient esClient;
        
        @RabbitListener(queues = "product-sync-queue")
        public void handleProductEvent(ProductEvent event) {
            try {
                Product product = productMapper.selectById(event.getProductId());
                if (product == null) {
                    // 删除 ES 中的数据
                    deleteFromEs(event.getProductId());
                    return;
                }
                
                // 同步到 ES
                ProductIndex index = convertToIndex(product);
                IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
                    .index("product_index")
                    .id(product.getId().toString())
                    .document(index)
                );
                esClient.index(request);
                
                log.info("产品 {} 同步到 ES 成功", event.getProductId());
            } catch (Exception e) {
                log.error("同步 ES 失败: {}", e.getMessage());
                // 可以根据需要进行重试或记录到死信队列
            }
        }
    }
    

    优缺点

    • 优点
      • 解耦业务与同步逻辑
      • 消息队列提供削峰填谷能力
      • ES 故障不影响主业务流程
    • 缺点
      • 存在短暂的数据不一致
      • 增加了系统复杂度
      • 需要处理消息丢失、重复消费等问题

    3. Canal + 消息队列 方案

    原理:利用 Canal 监听 MySQL 的 binlog,解析数据变更并发送到消息队列,再由消费者同步到 ES。

    3.1 环境准备

    MySQL 配置

    # 开启 binlog
    log-bin=mysql-bin
    # 选择 ROW 模式
    binlog-format=ROW
    # 服务器唯一ID
    server-id=1
    # 开启 binlog 实时更新
    sync_binlog=1
    

    Canal Server 配置

    # canal-server/conf/example/instance.properties
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset=UTF-8
    canal.instance.filter.regex=.*\..*
    

    Canal Adapter 配置

    # canal-adapter/conf/application.yml
    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: kafka
      zookeeperHosts:
      syncBATchSize: 1000
      retries: 0
      timeout:
      AccessKey:
      secretKey:
      consumerProperties:
        kafka.bootstrap.servers: 127.0.0.1:9092
        kafka.enable.auto.commit: false
        kafka.auto.commit.interval.ms: 1000
        kafka.auto.offset.reset: latest
        kafka.request.timeout.ms: 40000
        kafka.session.timeout.ms: 30000
        kafka.isolation.level: read_committed
        kafka.max.poll.records: 1000
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://127.0.0.1:3306/shop?useUnicode=true
          username: root
          password: root
      canalAdapters:
      - instance: example  # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          - name: es
            hosts: 127.0.0.1:9200
            properties:
              mode: rest
              cluster.name: elasticsearch
    

    表映射配置

    # canal-adapter/conf/es/mytest_user.yml
    dataSourceKey: defaultDS
    destination: example
    groupId:
    topic: example
    database: shop
    table: tb_product
    esMapping:
      _index: product_index
      _type: _doc
      _id: _id
      upsert: true
      sql: |
        select 
          p.id as _id, 
          p.title, 
          p.sub_title as subTitle, 
          p.price, 
          p.sales, 
          c.name as categoryName 
        from tb_product p 
        left join tb_category c on p.cid1 = c.id
      commitBatch: 3000
    

    3.2 自定义消费者实现

    如果需要更灵活的处理,可以自定义 Kafka 消费者:

    @Component
    public class ProductSyncConsumer {
        
        @Autowired
        private ElasticsearchClient esClient;
        
        @Kawww.devze.comfkaListener(topics = "example")
        public void processMessage(String message) {
            try {
                // 解析 Canal 消息
                CanalMessage canalMsg = jsON.parseobject(message, CanalMessage.class);
                
                for (CanalData data : canalMsg.getData()) {
                    // 根据操作类型处理
                    switch (canalMsg.getEventType()) {
                        case INSERT:
                        case UPDATE:
                            syncToEs(data);
                            break;
                        case DELETE:
                            deleteFromEs(data);
                            break;
                    }
                }
            } catch (Exception e) {
                log.error("处理 Canal 消息失败: {}", e.getMessage());
            }
        }
        
        private void syncToEs(CanalData data) throws IOException {
            // 构建 ES 文档
            ProductIndex index = new ProductIndex();
            index.setId(Long.valueOf(data.get("id").toString()));
            index.setTitle(data.get("title").toString());
            // 其他字段映射...
            
            // 写入 ES
            esClient.index(i -> i
                .index("product_index")
                .id(index.getId().toString())
                .document(index)
            );
        }
    }
    

    优缺点

    • 优点
      • 完全解耦,对业务代码零侵入
      • 高性能,只同步变更数据
      • 支持全量和增量同步
      • 可靠性高,基于 binlog 保证不丢失
    • 缺点
      • 部署复杂度高
      • 配置相对复杂
      • 对 MySQL binlog 有依赖

    三、数据一致性保障策略

    1. 幂等性设计

    确保重复同步不会导致数据异常:

    // ES 操作幂等性实现
    public void syncProduct(Long productId) {
        // 使用文档ID作为唯一标识
        IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
            .index("product_index")
            .id(productId.toString())
            .document(buildProductIndex(productId))
            // 设置乐观锁版本控制
            .versionType(VersionType.EXTERNAL)
            .version(getCurrentVersion(productId))
        );
        
        try {
            esClient.index(request);
        } catch (VersionConflictException e) {
            // 版本冲突,需要重新获取最新数据
            log.warn("版本冲突,重新同步: {}", productId);
            // 重试逻辑...
        }
    }
    

    2. 重试机制

    @Service
    public class EsSyncService {
        
        @Autowired
        private ElasticsearchClient esClient;
        
        @Autowired
        private RedisTemplate redisTemplate;
        
        // 最大重试次数
        private static final int MAX_RETRY_COUNT = 3;
        
        public void syncWithRetry(ProductIndex index) {
            String key = "es:retry:" + index.getId();
            
            for (int i = 0; i < MAX_RETRY_COUNT; i++) {
                try {
                    esClient.index(req -> req
                        .index("product_index")
                        .id(index.getId().toString())
                        .document(index)
                    );
                    // 成功后删除重试标记
                    redisTemplate.delete(key);
                    return;
                } catch (Exception e) {
                    log.error("第{}次同步失败: {}", i+1, e.getMessage());
                    
                    if (i == MAX_RETRY_COUNT - 1) {
                        // 达到最大重试次数,记录失败任务
                        redisTemplate.opsForValue().set(key, JSON.toJSONString(index), 7, TimeUnit.DAYS);
                        log.error("同步失败,已记录到失败队列: {}", index.getId());
                    } else {
               php         // 指数退避重试
                        try {
                            Thread.sleep((long) (Math.pow(2, i) * 1000));
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        }
        
        // 定时任务处理失败的同步任务
        @Scheduled(cron = "0 0/5 * * * ?")
        public void processFailedTasks() {
            Set<String> keys = redisTemplate.keys("es:retry:*");
            if (keys != null) {
                for (String key : keys) {
                    String json = (String) redisTemplate.opsForValue().get(key);
                    ProductIndex index = JSON.parseObject(json, ProductIndex.class);
                    // 重新尝试同步
                    syncWithRetry(index);
                }
            }
        }
    }
    

    3. 全量校验与修复

    定期全量对比 MySQL 和 ES 数据,修复不一致:

    @Service
    public class DataConsistencyService {
        
        @Autowired
        private ProductMapper productMapper;
        
        @Autowired
        private ElasticsearchClient esClient;
        
        @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
        public void checkAndRepair() {
            log.info("开始数据一致性校验");
            
            // 分页查询 MySQL 数据
            int page = 1;
            int pageSize = 1000;
            
            while (true) {
                Page<Product> productPage = productMapper.selectPage(
                    new Page<>(page, pageSize), null);
                
                for (Product product : productPage.getRecords()) {
                    try {
                        // 查询 ES 数据
                        GetResponse<ProductIndex> response = esClient.get(req -> req
                            .index("product_index")
        编程客栈                    .id(product.getId().toString()),
                            ProductIndex.class
                        );
                        
                        if (!response.found()) {
                            // ES 中不存在,需要插入
                            syncToEs(product);
                            log.warn("修复缺失数据: {}", product.getId());
                        } else {
                            // 对比数据是否一致
                            ProductIndex esData = response.source();
                            if (!isConsistent(product, esData)) {
                                // 数据不一致,更新 ES
                                syncToEs(product);
                                log.warn("修复不一致数据: {}", product.getId());
                            }
                        }
                    } catch (Exception e) {
                        log.error("校验商品 {} 失败: {}", product.getId(), e.getMessage());
                    }
                }
     javascript           
                if (productPage.hasNext()) {
                    page++;
                } else {
                    break;
                }
            }
            
            log.info("数据一致性校验完成");
        }
        
        private boolean isConsistent(Product mysql, ProductIndex es) {
            // 比较关键字段
            return Objects.equals(mysql.getTitle(), es.getTitle()) &&
                   Objects.equals(mysql.getPrice(), es.getPrice()) &&
                   Objects.equals(mysql.getSales(), es.getSales());
        }
    }
    

    四、性能优化策略

    1. ES 批量写入

    public void batchSyncToEs(List<Product> products) {
        if (CollectionUtils.isEmpty(products)) {
            return;
        }
        
        try {
            List<BulkOperation> operations = new ArrayList<>();
            
            for (Product product : products) {
                ProductIndex index = convertToIndex(product);
                operations.add(BulkOperation.of(op -> op
                    .index(idx -> idx
                        .index("product_index")
                        .id(product.getId().toString())
                        .document(index)
                    )
                ));
            }
            
            BulkRequest request = BulkRequest.of(req -> req.operations(operations));
            BulkResponse response = esClient.bulk(request);
            
            if (response.errors()) {
                // 处理错误
                for (BulkResponseItem item : response.items()) {
                    if (item.error() != null) {
                        log.error("批量同步失败: {} - {}", 
                                  item.id(), item.error().reason());
                    }
                }
            }
        } catch (Exception e) {
            log.error("批量同步异常: {}", e.getMessage());
        }
    }
    

    2. 优化 Canal 配置

    # 增加批处理大小
    syncBatchSize = 2000
    
    # 优化网络参数
    tcp.so.sndbuf = 1048576
    tcp.so.rcvbuf = 1048576
    
    # 调整消费线程数
    canal.instance.parser.parallel = true
    canal.instance.parser.parallelThreadSize = 8
    

    3. MySQL binlog 优化

    # 增加 binlog 大小限制
    binlog-file-size = 1G
    
    # 优化 binlog 刷新策略
    sync_binlog = 1
    innodb_flush_log_at_trx_commit = 1
    
    # 调整 binlog 保留时间
    expire_logs_days = 7
    

    五、最佳实践建议

    1. 方案选型建议

    • 小型系统/快速迭代:异步双写(MQ)
    • 大型系统/高可靠:Canal + MQ
    • 实时性要求极高:同步双写(权衡性能影响)
    • 历史数据迁移:Logstash 或 Canal 全量同步

    2. 监控与告警

    @Service
    public class SyncMonitorService {
        
        @Autowired
        private RedisTemplate redisTemplate;
        
        // 记录同步时间戳
        public void recordSyncTimestamp(String tableName, Long id) {
            String key = "sync:timestamp:" + tableName + ":" + id;
            redisTemplate.opsForValue().set(key, System.currentTimeMillis(), 24, TimeUnit.HOURS);
        }
        
        // 检查同步延迟
        @Scheduled(fixedRate = 60000)
        public void checkSyncDelay() {
            // 查询最近5分钟内更新的数据
            List<Product> recentProducts = productMapper.selectRecentUpdated(5);
            
            for (Product product : recentProducts) {
                String key = "sync:timestamp:tb_product:" + product.getId();
                Long syncTime = (Long) redisTemplate.opsForValue().get(key);
                
                if (syncTime == null) {编程客栈
                    // 未同步
                    sendAlarm("数据未同步", product.getId());
                } else {
                    long delay = System.currentTimeMillis() - syncTime;
                    if (delay > 300000) { // 5分钟
                        // 同步延迟过大
                        sendAlarm("数据同步延迟:" + (delay/1000) + "秒", product.getId());
                    }
                }
            }
        }
        
        private void sendAlarm(String message, Long productId) {
            // 发送告警(邮件、短信、钉钉等)
            log.error("告警: {} - 商品ID: {}", message, productId);
            // 实际告警逻辑...
        }
    }
    

    3. 数据同步异常处理流程

    1. 重试机制:指数退避策略,避免立即重试造成雪崩
    2. 死信队列:记录无法通过重试解决的异常
    3. 手动干预:提供管理界面手动触发同步
    4. 数据校验:定期全量比对,发现并修复不一致

    六、总结

    MySQL 和 Elasticsearch 数据同步是电商系统中的关键技术挑战。选择合适的同步方案需要综合考虑实时性要求、系统复杂度、团队技术栈等因素。在实际项目中,推荐采用 Canal + 消息队列 的方案,它提供了良好的实时性、可靠性和扩展性,同时对业务代码零侵入。

    无论选择哪种方案,都需要特别关注数据一致性保障、异常处理、性能优化和监控告警等方面,确保系统在生产环境中的稳定运行。

    到此这篇关于MySQL和Elasticsearch数据同步方案详解的文章就介绍到这了,更多相关MySQL和Elasticsearch数据同步内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号