Steps to enable MySQL database monitoring binlog

Steps to enable MySQL database monitoring binlog

Preface

We often need to do something based on some operations that users perform on their own data.

For example, if a user deletes his account, we will send him a text message to scold him and beg him to come back.

A similar function can certainly be implemented in the business logic layer, and this operation can be performed after receiving the user's deletion request, but the database binlog provides us with another operation method.

To monitor binlog, two steps are required. The first step is of course to enable this function in your MySQL, and the second step is to write a program to read the log.

mysql turns on binlog.

First of all, MySQL's binlog is usually not open, so we need:

Find the mysql configuration file my.cnf. The location may vary depending on the operating system. You can find it yourself.

Add the following content to it:

[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = ROW

Then restart mysql.

/ubuntu
service mysql restart
// mac
mysql.server restart

Monitor whether it is enabled successfully

Enter the mysql command line and execute:

show variables like '%log_bin%' ;

If the result is as shown below, it is successful:


View the binlog status being written:


Code to read binlog

Introducing dependencies

We use some open source implementations. For some strange reasons, I chose the mysql-binlog-connector-java package (official github repository) [github.com/shyiko/mysq…] The specific dependencies are as follows:

<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java -->
 <dependency>
 <groupId>com.github.shyiko</groupId>
 <artifactId>mysql-binlog-connector-java</artifactId>
 <version>0.17.0</version>
 </dependency>

Of course, there are many open source implementations for binlog processing, Alibaba's cancl is one, and you can also use it.

Write a demo

According to the readme in the official repository, let's simply write a demo.

 public static void main(String[] args) {
 BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "passwd");
 EventDeserializer eventDeserializer = new EventDeserializer();
 eventDeserializer.setCompatibilityMode(
 EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
 );
 client.setEventDeserializer(eventDeserializer);
 client.registerEventListener(new BinaryLogClient.EventListener() {

 @Override
 public void onEvent(Event event) {
 //TODO
 dosomething();
 logger.info(event.toString());
 }
 });
 client.connect();
 }

This is written completely according to the official tutorial. You can write your own business logic in onEvent. Since I am just testing, I print out every event in it.

After that, I manually logged into MySQL and performed add, modify, and delete operations respectively. The logs monitored were as follows:

00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}
00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}
00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}
00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
[[B@546a03af, 2]
]}}
00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
{before=[[B@6833ce2c, 1], after=[[B@725bef66, 3]}
]}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}
00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
[[B@1888ff2c, 3]
]}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}

Encapsulate a better and more customized tool class according to your own business

I was going to post the code at first,,, but as the code got longer and longer, I decided to upload it to GitHub. Here I will only post part of the implementation. Code transfer portal

Implementation ideas

  1. Support monitoring of a single table, because we don't really want to monitor all data tables in all databases.
  2. Can be consumed by multiple threads.
  3. Convert the monitored content into a form that we like (the data structure in the article may not be very good, I can't think of a more suitable one).

So the implementation ideas are roughly as follows:

  1. Encapsulate a client, only provide the acquisition method to the outside, and shield the initialization details code.
  2. Provides a method to register a listener (pseudo), which can register a listener for a table (redefine a listener interface, and all registered listeners just need to implement this).
  3. The only real listener is the client, which listens to all operations on this database instance and converts them into the desired format LogItem and puts them into the blocking queue.
  4. Start multiple threads, consume blocking queues, call the corresponding data table listener for a LogItem, and do some business logic.

Initialization code:

 public MysqlBinLogListener(Conf conf) {
 BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd);
 EventDeserializer eventDeserializer = new EventDeserializer();
 eventDeserializer.setCompatibilityMode(
 EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
 );
 client.setEventDeserializer(eventDeserializer);
 this.parseClient = client;
 this.queue = new ArrayBlockingQueue<>(1024);
 this.conf = conf;
 listeners = new ConcurrentHashMap<>();
 dbTableCols = new ConcurrentHashMap<>();
 this.consumer = Executors.newFixedThreadPool(consumerThreads);
 }

Registration Code:

 public void regListener(String db, String table, BinLogListener listener) throws Exception {
 String dbTable = getdbTable(db, table);
 Class.forName("com.mysql.jdbc.Driver");
 // Save the column information of the currently registered table Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);
 Map<String, Colum> cols = getColMap(connection, db, table);
 dbTableCols.put(dbTable, cols);

 //Save the currently registered listener
 List<BinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList<>());
 list.add(listener);
 listeners.put(dbTable, list);
 }

In this step, we obtain the schema information of the table while registering the listener and save it in the map to facilitate subsequent data processing.

