1. 首页
  2. 技术知识

MySQL特定表全量、增量数据同步到消息队列-解决方案

目录

    1、原始需求2、解决方案3、cаnal介绍、安装

      cаnal的工作原理架构安装

    4、验证

1、原始需求

既要同步原始全量数据,也要实时同步MySQL特定库的特定表增量数据,同时对应的修改、删除也要对应。

数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力。

应用场景:数据ETL同步、降低业务服务器压力。

2、解决方案


3、cаnal介绍、安装

cаnal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

工作原理:mysql主备复制实现

从上层来看,复制分成三步:

    master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);slave将master的binary log events拷贝到它的中继日志(relay log);slave重做中继日志中的事件,将改变反映它自己的数据。


cаnal的工作原理

原理相对比较简单:

    cаnal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议mysql master收到dump请求,开始Xbinary log给slave(也就是cаnal)cаnal解析binary log对象(原始为byte流)


架构

说明:

    server代表一个cаnal运行实例,对应于一个jvminstance对应于一个数据队列 (1个server对应1..n个instance)

instance模块:

    eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)eventStore (数据存储)metaManager (增量订阅&消费信息管理器)


安装

1、mysql、kafka环境准备

2、cаnal下载:wget https://github.com/alibaba/cаnal/releases/download/cаnal-1.1.3/cаnal.deployer-1.1.3.tar.gz

3、解压:tar -zxvf cаnal.deployer-1.1.3.tar.gz

4、对目录conf里文件参数配置

对cаnal.properties配置:

进入conf/example里,对instance.properties配置:

5、启动:bin/startup.sh

6、日志查看:

4、验证

1、开发对应的kafka消费者

package org.kafka;

import java.util.Arrays;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.serialization.StringDeserializer;

/**

*

* Title: KafkaConsumerTest

* Description:

*  kafka消费者 demo

* Version:1.0.0

* @author pancm

* @date 2022年1月26日

*/

public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;

    private ConsumerRecords<String, String> msgList;

    private final String topic;

    private static final String GROUPID = “groupA”;

    public KafkaConsumerTest(String topicName) {

        Properties props = new Properties();

        props.put(“bootstrap.servers”, “192.168.7.193:9092”);

        props.put(“group.id”, GROUPID);

        props.put(“enable.auto.commit”, “true”);

        props.put(“auto.commit.interval.ms”, “1000”);

        props.put(“session.timeout.ms”, “30000”);

        props.put(“auto.offset.reset”, “latest”);

        props.put(“key.deserializer”, StringDeserializer.class.getName());

        props.put(“value.deserializer”, StringDeserializer.class.getName());

        this.consumer = new KafkaConsumer<String, String>(props);

        this.topic = topicName;

        this.consumer.subscribe(Arrays.asList(topic));

    }

    @Override

    public void run() {

        int messageNo = 1;

        System.out.println(“———开始消费———“);

        try {

            for (; ; ) {

                msgList = consumer.poll(1000);

                if (null != msgList && msgList.count() > 0) {

                    for (ConsumerRecord<String, String> record : msgList) {

                        //消费100条就打印 ,但打印的数据不一定是这个规律的

                            System.out.println(messageNo + “=======receive: key = ” + record.key() + “, value = ” + record.value() + ” offset===” + record.offset());

//                            String v = decodeUnicode(record.value());

//                            System.out.println(v);

                        //当消费了1000条就退出

                        if (messageNo % 1000 == 0) {

                            break;

                        }

                        messageNo++;

                    }

                } else {

                    Thread.sleep(11);

                }

            }

        } catch (InterruptedException e) {

            e.printStackTrace();

        } finally {

            consumer.close();

        }

    }

    public static void main(String args[]) {

        KafkaConsumerTest test1 = new KafkaConsumerTest(“sample-data”);

        Thread thread1 = new Thread(test1);

        thread1.start();

    }

    /*

     * 中文转unicode编码

     */

    public static String gbEncoding(final String gbString) {

        char[] utfBytes = gbString.toCharArray();

        String unicodeBytes = “”;

        for (int i = 0; i < utfBytes.length; i++) {

            String hexB = Integer.toHexString(utfBytes
);
            if (hexB.length() <= 2) {
                hexB = “00” + hexB;
            }
            unicodeBytes = unicodeBytes + “\\u” + hexB;
        }
        return unicodeBytes;
    }

    /*
     * unicode编码转中文
     */
    public static String decodeUnicode(final String dataStr) {
        int start = 0;
        int end = 0;
        final StringBuffer buffer = new StringBuffer();
        while (start > -1) {
            end = dataStr.indexOf(“\\u”, start + 2);
            String charStr = “”;
            if (end == -1) {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } else {
                charStr = dataStr.substring(start + 2, end);
            }
            char letter = (char) Integer.parseInt(charStr, 16); // 16进制parse整形字符串。
            buffer.APPend(new Character(letter).toString());
            start = end;
        }
        return buffer.toString();

    }
}2、对表bak1进行增加数据

CREATE TABLE `bak1` (
  `vin` varchar(20) NOT NULL,
  `p1` double DEFAULT NULL,
  `p2` double DEFAULT NULL,
  `p3` double DEFAULT NULL,
  `p4` double DEFAULT NULL,
  `p5` double DEFAULT NULL,
  `p6` double DEFAULT NULL,
  `p7` double DEFAULT NULL,
  `p8` double DEFAULT NULL,
  `p9` double DEFAULT NULL,
  `p0` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

show create table bak1;

insert into bak1 select ‘李雷abcv’,
  `p1` ,
  `p2` ,
  `p3` ,
  `p4` ,
  `p5` ,
  `p6` ,
  `p7` ,
  `p8` ,
  `p9` ,
  `p0`  from moci limit 103、查看输出结果:

到此这篇关于MySQL特定表全量、增量数据同步到消息队列-解决方案的文章就介绍到这了,更多相关MySQL特定表数据同步内容请搜索共生网络以前的文章或继续浏览下面的相关文章希望大家以后多多支持共生网络!

原创文章,作者:starterknow,如若转载,请注明出处:https://www.starterknow.com/117840.html

联系我们