Background

In data warehouse modeling, raw business-layer data that has not been processed in any way is called ODS (Operational Data Store) data. In Internet companies, common ODS data includes business log data (Log) and business DB data (DB). For business DB data, collecting data from relational databases such as MySQL and importing it into Hive is an essential part of data warehouse production.

How do we synchronize MySQL data to Hive accurately and efficiently? The common solution is batch select-and-load: connect directly to MySQL and select the data from the table, save it to a local file as intermediate storage, and finally load the file into the Hive table. The advantage of this solution is that it is easy to implement, but as the business developed, its disadvantages became increasingly apparent.
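For concreteness, the batch select-and-load pipeline described above might look roughly like the following sketch. This is only an illustration, not the original implementation: the connection settings, table names, and file paths are hypothetical, and it assumes the pymysql client and the hive CLI are available.

```python
import csv
import subprocess

import pymysql  # assumed MySQL client library

# Hypothetical connection settings, table names, and paths.
MYSQL_CONF = dict(host="mysql-host", user="reader", password="changeme", database="user")
TABLE = "userinfo"
LOCAL_FILE = "/tmp/userinfo.csv"
HIVE_TABLE = "ods.userinfo"
DT = "2024-01-01"

# 1. Select the full table from MySQL and dump it to a local file.
conn = pymysql.connect(**MYSQL_CONF)
with conn.cursor() as cur, open(LOCAL_FILE, "w", newline="") as f:
    cur.execute(f"SELECT * FROM {TABLE}")
    writer = csv.writer(f)
    for row in cur:
        writer.writerow(row)
conn.close()

# 2. Load the file into the target Hive partition via the hive CLI.
#    (Assumes the Hive table's field delimiter matches the file format.)
load_sql = (
    f"LOAD DATA LOCAL INPATH '{LOCAL_FILE}' "
    f"OVERWRITE INTO TABLE {HIVE_TABLE} PARTITION (dt='{DT}')"
)
subprocess.run(["hive", "-e", load_sql], check=True)
```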
To solve these problems completely, we gradually moved to a CDC (Change Data Capture) + Merge solution: real-time Binlog collection plus offline processing of the Binlog to reconstruct the business data. Binlog is MySQL's binary log, which records every data change in MySQL; MySQL's own master-slave replication is based on the Binlog. This article describes how to get DB data into the data warehouse accurately and efficiently from two angles: real-time collection of the Binlog, and offline processing of the Binlog to restore the business data.

Overall architecture

The overall architecture is shown in the figure above. For real-time Binlog collection we adopted Alibaba's open source project Canal, which pulls the Binlog from MySQL in real time and performs the appropriate parsing. The collected Binlog is temporarily staged in Kafka for downstream consumption; this real-time collection path is shown by the red arrows in the figure. Offline Binlog processing, shown by the black arrows, restores a MySQL table on Hive in two steps: a Kafka2Hive task first lands the Binlog stored in Kafka into a Hive table, and a Merge task then combines the day's changes with the existing snapshot to rebuild the full table.
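To make the "temporarily staged in Kafka for downstream consumption" part concrete, here is a minimal sketch of a downstream consumer reading parsed Binlog events from a per-DB Topic. The Topic name is hypothetical, and the JSON field names follow Canal's common flat-message style but should be treated as assumptions that depend on how Canal is configured to serialize events.

```python
import json

from kafka import KafkaConsumer  # assumed: kafka-python client

# Hypothetical Topic name: one Kafka Topic per subscribed MySQL DB.
consumer = KafkaConsumer(
    "binlog_user",
    bootstrap_servers=["kafka-broker:9092"],
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    event = message.value
    # Field names below follow Canal's common flat-message JSON layout
    # (database/table/type/data); treat them as assumptions, not a spec.
    print(event.get("database"), event.get("table"), event.get("type"))
    for row in event.get("data") or []:
        print(row)
```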
Looking back at the problems encountered by the batch select-and-load solution described in the background, why does this new solution address them?
Binlog real-time collection

Real-time Binlog collection consists of two main modules. The first is CanalManager, which is responsible for allocating collection tasks, monitoring and alerting, managing metadata, and interfacing with external dependent systems. The second is Canal together with CanalClient, which actually execute the collection tasks.

When a user submits a Binlog collection request for a DB, CanalManager first calls the DBA platform's interfaces to obtain information about the MySQL instance hosting that DB, in order to select the machine best suited for Binlog collection. It then distributes the collection instance (Canal Instance) to an appropriate Canal server, namely a CanalServer. When choosing a specific CanalServer, CanalManager considers factors such as load balancing and cross-data-center traffic, giving priority to machines with lower load and to transfers within the same region. After receiving the collection request, CanalServer registers the collection information on ZooKeeper; this registration serves two purposes.
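The server-selection step described above (prefer machines in the same region as the MySQL instance, then lower load) can be sketched as follows. The load metric and region fields are hypothetical simplifications, not CanalManager's actual scheduling inputs.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class CanalServer:
    host: str
    region: str   # data center / region of the machine
    load: float   # hypothetical load metric (e.g. number of running instances)


def pick_canal_server(servers: List[CanalServer], db_region: str) -> Optional[CanalServer]:
    """Prefer servers in the same region as the MySQL instance, then lower load."""
    if not servers:
        return None
    # Same-region servers sort first (False < True), then ascending load.
    return min(servers, key=lambda s: (s.region != db_region, s.load))


# Usage sketch: pick a server for a DB hosted in region "dc1".
servers = [
    CanalServer("canal-01", "dc1", load=0.7),
    CanalServer("canal-02", "dc2", load=0.2),
    CanalServer("canal-03", "dc1", load=0.3),
]
print(pick_canal_server(servers, "dc1").host)  # -> canal-03
```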
Binlog is subscribed at the granularity of a MySQL DB, and the Binlog of one DB corresponds to one Kafka Topic. In the underlying implementation, all subscribed DBs on the same MySQL instance are processed by the same Canal Instance, because the Binlog is produced at the granularity of the MySQL instance. CanalServer discards the Binlog data of DBs that have not been subscribed, and CanalClient then distributes the received Binlog to Kafka at DB granularity.

Restoring MySQL data offline

After Binlog collection is in place, the next step is to use the Binlog to restore the business data. The first problem to solve is synchronizing the Binlog from Kafka to Hive.

Kafka2Hive

The entire Kafka2Hive task is managed under the ETL framework of the Meituan data platform, including the expression of task primitives and the scheduling mechanism, much like other ETL jobs. Underneath, it uses LinkedIn's open source project Camus, with targeted secondary development, to perform the actual Kafka-to-Hive data transfer.

Secondary development of Camus

The Binlog stored in Kafka carries no schema, while a Hive table must have one, and its partitions and fields must be designed for efficient downstream consumption. The first modification to Camus is therefore to parse the Binlog from Kafka into a format that conforms to the target schema.

The second modification to Camus is dictated by Meituan's ETL framework. Our task scheduling system currently only resolves upstream and downstream dependencies between tasks in the same scheduling queue; dependencies cannot be established across queues. In the MySQL2Hive pipeline, the Kafka2Hive task runs once every hour (hourly queue) while the Merge task runs once a day (daily queue), and the Merge task must strictly depend on the completion of the previous day's hourly Kafka2Hive runs. To solve this, we introduced the Checkdone task. Checkdone is a daily task whose job is to check whether the previous day's Kafka2Hive runs completed successfully; if so, the Checkdone task succeeds, and the downstream Merge task can start correctly.

Checkdone detection logic

How does Checkdone detect this? After each Kafka2Hive task finishes its data transfer successfully, Camus records the start time of that task in the corresponding HDFS directory. Checkdone scans all of the previous day's timestamps; if the largest one is past 0:00, the previous day's Kafka2Hive tasks have all completed successfully and the check passes.

In addition, since Camus itself only reads from Kafka and writes HDFS files, the Hive partitions still have to be loaded before the data can be queried downstream. Loading the Hive partitions is therefore the last step of the whole Kafka2Hive task; only after that is the task considered successful.

Each Kafka2Hive task reads one specific Topic and writes its Binlog data into a table under the original_binlog library, namely original_binlog.db in the previous figure, which stores all the Binlog of one MySQL DB. The figure above shows the HDFS directory structure after Kafka2Hive completes. If a MySQL DB is called user, its Binlog is stored in the original_binlog.user table.
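A minimal sketch of the Checkdone detection, assuming the recorded start times have already been read from the HDFS directory mentioned above (the real task scans HDFS rather than taking a list). The underlying idea, as we read it, is that an hourly run which started after midnight must already have consumed all Binlog produced on the previous day.

```python
from datetime import date, datetime, time, timedelta
from typing import List


def kafka2hive_done(start_times: List[datetime], day: date) -> bool:
    """Return True if the Kafka2Hive runs covering `day` are complete.

    `start_times` are the task start times that Camus recorded on HDFS.
    If the latest start time is past the midnight that ends `day`, an hourly
    run has already consumed all Binlog produced on `day`.
    """
    if not start_times:
        return False
    end_of_day = datetime.combine(day, time(0, 0)) + timedelta(days=1)
    return max(start_times) >= end_of_day


# Usage sketch: the run that started at 00:05 on 2024-01-02 closes out 2024-01-01.
runs = [datetime(2024, 1, 1, 23, 5), datetime(2024, 1, 2, 0, 5)]
print(kafka2hive_done(runs, date(2024, 1, 1)))  # -> True
```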
In the ready directory, the start times of all successfully executed Kafka2Hive tasks of that day are recorded, day by day, for Checkdone to use. The Binlog of each table is organized into its own partition; for example, the Binlog of the userinfo table is stored in the partition table_name=userinfo. Under each table_name first-level partition, second-level partitions are organized by dt. The xxx.lzo and xxx.lzo.index files in the figure hold the LZO-compressed Binlog data.

Merge

Once the Binlog has landed successfully, the next step is to restore the MySQL data from it. The Merge process does two things: it first writes the Binlog data generated that day into a Delta table, and then performs a primary-key-based Merge between the Delta table and the existing stock data. The Delta table holds the latest data of the day: if a row changes several times in one day, only the state after the last change is kept. When merging the Delta data with the stock data, a unique key is needed to decide whether two records are the same row. If the same row appears in both the stock table and the Delta table, the row has been updated and the Delta version is taken as the final result; otherwise the row is unchanged and the version in the stock table is kept. The merged result is written back with Insert Overwrite into the original table, origindb.table in the figure.

Merge process example

The following example illustrates the Merge process. The table has two columns, id and value, with id as the primary key. When extracting the Delta data, only the most recent of multiple updates to the same row is kept, so for the row with id=1 the Delta table records the last updated value, value=120. After merging the Delta data with the stock data, the final result contains one newly inserted row (id=4), two updated rows (id=1 and id=2), and one unchanged row (id=3).

By default, we use the MySQL table's primary key as the unique key for this duplicate detection, but a business can also configure a unique key different from the MySQL one based on its actual situation.
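Here is a minimal in-memory sketch of the two Merge steps described above, assuming rows are plain dicts keyed by a single-column primary key; the real job runs as a Hive computation over the Delta table and the stock table.

```python
from typing import Dict, Iterable, List, Tuple


def build_delta(binlog_rows: Iterable[Tuple[int, dict]], key: str) -> Dict[object, dict]:
    """Keep only the last change per primary key for the day.

    `binlog_rows` are (event_time, row) pairs taken from the day's Binlog;
    later events overwrite earlier ones for the same key.
    """
    delta: Dict[object, dict] = {}
    for _, row in sorted(binlog_rows, key=lambda pair: pair[0]):
        delta[row[key]] = row
    return delta


def merge(stock: List[dict], delta: Dict[object, dict], key: str) -> List[dict]:
    """Primary-key merge: Delta rows win over stock rows, new keys are appended."""
    merged = {row[key]: row for row in stock}
    merged.update(delta)          # updated keys take the Delta version
    return list(merged.values())  # result overwrites the snapshot table


# Usage sketch, mirroring the id/value example above.
stock = [{"id": 1, "value": 100}, {"id": 2, "value": 200}, {"id": 3, "value": 300}]
events = [(1, {"id": 1, "value": 110}), (2, {"id": 1, "value": 120}),
          (3, {"id": 2, "value": 210}), (4, {"id": 4, "value": 400})]
delta = build_delta(events, key="id")
print(sorted(merge(stock, delta, key="id"), key=lambda r: r["id"]))
# -> id=1 ends up with value=120, id=2 updated, id=3 unchanged, id=4 inserted
```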
The above covers the overall architecture of Binlog-based data collection and ODS data restoration. The rest of this article describes two practical business problems we solved on top of it.

Practice 1: Support for sharding

As the business grows, MySQL databases and tables are sharded more and more, and many businesses have sharded tables numbering in the thousands. Data developers generally need to aggregate these shards for analysis. Manually synchronizing every shard table and then aggregating them on Hive would be prohibitively expensive, so we need to complete the aggregation of shard tables at the ODS layer.

First, the real-time Binlog collection supports writing the Binlog of different DBs into the same Kafka Topic. When applying for Binlog collection, users can select multiple physical DBs belonging to the same business logic at once. With this aggregation at the Binlog collection layer, the Binlog of all shards is written into the same Hive table, so that downstream only needs to read a single Hive table when merging.

Second, the Merge task configuration supports regular-expression matching. By configuring a regular expression that matches the business's shard-table naming rule, the Merge task knows which tables' Binlog it needs to aggregate, and selects the data of the corresponding partitions accordingly. With these two pieces of work in place, the merging of sharded databases and tables is completed at the ODS layer.

There is one technical optimization here. During Kafka2Hive, we process the table names according to the business's sharding rule and convert physical table names into logical table names. For example, the table name userinfo123 is converted to userinfo, and its Binlog data is stored in the table_name=userinfo partition of the original_binlog.user table. This avoids the pressure that too many small HDFS files and too many Hive partitions would otherwise put on the underlying storage.
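The physical-to-logical table-name conversion can be illustrated with a short sketch. The naming convention assumed here (a logical name followed by a numeric shard suffix) is only one hypothetical pattern a business might configure via the regular expression.

```python
import re

# Hypothetical sharding convention: logical name followed by a numeric suffix.
SHARD_PATTERN = re.compile(r"^(?P<logical>[a-z_]+?)\d*$")


def logical_table_name(physical_name: str) -> str:
    """Map a physical shard name (e.g. userinfo123) to its logical name (userinfo)."""
    match = SHARD_PATTERN.match(physical_name)
    return match.group("logical") if match else physical_name


for name in ["userinfo123", "userinfo7", "userinfo"]:
    print(name, "->", logical_table_name(name))
# All three shards land in the same table_name=userinfo partition.
```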
Practice 2: Support for deletion events

Delete operations are very common in MySQL. Since Hive does not support Delete, removing rows in Hive that were deleted in MySQL requires a workaround: the Merge process handles Delete events with two additional steps.

Outlook

As a foundation of data warehouse production, the Binlog-based MySQL2Hive service provided by the Meituan data platform covers essentially all business lines within Meituan. It currently meets the data synchronization needs of most businesses and achieves accurate and efficient warehousing of DB data. In future development, we will focus on eliminating the single point of failure in CanalManager and building a cross-data-center disaster recovery architecture to support the business more reliably.

This article introduced the architecture of this service from two angles, streaming Binlog collection and Binlog-based ODS data restoration, and discussed some typical problems and solutions we encountered in practice. We hope it provides some useful reference for other developers, and we welcome any exchange of ideas.