Listening code:

 @Override
 public void onEvent(Event event) {
 EventType eventType = event.getHeader().getEventType();

 if (eventType == EventType.TABLE_MAP) {
 TableMapEventData tableData = event.getData();
 String db = tableData.getDatabase();
 String table = tableData.getTable();
 dbTable = getdbTable(db, table);
 }

 // Only handle the three operations of adding, deleting and updating if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
 if (isWrite(eventType)) {
 WriteRowsEventData data = event.getData();
 for (Serializable[] row : data.getRows()) {
  if (dbTableCols.containsKey(dbTable)) {
  LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable));
  e.setDbTable(dbTable);
  queue.add(e);
  }
 }
 }
 }
 }

I'm lazy,,, here I only implement the processing of the add operation, and I haven't written other operations.

Consumption code:

 public void parse() throws IOException {
 parseClient.registerEventListener(this);

 for (int i = 0; i < consumerThreads; i++) {
 consumer.submit(() -> {
 while (true) {
  if (queue.size() > 0) {
  try {
  LogItem item = queue.take();
  String dbtable = item.getDbTable();
  listeners.get(dbtable).forEach(l -> {
  l.onEvent(item);
  });

  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  Thread.sleep(1000);
 }
 });
 }
 parseClient.connect();
 }

When consuming, get the item from the queue, then get the corresponding one or more listeners to consume the item respectively.

Test code:

 public static void main(String[] args) throws Exception {
 Conf conf = new Conf();
 conf.host = "hostname";
 conf.port = 3306;
 conf.username = conf.passwd = "hhsgsb";

 MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);
 mysqlBinLogListener.parseArgsAndRun(args);
 mysqlBinLogListener.regListener("pf", "student", item -> {
 System.out.println(new String((byte[])item.getAfter().get("name")));
 logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter());
 });
 mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ===="));

 mysqlBinLogListener.parse();
 }

In this short code, two listeners are registered to listen to the student and teacher tables respectively, and print them respectively. After testing, when inserting data into the teacher table, the defined business logic can be run independently.

Note: The tool class here cannot be used directly, because there are many exception handlings not done, and the function only listens to the insert statement, which can be used as a reference for implementation.

References

  • github.com/shyiko/mysq…
  • https://www.jb51.net/article/166761.htm

Summarize

The above is the full content of this article. I hope that the content of this article will have certain reference learning value for your study or work. Thank you for your support of 123WORDPRESS.COM.

You may also be interested in:
  • MySQL series: redo log, undo log and binlog detailed explanation
  • Specific use of MySQL binlog_ignore_db parameter
  • How to choose the format when using binlog in MySQL
  • Detailed explanation of the binlog log analysis tool for monitoring MySQL: Canal
  • In-depth explanation of binlog in MySQL 8.0
  • Summary of some thoughts on binlog optimization in MYSQL
  • Detailed explanation of MySQL database binlog cleanup command
  • How to distinguish MySQL's innodb_flush_log_at_trx_commit and sync_binlog

<<:  vitrualBox+ubuntu16.04 install python3.6 latest tutorial and detailed steps

>>:  Detailed steps to install a virtual machine and use CentOS 8 using VMware 15

Recommend

Using react-beautiful-dnd to implement drag and drop between lists

Table of contents Why choose react-beautiful-dnd ...

About the configuration problem of MyBatis connecting to MySql8.0 version

When learning mybatis, I encountered an error, th...

Summary of the advantages of Vue3 vs. Vue2

Table of contents 1. Why do we need vue3? 2. Adva...

Detailed explanation of the entry-level use of MySql stored procedure parameters

Use of stored procedure in parameters IN paramete...

Detailed explanation of setting Context Path in Web application

URL: http://hostname.com/contextPath/servletPath/...

Linux common commands chmod to modify file permissions 777 and 754

The following command is often used: chmod 777 文件...

Detailed explanation of common usage of pseudo-classes before and after in CSS3

The before/after pseudo-class is equivalent to in...

Detailed explanation of Vue login and logout

Table of contents Login business process Login fu...

Analysis of the principle of Vue nextTick

Table of contents Event Loop miscroTask (microtas...

Usage of MySQL time difference functions TIMESTAMPDIFF and DATEDIFF

Usage of time difference functions TIMESTAMPDIFF ...

Vue implements simple production of counter

This article example shares the simple implementa...

JavaScript implements password box input verification

Sometimes it is necessary to perform simple verif...

Introduction to document.activeELement focus element in JavaScript

Table of contents 1. The default focus is on the ...

Example of using Docker Swarm to build a distributed crawler cluster

During the crawler development process, you must ...