Example of how to import nginx logs into elasticsearch

The nginx logs are collected by filebeat and shipped to logstash, which processes them and writes them to elasticsearch. Filebeat is responsible only for collection; logstash handles log parsing and formatting, field replacement and splitting, and then writes the records to elasticsearch, creating the indexes.
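
The data flow is:

nginx access log -> filebeat (nginx module) -> logstash (beats input; grok/date/geoip/useragent filters) -> elasticsearch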

1. Configure nginx log format

log_format main '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request '
            '$status $body_bytes_sent $http_referer '
            '"$http_user_agent" '
            '"$connection" '
            '"$http_cookie" '
            '$request_time '
            '$upstream_response_time';
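
Make sure nginx writes this format to the path that filebeat will read in step 2 (access_log directive below). For reference, a line in this format looks roughly like the following; all field values are made-up examples:

access_log /home/weblog/blog.cnfol.com_access.log main;

192.168.15.20 113.108.2.1, 10.0.0.7 [23/Apr/2019:10:20:30 +0800] blog.cnfol.com GET /index.html HTTP/1.1 200 5123 - "Mozilla/5.0 (Windows NT 10.0; Win64; x64)" "12345" "uid=abc; sid=xyz" 0.003 0.001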

2. Install and configure filebeat and enable the nginx module

tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -C /usr/local
cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat
cd /usr/local/filebeat

Enable the nginx module

./filebeat modules enable nginx

List modules

./filebeat modules list

Create a configuration file

vim /usr/local/filebeat/blog_module_logstash.yml

filebeat.modules:
- module: nginx
  access:
    enabled: true
    var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
  #error:
  #  enabled: true
  #  var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]

output.logstash:
  hosts: ["192.168.15.91:5044"]

Start filebeat

./filebeat -c blog_module_logstash.yml -e

3. Configure logstash

tar -zxvf logstash-6.2.4.tar.gz -C /usr/local
cd /usr/local;ln -s logstash-6.2.4 logstash

Create a pipeline file for the nginx log

cd /usr/local/logstash

Logstash's built-in grok patterns directory

vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns

Edit the grok-patterns file and add a regex that matches multiple IPs

FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}
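
This pattern matches either one or more comma-separated IPv4 addresses (e.g. a single 1.2.3.4, or a proxy/CDN-style list such as 1.2.3.4, 5.6.7.8) or a single word such as unknown. Keep the spelling FORWORD as-is, since that is the name the grok pattern below references.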

Official grok pattern reference

http://grokdebug.herokuapp.com/patterns#

Create a logstash pipeline configuration file, e.g. test_pipline2.conf (the name used when starting logstash below)

#input {
# stdin {}
#}
# Accept data input from filebeat
input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}

filter {
  # Add a debugging switch
  mutate{add_field => {"[@metadata][debug]"=>true}}
  grok {
    # Filter nginx log
    #match => { "message" => "%{NGINXACCESS_TEST2}" }
    #match => { "message" => '%{IPORHOST:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{HTTPDATE:[@metadata][webtime]}\] # %{NOTSPACE:hostname} # %{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} # %{NUMBER:response} # (?:%{NUMBER:bytes}|-) # (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) # (?:"(?<cookies>[^#]*)") # %{NUMBER:request_time:float} # (?:%{NUMBER:upstream_response_time:float}|-)' }
    #match => { "message" => '(?:%{IPORHOST:clientip}|-) (?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-) \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) (?:"(?<cookies>[^#]*)") %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
    match => { "message" => '(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
  }
  # Copy the default @timestamp (the time when beats collected the log) to the new field @read_timestamp
  ruby {
    #code => "event.set('@read_timestamp',event.get('@timestamp'))"
    # Shift the time zone to UTC+8
    code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
  }
  # Parse the nginx log record time, e.g. 20/May/2015:21:05:56 +0000
  date {
    locale => "en"
    match => ["[@metadata][webtime]","dd/MMM/yyyy:HH:mm:ss Z"]
  }
  # Convert the bytes field from a string to a number
  mutate {
    convert => {"bytes" => "integer"}
  }
  # Parse the cookie field into json
  #mutate {
  # gsub => ["cookies",'\;',',']
  #}
  # If CDN acceleration is used, http_x_forwarded_for will contain multiple IP addresses; the first one is the user's real IP.
  if [http_x_forwarded_for] =~ ", " {
    ruby {
      code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'
    }
  }
  # Parse the IP address and obtain its geographical location
  geoip {
    source => "http_x_forwarded_for"
    # Keep only the latitude/longitude, country, city, and region fields of the IP
    #fields => ["location","country_name","city_name","region_name"]
  }
  # Parse the agent field to obtain browser and OS version details
  useragent {
    source => "agent"
    target => "useragent"
  }
  # Fields to delete
  #mutate{remove_field=>["message"]}
  # Set the index name prefix according to the log file name
  ruby {
    code => 'event.set("[@metadata][index_pre]",event.get("source").split("/")[-1])'
  }
  # Format @timestamp as e.g. 2019.04.23
  ruby {
    code => 'event.set("[@metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))'
  }
  # Set the default index name for output
  mutate {
    add_field => {
      #"[@metadata][index]" => "%{[@metadata][index_pre]}_%{+YYYY.MM.dd}"
      "[@metadata][index]" => "%{[@metadata][index_pre]}_%{[@metadata][index_day]}"
    }
  }
 # Parse the cookies field into json
# mutate {
# gsub => [
# "cookies", ";", ",",
# "cookies", "=", ":"
# ]
# #split => {"cookies" => ","}
# }
# json_encode {
# source => "cookies"
# target => "cookies_json"
# }
# mutate {
# gsub => [
# "cookies_json", ',', '","',
# "cookies_json", ':', '":"'
# ]
# }
# json {
# source => "cookies_json"
# target => "cookies2"
# }
  # If grok parsing failed, write the record to a separate failure index
  if "_grokparsefailure" in [tags] {
  #if "_dateparsefailure" in [tags] {
    mutate {
      replace => {
        #"[@metadata][index]" => "%{[@metadata][index_pre]}_failure_%{+YYYY.MM.dd}"
        "[@metadata][index]" => "%{[@metadata][index_pre]}_failure_%{[@metadata][index_day]}"
      }
    }
  # If there was no parsing error, delete the raw message
  }else{
    mutate{remove_field=>["message"]}
  }
}

output {
  if [@metadata][debug] {
    # Output to stdout with the rubydebug codec, including metadata
    stdout{codec => rubydebug{metadata => true}}
  }else{
    # Print a "." for every event processed
    stdout{codec => dots}
    # Output to the specified es
    elasticsearch {
      hosts => ["192.168.15.160:9200"]
      index => "%{[@metadata][index]}"
      document_type => "doc"
    }
  }
}
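
Before starting, you can have logstash check the pipeline syntax (assuming the file was saved as test_pipline2.conf, the name used below):

bin/logstash -f test_pipline2.conf --config.test_and_exit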

Start logstash

nohup bin/logstash -f test_pipline2.conf &
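
Once events start flowing, confirm that the index appears in elasticsearch; with the pipeline above, the index name is the log file name plus the day, e.g. blog.cnfol.com_access.log_2019.04.23:

curl '192.168.15.160:9200/_cat/indices?v'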

The above is the full content of this article. I hope it is helpful for your study, and I hope you will continue to support 123WORDPRESS.COM.

You may also be interested in:
  • Detailed explanation of how to use ELK to analyze Nginx server logs
  • Detailed explanation of Nginx log cutting by date (cutting by day)
  • nginx log cutting shell script
  • Configuration example of logging in JSON format in nginx
  • Shell script analysis of nginx log access times and the most time-consuming pages (slow query)
  • How to automatically delete Nginx logs periodically
  • Nginx log processing script under Windows
  • Python parses nginx log files
