Analysis of the RabbitMQ heartbeat detection mechanism


Preface

When using RabbitMQ, if no traffic flows between your client and the RabbitMQ server for a period of time, the server closes the TCP connection to the client.

You will then see a log entry like this on the server:

missed heartbeats from client, timeout: xxs

The period in question is the heartbeat timeout.

A heartbeat is usually used to detect whether the other end of a connection is still alive, for example after a crash that left the socket connection unclosed. The basic principle is to watch whether data is being sent and received normally on the socket. If no data has been sent or received for a period of time, a heartbeat packet is sent to the other end. If there is still no response within a further period, the heartbeat is considered to have timed out, and the other end is assumed to have crashed.

RabbitMQ is no exception. Heartbeats are used between the client and the server to check whether the other end is healthy, that is, whether the TCP link between client and server is still working.

About RabbitMQ heartbeat

1. The heartbeat timeout can be set on the server by adding the configuration item {heartbeat, Timeout} to the configuration file rabbitmq.config, where Timeout is the interval in seconds. The client can also request its own heartbeat timeout.

If nothing is configured on the server, the broker's default heartbeat timeout is:

RabbitMQ 3.2.2: 580 seconds
RabbitMQ 3.5.5: 60 seconds

2. The official recommendation is not to disable heartbeats; the recommended heartbeat timeout is 60 seconds.

3. A heartbeat frame is sent every heartbeat timeout / 2 seconds. If the server misses two heartbeats in a row, it closes the TCP connection; the old connection is then invalid and the client must reconnect.

4. With the Java, .NET and Erlang clients, the server and the client negotiate the heartbeat timeout:

If either value is 0, the larger of the two is used.

Otherwise, the smaller of the two is used.

If both values are 0, heartbeats are disabled, and the server and the client keep the TCP connection open without ever dropping it for inactivity.
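
The negotiation rules above can be written down as a small function. This is only an illustrative sketch of the rules as stated in this article, not code taken from any client library; the name negotiate_heartbeat and its parameters are made up for the example.

```python
def negotiate_heartbeat(client_timeout: int, server_timeout: int) -> int:
    """Return the heartbeat timeout in seconds that both peers end up using.

    A result of 0 means heartbeats are disabled.
    """
    if client_timeout < 0 or server_timeout < 0:
        raise ValueError("heartbeat timeouts must be non-negative")
    if client_timeout == 0 or server_timeout == 0:
        # At least one side proposed 0: the larger value wins, so
        # heartbeats stay off only when both sides proposed 0.
        return max(client_timeout, server_timeout)
    # Both sides proposed a real timeout: the smaller value wins.
    return min(client_timeout, server_timeout)


if __name__ == "__main__":
    for client, server in [(0, 60), (30, 60), (120, 60), (0, 0)]:
        print(client, server, "->", negotiate_heartbeat(client, server))
```

Note that under these rules a client proposing 0 does not switch heartbeats off if the server proposes a non-zero value.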

Note: With the Python client (pika), setting the value to 0 on the client side disables the heartbeat outright.

How to disable the heartbeat in the Python client (the parameter name depends on the pika version):

In older pika releases, set heartbeat_interval=0 in ConnectionParameters.

In pika 1.0 and later, set heartbeat=0 in ConnectionParameters.

5. Any traffic on the connection (data frames, acknowledgements, and so on) counts as a valid heartbeat, as do heartbeat frames themselves.

6. I saw someone ask this question online:

Why is it that, when the server goes down or drops the connection under the heartbeat mechanism, the client does not notice that the TCP connection is gone? In my own tests the client indeed does not detect the disconnection by itself. It is detected only when the client performs an operation on that connection, and any operation on a dropped connection (such as sending a message) then raises an error. The following script polls the connection state and can be used to observe this:
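
To make point 5 concrete, here is a minimal sketch of a liveness tracker in which every incoming frame, whatever its type, refreshes the "last seen" time. It only models the idea; LivenessTracker and its methods are hypothetical names, and timestamps are passed in explicitly so that the example is deterministic.

```python
class LivenessTracker:
    """Remembers when the peer was last heard from; every frame type counts."""

    def __init__(self, timeout_sec: float, now: float = 0.0):
        self.timeout_sec = timeout_sec
        self.last_seen = now

    def on_frame(self, frame_type: str, now: float) -> None:
        # The frame type is deliberately ignored: a publish, an ack and a
        # heartbeat frame all prove that the peer is still alive.
        self.last_seen = now

    def is_alive(self, now: float) -> bool:
        return (now - self.last_seen) < self.timeout_sec


tracker = LivenessTracker(timeout_sec=60)
tracker.on_frame("basic.publish", now=10)
tracker.on_frame("basic.ack", now=50)
print(tracker.is_alive(now=100))   # True: 50 s since the last frame
tracker.on_frame("heartbeat", now=105)
print(tracker.is_alive(now=160))   # True: 55 s since the heartbeat frame
print(tracker.is_alive(now=170))   # False: 65 s of silence
```

On a busy connection, ordinary traffic keeps refreshing the timestamp, so explicit heartbeat frames only matter when the connection is otherwise idle.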

import pika
import time

credit = pika.PlainCredentials(username='cloud', password='cloud')
connection = pika.BlockingConnection(pika.ConnectionParameters(
  host='10.32.1.12', credentials=credit))
channel = connection.channel()

# Poll the locally known connection and channel state every 5 seconds,
# without performing any operation on the connection itself
while True:
  connect_close = connection.is_closed
  connect_open = connection.is_open
  channel_close = channel.is_closed
  channel_open = channel.is_open

  print("connection is_closed ", connect_close)
  print("connection is_open ", connect_open)
  print("channel is_closed ", channel_close)
  print("channel is_open ", channel_open)
  print("")
  time.sleep(5)

7. Some RabbitMQ clients (Bunny, Java, .NET, Objective-C, Swift) can automatically recover connections after a network failure, whereas with pika you have to detect the connection exception and re-establish the connection yourself.

Sample code: recreate the connection when a connection exception is detected:

import pika

def on_message_callback(channel, method, properties, body):
  # Placeholder handler (not part of the original sample): print the body
  print(body)

while True:
  try:
    connection = pika.BlockingConnection()
    channel = connection.channel()
    channel.basic_consume('test', on_message_callback)
    channel.start_consuming()
  # Don't recover if connection was closed by broker
  except pika.exceptions.ConnectionClosedByBroker:
    break
  # Don't recover on channel errors
  except pika.exceptions.AMQPChannelError:
    break
  # Recover on all other connection errors
  except pika.exceptions.AMQPConnectionError:
    continue

You can also use a retry library such as retry:

import pika
from retry import retry

# on_message_callback is your message handler, as in the previous example
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
  connection = pika.BlockingConnection()
  channel = connection.channel()
  channel.basic_consume('test', on_message_callback)
  try:
    channel.start_consuming()
  # Don't recover connections closed by server
  except pika.exceptions.ConnectionClosedByBroker:
    pass

consume()
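
If adding a dependency is not an option, the same retry-with-delay idea can be approximated with the standard library alone. The following is a minimal sketch under stated assumptions, not the retry library's implementation: retry_on, max_tries and connect_and_consume are hypothetical names, and the built-in ConnectionError stands in for pika.exceptions.AMQPConnectionError so that the example runs without a broker.

```python
import functools
import random
import time


def retry_on(exc_type, delay=5.0, jitter=(1.0, 3.0), max_tries=None,
             sleep=time.sleep):
    """Retry the wrapped function whenever it raises exc_type.

    The wait starts at `delay` seconds and grows by a random amount drawn
    from `jitter` after each failure. max_tries=None retries forever.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            tries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except exc_type:
                    tries += 1
                    if max_tries is not None and tries >= max_tries:
                        raise
                    sleep(wait)
                    wait += random.uniform(*jitter)
        return wrapper
    return decorator


# Stand-in for a consumer: fails twice, then succeeds.
attempts = []


@retry_on(ConnectionError, delay=0.01, jitter=(0.0, 0.01), max_tries=5)
def connect_and_consume():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("broker unreachable")
    return "consuming"


print(connect_and_consume(), "after", len(attempts), "attempts")
```

In a real consumer, the decorated function would open the connection and call start_consuming(), exactly as consume() does above; the short delays here are only there to keep the demonstration fast.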

Implementation of heartbeat

After receiving the connection.tune-ok method from the client, RabbitMQ enables heartbeat detection. It creates two processes per TCP connection for this purpose. One process periodically checks whether any data has been sent on the connection (sent here means from RabbitMQ to the client); if nothing has been sent for a period of time, it sends a heartbeat frame to the client and then loops to the next check. The other process periodically checks whether any data has been received on the connection; if nothing has been received for a period of time, it declares a heartbeat timeout, and the TCP connection is eventually closed. In addition, RabbitMQ's flow control mechanism may pause heartbeat detection, which is not covered here.

Source code involved:

start(SupPid, Sock, SendTimeoutSec,
      SendFun, ReceiveTimeoutSec, ReceiveFun) ->
  %% Data sending detection process
  {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock,
                                   SendFun, heartbeat_sender,
                                   start_heartbeat_sender),
  %% Data receiving detection process
  {ok, Receiver} = start_heartbeater(ReceiveTimeoutSec, SupPid,
                                     Sock, ReceiveFun,
                                     heartbeat_receiver,
                                     start_heartbeat_receiver),
  {Sender, Receiver}.

start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
  %% the 'div 2' is there so that we don't end up waiting for
  %% nearly 2 * TimeoutSec before sending a heartbeat in the
  %% boundary case
  heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
         fun () -> SendFun(), continue end}).

start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
  %% we check for incoming data every interval, and time out after
  %% two checks with no change. As a result we will time out
  %% between 2 and 3 intervals after the last data has been
  %% received
  heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
        fun () -> ReceiveFun(), stop end}).

heartbeater({Sock, TimeoutMillisec,
             StatName, Threshold, Handler} = Params,
            Deb,
            {StatVal, SameCount} = State) ->
  Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end,
  receive
    ...
  %% Timed detection
  after TimeoutMillisec ->
    case rabbit_net:getstat(Sock, [StatName]) of
      {ok, [{StatName, NewStatVal}]} ->
        %% The sent/received byte count has changed
        if NewStatVal =/= StatVal ->
            %% Restart detection
            Recurse({NewStatVal, 0});
          %% The threshold has not been reached yet
          %% (Threshold is 0 for sending and 1 for receiving)
          SameCount < Threshold ->
            %% Increment the count and check again
            Recurse({NewStatVal, SameCount + 1});
          %% Heartbeat timeout
          true ->
            %% On a send-check timeout, send a heartbeat packet to the client.
            %% On a receive-check timeout, notify the parent process, which
            %% triggers the TCP shutdown and related operations.
            case Handler() of
              %% Receive-check timeout
              stop -> ok;
              %% Send-check timeout
              continue -> Recurse({NewStatVal, 0})
            end
        end;
      ...
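
For readers less familiar with Erlang, the decision logic of the heartbeater loop can be modelled in a few lines of Python. This is a simplified sketch, not a port of the RabbitMQ code: run_heartbeater is a hypothetical name, the list of samples stands in for successive rabbit_net:getstat readings taken TimeoutMillisec apart, and the initial state (0, 0) is an assumption.

```python
def run_heartbeater(samples, threshold, handler):
    """Replay the heartbeater decision logic over byte-counter samples.

    samples[i] is the counter value (send_oct or recv_oct) read at the
    i-th periodic check. handler(tick) is called when the counter has
    been unchanged for more than `threshold` consecutive checks and must
    return "stop" or "continue".
    """
    state = (0, 0)  # (StatVal, SameCount); starting from zeros is an assumption
    for tick, new_val in enumerate(samples):
        stat_val, same_count = state
        if new_val != stat_val:
            # Traffic was seen since the last check: restart counting.
            state = (new_val, 0)
        elif same_count < threshold:
            # Idle, but still within the allowed number of idle checks.
            state = (new_val, same_count + 1)
        else:
            # Idle for too long: let the handler decide what happens next.
            if handler(tick) == "stop":
                return
            state = (new_val, 0)


# Receiver (Threshold = 1): bytes arrive until the second check, then silence.
timeouts = []
run_heartbeater([100, 250, 250, 250, 250], threshold=1,
                handler=lambda tick: timeouts.append(tick) or "stop")
print(timeouts)  # [3]

# Sender (Threshold = 0): an idle check triggers a heartbeat immediately.
sent = []
run_heartbeater([80, 80, 88, 88], threshold=0,
                handler=lambda tick: sent.append(tick) or "continue")
print(sent)  # [1, 3]
```

In the receiver run, the last change is observed at check 1 and the timeout fires at check 3, that is, after two checks with no change, which matches the comment in start_heartbeat_receiver. In the sender run, the counter rising from 80 to 88 represents the heartbeat frame that the handler itself would have written.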

Both the send check and the receive check use getstat from the inet module to read the socket's statistics:

recv_oct: View the number of bytes received on the socket

send_oct: View the number of bytes sent on the socket

For details about inet, see: http://www.erlang.org/doc/man/inet.html
