Preface We often need to do something based on some operations that users perform on their own data. For example, if a user deletes his account, we will send him a text message to scold him and beg him to come back. A similar function can certainly be implemented in the business logic layer, and this operation can be performed after receiving the user's deletion request, but the database binlog provides us with another operation method. To monitor binlog, two steps are required. The first step is of course to enable this function in your MySQL, and the second step is to write a program to read the log. mysql turns on binlog. First of all, MySQL's binlog is usually not open, so we need: Find the mysql configuration file my.cnf. The location may vary depending on the operating system. You can find it yourself. Add the following content to it: [mysqld] server_id = 1 log-bin = mysql-bin binlog-format = ROW Then restart mysql. /ubuntu service mysql restart // mac mysql.server restart Monitor whether it is enabled successfully Enter the mysql command line and execute: show variables like '%log_bin%' ; If the result is as shown below, it is successful: View the binlog status being written: Code to read binlog Introducing dependencies We use some open source implementations. For some strange reasons, I chose the mysql-binlog-connector-java package (official github repository) [github.com/shyiko/mysq…] The specific dependencies are as follows: <!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java --> <dependency> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.17.0</version> </dependency> Of course, there are many open source implementations for binlog processing, Alibaba's cancl is one, and you can also use it. Write a demo According to the readme in the official repository, let's simply write a demo. public static void main(String[] args) { BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "passwd"); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY ); client.setEventDeserializer(eventDeserializer); client.registerEventListener(new BinaryLogClient.EventListener() { @Override public void onEvent(Event event) { //TODO dosomething(); logger.info(event.toString()); } }); client.connect(); } This is written completely according to the official tutorial. You can write your own business logic in onEvent. Since I am just testing, I print out every event in it. After that, I manually logged into MySQL and performed add, modify, and delete operations respectively. The logs monitored were as follows:
Encapsulate a better and more customized tool class according to your own business I was going to post the code at first,,, but as the code got longer and longer, I decided to upload it to GitHub. Here I will only post part of the implementation. Code transfer portal Implementation ideas
So the implementation ideas are roughly as follows:
Initialization code: public MysqlBinLogListener(Conf conf) { BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY ); client.setEventDeserializer(eventDeserializer); this.parseClient = client; this.queue = new ArrayBlockingQueue<>(1024); this.conf = conf; listeners = new ConcurrentHashMap<>(); dbTableCols = new ConcurrentHashMap<>(); this.consumer = Executors.newFixedThreadPool(consumerThreads); } Registration Code: public void regListener(String db, String table, BinLogListener listener) throws Exception { String dbTable = getdbTable(db, table); Class.forName("com.mysql.jdbc.Driver"); // Save the column information of the currently registered table Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd); Map<String, Colum> cols = getColMap(connection, db, table); dbTableCols.put(dbTable, cols); //Save the currently registered listener List<BinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList<>()); list.add(listener); listeners.put(dbTable, list); } In this step, we obtain the schema information of the table while registering the listener and save it in the map to facilitate subsequent data processing. Listening code: @Override public void onEvent(Event event) { EventType eventType = event.getHeader().getEventType(); if (eventType == EventType.TABLE_MAP) { TableMapEventData tableData = event.getData(); String db = tableData.getDatabase(); String table = tableData.getTable(); dbTable = getdbTable(db, table); } // Only handle the three operations of adding, deleting and updating if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) { if (isWrite(eventType)) { WriteRowsEventData data = event.getData(); for (Serializable[] row : data.getRows()) { if (dbTableCols.containsKey(dbTable)) { LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable)); e.setDbTable(dbTable); queue.add(e); } } } } } I'm lazy,,, here I only implement the processing of the add operation, and I haven't written other operations. Consumption code: public void parse() throws IOException { parseClient.registerEventListener(this); for (int i = 0; i < consumerThreads; i++) { consumer.submit(() -> { while (true) { if (queue.size() > 0) { try { LogItem item = queue.take(); String dbtable = item.getDbTable(); listeners.get(dbtable).forEach(l -> { l.onEvent(item); }); } catch (InterruptedException e) { e.printStackTrace(); } } Thread.sleep(1000); } }); } parseClient.connect(); } When consuming, get the item from the queue, then get the corresponding one or more listeners to consume the item respectively. Test code: public static void main(String[] args) throws Exception { Conf conf = new Conf(); conf.host = "hostname"; conf.port = 3306; conf.username = conf.passwd = "hhsgsb"; MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf); mysqlBinLogListener.parseArgsAndRun(args); mysqlBinLogListener.regListener("pf", "student", item -> { System.out.println(new String((byte[])item.getAfter().get("name"))); logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter()); }); mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ====")); mysqlBinLogListener.parse(); } In this short code, two listeners are registered to listen to the student and teacher tables respectively, and print them respectively. After testing, when inserting data into the teacher table, the defined business logic can be run independently. Note: The tool class here cannot be used directly, because there are many exception handlings not done, and the function only listens to the insert statement, which can be used as a reference for implementation. References
Summarize The above is the full content of this article. I hope that the content of this article will have certain reference learning value for your study or work. Thank you for your support of 123WORDPRESS.COM. You may also be interested in:
|
<<: vitrualBox+ubuntu16.04 install python3.6 latest tutorial and detailed steps
>>: Detailed steps to install a virtual machine and use CentOS 8 using VMware 15
Table of contents Why choose react-beautiful-dnd ...
When learning mybatis, I encountered an error, th...
Table of contents 1. Why do we need vue3? 2. Adva...
Use of stored procedure in parameters IN paramete...
URL: http://hostname.com/contextPath/servletPath/...
<p><b>This is bold font</b></...
The following command is often used: chmod 777 文件...
The before/after pseudo-class is equivalent to in...
Table of contents Login business process Login fu...
Table of contents Event Loop miscroTask (microtas...
Usage of time difference functions TIMESTAMPDIFF ...
This article example shares the simple implementa...
Sometimes it is necessary to perform simple verif...
Table of contents 1. The default focus is on the ...
During the crawler development process, you must ...