1. Original demand
The original full data of specific tables in specific MySQL databases, as well as their incremental data, must be synchronized in real time, and the corresponding updates and deletes must be synchronized as well. The synchronization must not be intrusive: business code must not be changed, and it must not put noticeable extra load on the business side.
Application scenarios: data ETL synchronization and reducing the load on business database servers.

2. Solution
In short: canal parses the MySQL binlog and publishes the row changes to Kafka, from which downstream consumers read them.

3. Canal introduction and installation
Canal is an open source project from Alibaba, written in pure Java. Based on parsing the database's incremental log, it provides incremental data subscription and consumption, and currently mainly supports MySQL (MariaDB is also supported).
Working principle: the MySQL master-slave replication mechanism. At a high level, replication consists of three steps:
1. The master records data changes in its binary log (binary log events).
2. The slave copies the master's binary log events into its own relay log.
3. The slave replays the events in the relay log, applying the changes to its own data.
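Canal relies on this mechanism, so the source MySQL instance must have binary logging enabled in ROW format. A minimal my.cnf sketch (values are illustrative):

```ini
[mysqld]
# Binary logging must be on and in ROW format so canal can see full row images
log-bin = mysql-bin
binlog-format = ROW
# server-id must be unique across the master and everything that connects as a replica
server-id = 1
```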
How canal works
The principle is relatively simple:
1. canal emulates the MySQL slave's interaction protocol, pretends to be a MySQL slave, and sends a dump request to the MySQL master.
2. The master receives the dump request and starts pushing binary log events to the slave (i.e. canal).
3. canal parses the binary log byte stream into structured change events.
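Because canal connects to MySQL as a replication client, the account it uses needs replication privileges. A sketch along the lines of canal's quick-start documentation (account name and password are only examples):

```sql
-- Replication account for canal; the name and password are illustrative
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```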
Architecture
Notes:
- server: represents one running canal instance, corresponding to one JVM.
- instance: corresponds to one data queue (one server can host 1..n instances).
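On disk, this server/instance split corresponds to the deployer's conf directory: one server-level canal.properties plus one sub-directory per instance (the default instance is named example), roughly like this:

```
conf/
├── canal.properties          # server-level configuration (one per JVM)
└── example/                  # one sub-directory per instance (data queue)
    └── instance.properties   # per-instance configuration
# other files and directories omitted
```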
instance module:
- eventParser: data source access; emulates the slave protocol to interact with the master and parses the binlog.
- eventSink: the link between parser and store; filters, processes, and distributes the data.
- eventStore: stores the data.
- metaManager: manages the metadata for incremental subscription and consumption (consumption positions).
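These modules are wired up through the two files edited in step 4 of the installation below. For the canal-to-Kafka setup used in this article, the relevant keys look roughly like this (key names as of canal 1.1.x; addresses, credentials, filter and topic are illustrative and must match your environment — the topic here matches the one consumed in the verification section):

```properties
# conf/canal.properties (server level)
# Send parsed changes to Kafka instead of serving them over canal's TCP client protocol
canal.serverMode = kafka
# Kafka bootstrap servers (same broker the consumer below points at)
canal.mq.servers = 192.168.7.193:9092

# conf/example/instance.properties (per instance)
# Source MySQL instance and the replication account created earlier
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# Only synchronize the specific schema.table (regex); schema/table names are illustrative
canal.instance.filter.regex = mydb\\.bak1
# Topic that the Kafka consumer in the verification section subscribes to
canal.mq.topic = sample-data
```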
Install
1. Prepare the MySQL and Kafka environments.
2. Download canal: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
3. Unpack it: tar -zxvf canal.deployer-1.1.3.tar.gz
4. Edit the configuration files in the conf directory: configure canal.properties, then enter conf/example and configure instance.properties.
5. Start canal: bin/startup.sh
6. Check the logs: the server log is logs/canal/canal.log and the instance log is logs/example/example.log.

4. Verification
1. Write a corresponding Kafka consumer:

```java
package org.kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Title: KafkaConsumerTest
 * Description: kafka consumer demo
 * Version: 1.0.0
 *
 * @author pancm
 * @date January 26, 2018
 */
public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------Start consumption---------");
        try {
            for (;;) {
                msgList = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        // Print every record that is consumed
                        System.out.println(messageNo + "=======receive: key = " + record.key()
                                + ", value = " + record.value() + " offset===" + record.offset());
                        // String v = decodeUnicode(record.value());
                        // System.out.println(v);
                        // Stop processing the current batch after 1000 messages
                        if (messageNo % 1000 == 0) {
                            break;
                        }
                        messageNo++;
                    }
                } else {
                    Thread.sleep(11);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String args[]) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }

    /**
     * Convert Chinese characters to unicode escapes
     */
    public static String gbEncoding(final String gbString) {
        char[] utfBytes = gbString.toCharArray();
        String unicodeBytes = "";
        for (int i = 0; i < utfBytes.length; i++) {
            String hexB = Integer.toHexString(utfBytes[i]);
            if (hexB.length() <= 2) {
                hexB = "00" + hexB;
            }
            unicodeBytes = unicodeBytes + "\\u" + hexB;
        }
        return unicodeBytes;
    }

    /**
     * Decode unicode escapes back to Chinese characters
     */
    public static String decodeUnicode(final String dataStr) {
        int start = 0;
        int end = 0;
        final StringBuffer buffer = new StringBuffer();
        while (start > -1) {
            end = dataStr.indexOf("\\u", start + 2);
            String charStr = "";
            if (end == -1) {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } else {
                charStr = dataStr.substring(start + 2, end);
            }
            // Parse the hexadecimal code point
            char letter = (char) Integer.parseInt(charStr, 16);
            buffer.append(letter);
            start = end;
        }
        return buffer.toString();
    }
}
```

2. Create table bak1 and add data to it:

```sql
CREATE TABLE `bak1` (
  `vin` varchar(20) NOT NULL,
  `p1` double DEFAULT NULL,
  `p2` double DEFAULT NULL,
  `p3` double DEFAULT NULL,
  `p4` double DEFAULT NULL,
  `p5` double DEFAULT NULL,
  `p6` double DEFAULT NULL,
  `p7` double DEFAULT NULL,
  `p8` double DEFAULT NULL,
  `p9` double DEFAULT NULL,
  `p0` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

show create table bak1;

insert into bak1
select '李雷abcv', `p1`, `p2`, `p3`, `p4`, `p5`, `p6`, `p7`, `p8`, `p9`, `p0`
from moci
limit 10;
```

3. View the output results.

This concludes this article on synchronizing the full and incremental data of a specific MySQL table to a message queue. For more information about synchronizing data from specific MySQL tables, please search for previous articles on 123WORDPRESS.COM or continue browsing its related articles. I hope you will support 123WORDPRESS.COM in the future!