Solution to the data asymmetry problem between MySQL and Elasticsearch

The Logstash jdbc input plugin can only append new rows and write them incrementally to Elasticsearch, but the database on the JDBC source side often deletes or updates rows as well. Those changes are not carried over, so the database and the search engine index drift out of sync.

Of course, if you have a development team, you can write a program that updates the search engine whenever rows are deleted or updated. If you don't have that option, you can try the following method.

Take a table named article as an example. Its mtime field is defined with ON UPDATE CURRENT_TIMESTAMP, so mtime changes every time the row is updated.

mysql> desc article;
+-------------+---------------+------+-----+-----------------------------+-------+
| Field       | Type          | Null | Key | Default                     | Extra |
+-------------+---------------+------+-----+-----------------------------+-------+
| id          | int(11)       | NO   |     | 0                           |       |
| title       | mediumtext    | NO   |     | NULL                        |       |
| description | mediumtext    | YES  |     | NULL                        |       |
| author      | varchar(100)  | YES  |     | NULL                        |       |
| source      | varchar(100)  | YES  |     | NULL                        |       |
| content     | longtext      | YES  |     | NULL                        |       |
| status      | enum('Y','N') | NO   |     | 'N'                         |       |
| ctime       | timestamp     | NO   |     | CURRENT_TIMESTAMP           |       |
| mtime       | timestamp     | YES  |     | ON UPDATE CURRENT_TIMESTAMP |       |
+-------------+---------------+------+-----+-----------------------------+-------+
9 rows in set (0.00 sec)

Add a query condition on mtime to the Logstash jdbc input:

jdbc {
  jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
  jdbc_user => "cms"
  jdbc_password => "password"
  # Cron expression: run once every minute
  schedule => "* * * * *"
  # Only fetch rows modified since the last recorded mtime
  statement => "select * from article where mtime > :sql_last_value"
  use_column_value => true
  tracking_column => "mtime"
  tracking_column_type => "timestamp"
  record_last_run => true
  last_run_metadata_path => "/var/tmp/article-mtime.last"
}
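The tracking options above can be pictured as follows: each run selects the rows whose mtime is strictly newer than the saved sql_last_value, indexes them, and then saves the largest mtime it saw to last_run_metadata_path. The Java sketch below is a conceptual in-memory model of that bookkeeping, not Logstash's actual implementation; Row, RunResult and runOnce are hypothetical names.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class IncrementalSyncModel {

    // Hypothetical stand-in for one row of the article table.
    record Row(int id, LocalDateTime mtime) {}

    // Result of one scheduled run: the rows to index and the value to persist
    // (the role played by last_run_metadata_path in the Logstash config).
    record RunResult(List<Row> rows, LocalDateTime lastValue) {}

    // Models "select * from article where mtime > :sql_last_value".
    // A NULL mtime never satisfies the SQL comparison, so such rows are skipped here too.
    // The largest mtime selected becomes the next lastValue (unchanged if nothing matched).
    static RunResult runOnce(List<Row> table, LocalDateTime lastValue) {
        List<Row> selected = new ArrayList<>();
        LocalDateTime next = lastValue;
        for (Row row : table) {
            if (row.mtime() != null && row.mtime().isAfter(lastValue)) {
                selected.add(row);
                if (row.mtime().isAfter(next)) {
                    next = row.mtime();
                }
            }
        }
        return new RunResult(selected, next);
    }

    public static void main(String[] args) {
        LocalDateTime t0 = LocalDateTime.of(2020, 1, 1, 0, 0);
        List<Row> table = List.of(new Row(1, t0.plusMinutes(1)), new Row(2, t0.plusMinutes(2)));
        RunResult first = runOnce(table, t0);
        System.out.println(first.rows().size() + " rows, last value " + first.lastValue());
        // The second run starts from the saved value, so the same rows are not fetched again.
        RunResult second = runOnce(table, first.lastValue());
        System.out.println(second.rows().size() + " rows on the next run");
    }
}
```

Because the comparison is strict and the saved value is only as precise as the mtime column, a row updated within the same second as the saved value, but after the query ran, can be skipped on the next run.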

Create a recycle bin table, elasticsearch_trash, to handle rows that are deleted from the database or disabled by setting status = 'N'.

CREATE TABLE `elasticsearch_trash` (
 `id` int(11) NOT NULL,
 `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Create triggers on the article table:

DELIMITER $$

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
 -- When an article's status becomes N, queue its ID so the corresponding document is deleted from the search engine.
 -- ON DUPLICATE KEY UPDATE keeps a repeated update of an already disabled article from failing on the primary key.
 IF NEW.status = 'N' THEN
  INSERT INTO elasticsearch_trash(id) VALUES (OLD.id)
   ON DUPLICATE KEY UPDATE ctime = CURRENT_TIMESTAMP;
 END IF;
 -- If the status changes back to Y while the ID is still in elasticsearch_trash, the document would be deleted by mistake,
 -- so remove the ID from the recycle bin.
 IF NEW.status = 'Y' THEN
  DELETE FROM elasticsearch_trash WHERE id = OLD.id;
 END IF;
END$$

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
 -- When an article is deleted, put its ID into the search engine recycle bin.
 INSERT INTO elasticsearch_trash(id) VALUES (OLD.id)
  ON DUPLICATE KEY UPDATE ctime = CURRENT_TIMESTAMP;
END$$

DELIMITER ;
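The combined effect of the two triggers on elasticsearch_trash can be modeled as operations on a set of article IDs. The following Java sketch is an in-memory illustration only (TrashTriggerModel and its methods are hypothetical and do not touch MySQL); it shows why re-enabling an article has to cancel its pending deletion and why queuing the same ID twice must be harmless.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class TrashTriggerModel {

    // In-memory stand-in for the elasticsearch_trash table (primary key = article id).
    private final Set<Integer> trash = new LinkedHashSet<>();

    // Mirrors the BEFORE UPDATE trigger: N queues the id, Y cancels a queued deletion.
    // Set.add is idempotent, like an insert that tolerates an existing primary key.
    void onUpdate(int id, char newStatus) {
        if (newStatus == 'N') {
            trash.add(id);
        }
        if (newStatus == 'Y') {
            trash.remove(id);
        }
    }

    // Mirrors the BEFORE DELETE trigger: a deleted article is always queued.
    void onDelete(int id) {
        trash.add(id);
    }

    // IDs the cleanup job would delete from Elasticsearch on its next run.
    Set<Integer> pending() {
        return Set.copyOf(trash);
    }

    public static void main(String[] args) {
        TrashTriggerModel model = new TrashTriggerModel();
        model.onUpdate(1, 'N'); // article 1 disabled
        model.onUpdate(1, 'N'); // edited again while disabled: still a single entry
        model.onUpdate(2, 'N'); // article 2 disabled...
        model.onUpdate(2, 'Y'); // ...then re-enabled, so it must not be deleted
        model.onDelete(3);      // article 3 deleted
        System.out.println(model.pending());
    }
}
```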

Next, write a simple shell script that runs once a minute, reads the IDs from the elasticsearch_trash table, and calls the Elasticsearch RESTful API with curl to delete the corresponding documents.
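For each ID, such a script sends a plain HTTP DELETE on the document URL (with curl, curl -XDELETE followed by that URL). The Java 11 java.net.http sketch below shows the same request; the base URL http://localhost:9200, the index information and the type article are assumptions (the index and type match the Spring Boot example, and on Elasticsearch 7 and later the type segment is _doc instead), and TrashRestDelete is a hypothetical class name.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TrashRestDelete {

    // Assumed cluster address, index and type; adjust to your deployment.
    static final String BASE_URL = "http://localhost:9200";
    static final String INDEX = "information";
    static final String TYPE = "article";

    // Builds DELETE /{index}/{type}/{id}.
    static HttpRequest deleteRequest(int id) {
        URI uri = URI.create(BASE_URL + "/" + INDEX + "/" + TYPE + "/" + id);
        return HttpRequest.newBuilder(uri).DELETE().build();
    }

    // 200 means the document was deleted, 404 means it was already gone;
    // in both cases the ID can be removed from elasticsearch_trash.
    static boolean canClearTrash(int statusCode) {
        return statusCode == 200 || statusCode == 404;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        int id = Integer.parseInt(args.length > 0 ? args[0] : "1");
        HttpResponse<String> response =
                client.send(deleteRequest(id), HttpResponse.BodyHandlers.ofString());
        System.out.println("delete " + id + " -> " + response.statusCode()
                + (canClearTrash(response.statusCode()) ? " (clear trash)" : " (retry later)"));
    }
}
```

Treating 404 as success keeps an ID that was never indexed, or was already removed, from staying in the recycle bin forever.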

You can also write a dedicated program instead. Here is an example using a Spring Boot scheduled task.

Entity class:

package cn.netkiller.api.domain.elasticsearch;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table
public class ElasticsearchTrash {
    @Id
    private int id;

    @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
    private Date ctime;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public Date getCtime() {
        return ctime;
    }

    public void setCtime(Date ctime) {
        this.ctime = ctime;
    }

}

Repository:

package cn.netkiller.api.repository.elasticsearch;

import org.springframework.data.repository.CrudRepository;

import cn.netkiller.api.domain.elasticsearch.ElasticsearchTrash;

public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer> {

}

Scheduled task:

package cn.netkiller.api.schedule;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import cn.netkiller.api.domain.elasticsearch.ElasticsearchTrash;
import cn.netkiller.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Component
public class ScheduledTasks {
    private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

    @Autowired
    private TransportClient client;

    @Autowired
    private ElasticsearchTrashRepository elasticsearchTrashRepository;

    // Run the cleanup once every 60 seconds.
    @Scheduled(fixedRate = 1000 * 60)
    public void cleanTrash() {
        for (ElasticsearchTrash elasticsearchTrash : elasticsearchTrashRepository.findAll()) {
            String id = String.valueOf(elasticsearchTrash.getId());
            // Delete the document from index "information", type "article".
            DeleteResponse response = client.prepareDelete("information", "article", id).get();
            RestStatus status = response.status();
            logger.info("delete {} {}", id, status);
            // NOT_FOUND means the document is already gone, so the trash record can be removed too.
            if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
                elasticsearchTrashRepository.delete(elasticsearchTrash);
            }
        }
    }
}

The Spring Boot main class, which enables scheduling with @EnableScheduling:

package cn.netkiller.api;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
