1. Original demandIt is necessary to synchronize the original full data as well as the incremental data of specific tables in specific MySQL libraries in real time, and the corresponding modifications and deletions must also be synchronized. Data synchronization must not be intrusive: business procedures must not be changed, and there must not be too much performance pressure on the business side. Application scenarios: data ETL synchronization and reducing the pressure on business servers. 2. Solution 3. Canal introduction and installationCanal is an open source project under Alibaba, developed in pure Java. Based on database incremental log analysis, it provides incremental data subscription and consumption, and currently mainly supports MySQL (also supports mariaDB). Working principle: MySQL master-slave replication implementation From a high level perspective, replication is divided into three steps:
How canal worksThe principle is relatively simple:
Architectureillustrate:
instance module:
Install1. MySQL and Kafka environment preparation 2. Download canal: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz 3. Unzip: tar -zxvf canal.deployer-1.1.3.tar.gz 4. Configure file parameters in directory conf Configure canal.properties: Enter conf/example and configure instance.properties: 5. Start: bin/startup.sh 6. Log viewing: 4. Verification1. Develop the corresponding kafka consumer package org.kafka; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; /** * * Title: KafkaConsumerTest * Description: * kafka consumer demo * Version:1.0.0 * @author pancm * @date January 26, 2018 */ public class KafkaConsumerTest implements Runnable { private final KafkaConsumer<String, String> consumer; private ConsumerRecords<String, String> msgList; private final String topic; private static final String GROUPID = "groupA"; public KafkaConsumerTest(String topicName) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.7.193:9092"); props.put("group.id", GROUPID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "latest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<String, String>(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); } @Override public void run() { int messageNo = 1; System.out.println("---------Start consumption---------"); try { for (; ; ) { msgList = consumer.poll(1000); if (null != msgList && msgList.count() > 0) { for (ConsumerRecord<String, String> record : msgList) { //Print after consuming 100 records, but the printed data may not follow this pattern System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset()); // String v = decodeUnicode(record.value()); // System.out.println(v); // Exit when 1000 messages are consumed if (messageNo % 1000 == 0) { break; } messageNo++; } } else { Thread.sleep(11); } } } catch (InterruptedException e) { e.printStackTrace(); finally consumer.close(); } } public static void main(String args[]) { KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data"); Thread thread1 = new Thread(test1); thread1.start(); } /* * Convert Chinese to unicode*/ public static String gbEncoding(final String gbString) { char[] utfBytes = gbString.toCharArray(); String unicodeBytes = ""; for (int i = 0; i < utfBytes.length; i++) { String hexB = Integer.toHexString(utfBytes[i]); if (hexB.length() <= 2) { hexB = "00" + hexB; } unicodeBytes = unicodeBytes + "\\u" + hexB; } return unicodeBytes; } /* * unicode encoding to Chinese*/ public static String decodeUnicode(final String dataStr) { int start = 0; int end = 0; final StringBuffer buffer = new StringBuffer(); while (start > -1) { end = dataStr.indexOf("\\u", start + 2); String charStr = ""; if (end == -1) { charStr = dataStr.substring(start + 2, dataStr.length()); } else { charStr = dataStr.substring(start + 2, end); } char letter = (char) Integer.parseInt(charStr, 16); // Parse hexadecimal integer string. buffer.append(new Character(letter).toString()); start = end; } return buffer.toString(); } } 2. Add data to table bak1 CREATE TABLE `bak1` ( `vin` varchar(20) NOT NULL, `p1` double DEFAULT NULL, `p2` double DEFAULT NULL, `p3` double DEFAULT NULL, `p4` double DEFAULT NULL, `p5` double DEFAULT NULL, `p6` double DEFAULT NULL, `p7` double DEFAULT NULL, `p8` double DEFAULT NULL, `p9` double DEFAULT NULL, `p0` double DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 show create table bak1; insert into bak1 select '李雷abcv', `p1`, `p2`, `p3`, `p4`, `p5`, `p6`, `p7`, `p8`, `p9`, `p0` from moci limit 10 3. View the output results: This concludes this article about synchronizing the full and incremental data of a specific MySQL table to a message queue - solution. For more information about synchronizing data in a specific MySQL table, please search for previous articles on 123WORDPRESS.COM or continue browsing the following related articles. I hope you will support 123WORDPRESS.COM in the future! You may also be interested in:
|
<<: Bootstrap3.0 study notes table related
>>: A detailed introduction to setting up Jenkins on Tencent Cloud Server
1. Write a backup script rem auther:www.yumi-info...
Today, when I searched for a page on Baidu, becaus...
Copy code The code is as follows: <html> &l...
Table of contents Introduction to the Decorator P...
Table of contents Preface 1. Startup management b...
Table of contents Foreign Key How to determine ta...
Currently, Docker has an official mirror for Chin...
Table of contents Preface 1. Brief Analysis of th...
MySQL 5.7.8 introduced the json field. This type ...
Docker official documentation: https://docs.docke...
<br />For some time, I found that many peopl...
The color matching in website construction is ver...
The tutorial for installing OpenStack Ussuri with...
Preface I have been summarizing my front-end know...
Background description: On an existing load balan...