Synchronize the full and incremental data of a specific MySQL table to the message queue - Solution

1. Original requirements

The goal is to synchronize, in real time, both the existing (full) data and the incremental data of specific tables in specific MySQL databases; updates and deletions must be synchronized as well.

The synchronization must be non-intrusive: business code must not be changed, and it must not put significant extra load on the business database.

Application scenarios: ETL data synchronization and offloading work from business servers.

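2. Solution

Use Canal to read the MySQL binlog and publish the change events of the target tables to Kafka, where downstream consumers pick them up. Because Canal only reads the binlog, business code stays unchanged.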

3. Canal introduction and installation

Canal is an open-source project from Alibaba, written in pure Java. By parsing the database's incremental log (binlog), it provides incremental data subscription and consumption. It currently supports mainly MySQL (MariaDB is also supported).

Working principle: how MySQL master-slave replication works

At a high level, replication takes three steps:

  1. The master records changes in its binary log (these records are called binary log events and can be viewed with SHOW BINLOG EVENTS).
  2. The slave copies the master's binary log events to its relay log.
  3. The slave replays the events in its relay log, applying the changes to its own data.
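To make the three steps concrete, here is a toy in-memory model in Java. Everything in it (the class names, string keys standing in for rows, list indexes standing in for binlog positions) is a hypothetical simplification for illustration, not how MySQL actually stores or ships its binary log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the three replication steps; not MySQL's actual implementation.
public class ReplicationSketch {

    // A simplified binary log event: set a key to a value, or delete a key.
    static final class BinlogEvent {
        final String op;    // "SET" or "DELETE"
        final String key;
        final String value; // null for DELETE

        BinlogEvent(String op, String key, String value) {
            this.op = op;
            this.key = key;
            this.value = value;
        }
    }

    static final class Master {
        final Map<String, String> data = new HashMap<>();
        final List<BinlogEvent> binlog = new ArrayList<>();

        // Step 1: apply a change and record it in the binary log.
        void write(String key, String value) {
            data.put(key, value);
            binlog.add(new BinlogEvent("SET", key, value));
        }

        void delete(String key) {
            data.remove(key);
            binlog.add(new BinlogEvent("DELETE", key, null));
        }
    }

    static final class Slave {
        final Map<String, String> data = new HashMap<>();
        final List<BinlogEvent> relayLog = new ArrayList<>();
        private int copiedUpTo = 0;  // next master binlog position to copy
        private int appliedUpTo = 0; // next relay-log position to replay

        // Step 2: copy any new binlog events into the relay log.
        void fetch(Master master) {
            while (copiedUpTo < master.binlog.size()) {
                relayLog.add(master.binlog.get(copiedUpTo++));
            }
        }

        // Step 3: replay relay-log events against the slave's own data.
        void replay() {
            while (appliedUpTo < relayLog.size()) {
                BinlogEvent e = relayLog.get(appliedUpTo++);
                if (e.op.equals("SET")) {
                    data.put(e.key, e.value);
                } else {
                    data.remove(e.key);
                }
            }
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Slave slave = new Slave();
        master.write("id=1", "Alice");
        master.write("id=2", "Bob");
        master.delete("id=1");
        slave.fetch(master);
        slave.replay();
        System.out.println(slave.data); // prints {id=2=Bob}
    }
}
```

Canal plays the slave's role in step 2, but instead of replaying the events into its own copy of the data, it parses them and passes them on to consumers.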

How Canal works

The principle is relatively simple:

  1. Canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master.
  2. The MySQL master receives the dump request and starts pushing its binary log to the slave (Canal).
  3. Canal parses the binary log objects (originally raw byte streams).
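Step 3 starts from raw bytes. As a hedged illustration (this is not Canal's parser code), the Java sketch below decodes the 19-byte common header that every MySQL binlog event in format v4 begins with: a 4-byte timestamp, a 1-byte event type, a 4-byte server id, a 4-byte event length, a 4-byte next-event position and 2 bytes of flags, all little-endian. Only a few event type codes are named, the event body (where the row data lives) is not decoded, and the sample header in main is hand-built rather than captured from a real server.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Decodes the common 19-byte header of a MySQL binlog (v4) event.
public class BinlogHeaderSketch {

    static final int HEADER_LENGTH = 19;

    static final class EventHeader {
        final long timestamp;    // seconds since the epoch (4 bytes)
        final int typeCode;      // event type (1 byte)
        final long serverId;     // server that wrote the event (4 bytes)
        final long eventLength;  // total event size, header included (4 bytes)
        final long nextPosition; // binlog offset of the next event (4 bytes)
        final int flags;         // event flags (2 bytes)

        EventHeader(long timestamp, int typeCode, long serverId,
                    long eventLength, long nextPosition, int flags) {
            this.timestamp = timestamp;
            this.typeCode = typeCode;
            this.serverId = serverId;
            this.eventLength = eventLength;
            this.nextPosition = nextPosition;
            this.flags = flags;
        }
    }

    static EventHeader parse(byte[] bytes) {
        if (bytes.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("binlog event header needs " + HEADER_LENGTH + " bytes");
        }
        // All multi-byte integers in the header are unsigned little-endian.
        ByteBuffer buf = ByteBuffer.wrap(bytes, 0, HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        long timestamp = Integer.toUnsignedLong(buf.getInt());
        int typeCode = Byte.toUnsignedInt(buf.get());
        long serverId = Integer.toUnsignedLong(buf.getInt());
        long eventLength = Integer.toUnsignedLong(buf.getInt());
        long nextPosition = Integer.toUnsignedLong(buf.getInt());
        int flags = Short.toUnsignedInt(buf.getShort());
        return new EventHeader(timestamp, typeCode, serverId, eventLength, nextPosition, flags);
    }

    // A few event type codes; the *_ROWS_EVENT codes are the v2 row events (MySQL 5.6+).
    static String typeName(int typeCode) {
        switch (typeCode) {
            case 2:  return "QUERY_EVENT";
            case 4:  return "ROTATE_EVENT";
            case 15: return "FORMAT_DESCRIPTION_EVENT";
            case 16: return "XID_EVENT";
            case 19: return "TABLE_MAP_EVENT";
            case 30: return "WRITE_ROWS_EVENT";
            case 31: return "UPDATE_ROWS_EVENT";
            case 32: return "DELETE_ROWS_EVENT";
            default: return "OTHER(" + typeCode + ")";
        }
    }

    public static void main(String[] args) {
        // Hand-built sample header: a WRITE_ROWS event (type 30) from server_id 1.
        ByteBuffer sample = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        sample.putInt(1_700_000_000) // timestamp
              .put((byte) 30)        // event type
              .putInt(1)             // server_id
              .putInt(60)            // event length
              .putInt(4096)          // next position
              .putShort((short) 0);  // flags
        EventHeader h = parse(sample.array());
        System.out.println(typeName(h.typeCode) + " from server " + h.serverId
                + ", " + h.eventLength + " bytes, next event at " + h.nextPosition);
    }
}
```

In a real stream, the next step is to decode the event body; row events are preceded by a TABLE_MAP_EVENT that maps a table id to its schema and table name.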

Architecture

Notes:

  • server: a running Canal process, corresponding to one JVM
  • instance: corresponds to one data queue (one server hosts 1..n instances)

Modules of an instance:

  • eventParser (data source access: emulates the slave protocol to interact with the master, and parses the protocol)
  • eventSink (connector between Parser and Store: filters, processes, and distributes data)
  • eventStore (data storage)
  • metaManager (manages incremental subscription and consumption information)
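The module split can be pictured with a small in-memory Java sketch. The classes below (RowChange, Sink, MetaManager) are hypothetical and far simpler than Canal's real eventParser, eventSink, eventStore and metaManager; the point is only the data flow: parsed events are filtered by a "schema.table" regex, buffered in a bounded store, and a client acknowledges positions so consumption can resume after a restart. The database name mydb is a placeholder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.regex.Pattern;

// Hypothetical, simplified data flow of one instance:
// parser output -> sink (filter) -> store (bounded queue) -> client -> meta manager (ack).
public class InstancePipelineSketch {

    // One parsed row change; "position" stands in for binlog file name + offset.
    static final class RowChange {
        final long position;
        final String schema;
        final String table;
        final String type; // INSERT, UPDATE or DELETE

        RowChange(long position, String schema, String table, String type) {
            this.position = position;
            this.schema = schema;
            this.table = table;
            this.type = type;
        }
    }

    // eventSink stand-in: forwards only events whose "schema.table" matches the filter.
    static final class Sink {
        private final Pattern filter;
        private final BlockingQueue<RowChange> store;

        Sink(String filterRegex, BlockingQueue<RowChange> store) {
            this.filter = Pattern.compile(filterRegex);
            this.store = store;
        }

        // Returns true if the event passed the filter and was stored.
        // offer() rejects the event when the store is full; a real store would apply back-pressure.
        boolean accept(RowChange change) {
            String name = change.schema + "." + change.table;
            return filter.matcher(name).matches() && store.offer(change);
        }
    }

    // metaManager stand-in: remembers the highest position a client acknowledged.
    static final class MetaManager {
        private long ackedPosition = -1;

        void ack(long position) {
            ackedPosition = Math.max(ackedPosition, position);
        }

        long ackedPosition() {
            return ackedPosition;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<RowChange> store = new ArrayBlockingQueue<>(16); // eventStore stand-in
        Sink sink = new Sink("mydb\\.bak1", store);
        MetaManager meta = new MetaManager();

        // What eventParser might emit (hand-written here, not decoded from a binlog).
        List<RowChange> parsed = Arrays.asList(
                new RowChange(100, "mydb", "bak1", "INSERT"),
                new RowChange(200, "mydb", "other", "INSERT"),
                new RowChange(300, "mydb", "bak1", "DELETE"));
        for (RowChange change : parsed) {
            sink.accept(change);
        }

        // A client drains the store, processes each event, then acknowledges it.
        List<RowChange> batch = new ArrayList<>();
        store.drainTo(batch);
        for (RowChange change : batch) {
            System.out.println(change.type + " " + change.schema + "." + change.table + " @" + change.position);
            meta.ack(change.position);
        }
        System.out.println("acknowledged up to position " + meta.ackedPosition());
    }
}
```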

Install

1. Prepare the MySQL and Kafka environments
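Canal reads the binlog, so the MySQL server needs the binary log enabled in ROW format and a server_id. A minimal my.cnf fragment for a single test server, following Canal's QuickStart (the values are assumptions; restart MySQL after changing them):

```ini
[mysqld]
# Enable the binary log; "mysql-bin" is only the file name prefix
log-bin=mysql-bin
# Row-based events carry the changed column values that Canal needs
binlog-format=ROW
# Must be unique and must not clash with Canal's own slaveId
server_id=1
```

Canal also needs a MySQL account with at least the SELECT, REPLICATION SLAVE and REPLICATION CLIENT privileges; its credentials go into instance.properties in step 4.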

2. Download Canal: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3. Extract the archive: tar -zxvf canal.deployer-1.1.3.tar.gz

4. Configure the parameters in the files under the conf directory

Configure canal.properties:
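As a hedged sketch, assuming Canal 1.1.3 (whose canal.properties names the MQ settings canal.mq.*; later releases renamed some of these keys, so check the file shipped with your version), the settings that switch Canal to Kafka output look roughly like this. The broker address is the one the consumer code in section 4 uses; everything else stays at its default.

```properties
# Deliver parsed changes to Kafka instead of serving Canal's own TCP clients
canal.serverMode = kafka
# Kafka broker list (the address used by the consumer in section 4)
canal.mq.servers = 192.168.7.193:9092
# Publish changes as JSON ("flat") messages instead of protobuf
canal.mq.flatMessage = true
# Instances to start; each name matches a directory under conf/
canal.destinations = example
```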

Go to conf/example and configure instance.properties:
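A matching instance.properties sketch, again assuming Canal 1.1.3 key names. The MySQL address, the account, and the database name mydb are placeholders, and the topic name sample-data is assumed only because it is the topic the consumer in section 4 subscribes to. Backslashes are doubled because this is a Java properties file.

```properties
# MySQL server whose binlog Canal reads (placeholder)
canal.instance.master.address=127.0.0.1:3306
# Account with SELECT, REPLICATION SLAVE and REPLICATION CLIENT privileges (placeholder)
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Capture only table bak1 in database mydb (regex on "schema.table")
canal.instance.filter.regex=mydb\\.bak1
# Kafka topic and partition to publish to
canal.mq.topic=sample-data
canal.mq.partition=0
```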

5. Start: bin/startup.sh

6. Check the logs: the server log is logs/canal/canal.log and the instance log is logs/example/example.log.

4. Verification

1. Write a Kafka consumer for the topic

package org.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;


/**
 * Title: KafkaConsumerTest
 * Description: Kafka consumer demo that prints the change messages Canal publishes.
 * Version: 1.0.0
 *
 * @author pancm
 * @date January 26, 2018
 */
public class KafkaConsumerTest implements Runnable {

    private static final String GROUPID = "groupA";
    // Stop after this many messages
    private static final int MAX_MESSAGES = 1000;

    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // "latest": a new group only sees messages produced after it first joins,
        // so start this consumer before inserting the test data
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------Start consumption---------");
        try {
            while (true) {
                // poll() waits up to 1 s for records, so no extra sleep is needed
                ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : msgList) {
                    // Print every record: key, value (Canal's change message) and offset
                    System.out.println(messageNo + "=======receive: key = " + record.key()
                            + ", value = " + record.value() + " offset===" + record.offset());

                    // To turn Unicode escape sequences in the value back into text:
                    // String v = decodeUnicode(record.value());
                    // System.out.println(v);

                    // Exit once 1000 messages have been consumed
                    if (messageNo == MAX_MESSAGES) {
                        return;
                    }
                    messageNo++;
                }
            }
        } finally {
            // Runs on return or on an exception; with auto-commit enabled,
            // close() also commits the current offsets
            consumer.close();
        }
    }

    public static void main(String[] args) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }


    /**
     * Convert each character (for example Chinese text) to a Unicode escape sequence.
     */
    public static String gbEncoding(final String gbString) {
        StringBuilder unicode = new StringBuilder();
        for (char c : gbString.toCharArray()) {
            // Always pad to four hex digits so every escape has the same length
            unicode.append(String.format("\\u%04x", (int) c));
        }
        return unicode.toString();
    }

    /**
     * Convert Unicode escape sequences back to characters (for example Chinese);
     * any other text is copied unchanged.
     */
    public static String decodeUnicode(final String dataStr) {
        StringBuilder buffer = new StringBuilder();
        int i = 0;
        while (i < dataStr.length()) {
            if (dataStr.startsWith("\\u", i) && i + 6 <= dataStr.length()) {
                // Parse the four hexadecimal digits that follow the escape prefix
                buffer.append((char) Integer.parseInt(dataStr.substring(i + 2, i + 6), 16));
                i += 6;
            } else {
                buffer.append(dataStr.charAt(i));
                i++;
            }
        }
        return buffer.toString();
    }
}

2. Create table bak1 and insert test data

CREATE TABLE `bak1` (
  `vin` varchar(20) NOT NULL,
  `p1` double DEFAULT NULL,
  `p2` double DEFAULT NULL,
  `p3` double DEFAULT NULL,
  `p4` double DEFAULT NULL,
  `p5` double DEFAULT NULL,
  `p6` double DEFAULT NULL,
  `p7` double DEFAULT NULL,
  `p8` double DEFAULT NULL,
  `p9` double DEFAULT NULL,
  `p0` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

show create table bak1;

insert into bak1 select '李雷abcv',
  `p1`,
  `p2`,
  `p3`,
  `p4`,
  `p5`,
  `p6`,
  `p7`,
  `p8`,
  `p9`,
  `p0` from moci limit 10;

3. View the output results:

This concludes this article about synchronizing the full and incremental data of a specific MySQL table to a message queue - solution. For more information about synchronizing data in a specific MySQL table, please search for previous articles on 123WORDPRESS.COM or continue browsing the following related articles. I hope you will support 123WORDPRESS.COM in the future!
