Problem description
While consuming messages from RabbitMQ with the Python pika library, I ran into a connection reset error. The traceback was as follows:
Traceback (most recent call last):
File "/app/utils/", line 27, in message_callback
channel.basic_ack(delivery_tag=method.delivery_tag)
File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2130, in basic_ack
self._flush_output()
File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1353, in _flush_output
self._connection._flush_output(lambda: self.is_closed, *waiters)
File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/app/", line 14, in main
rabbit.start_listen()
File "/app/utils/", line 52, in start_listen
channel.start_consuming()
File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1883, in start_consuming
self._process_data_events(time_limit=None)
File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2044, in _process_data_events
self.connection.process_data_events(time_limit=time_limit)
File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 851, in process_data_events
self._dispatch_channel_events()
File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
impl_channel._get_cookie()._dispatch_events()
File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1510, in _dispatch_events
consumer_info.on_message_callback(self, evt.method,
File "/app/utils/", line 34, in message_callback
channel.basic_nack(delivery_tag=method.delivery_tag)
File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2151, in basic_nack
self._impl.basic_nack(
File "/usr/local/lib/python3.11/site-packages/pika/channel.py", line 401, in basic_nack
self._raise_if_not_open()
File "/usr/local/lib/python3.11/site-packages/pika/channel.py", line 1403, in _raise_if_not_open
raise exceptions.ChannelWrongStateError('Channel is closed.')
pika.exceptions.ChannelWrongStateError: Channel is closed.
My code is as follows:
import pika
from import logger
import traceback
import json
from import get_rabbit_config
from import send_mail
import time
EXCHANGE = 'DEFAULT_EXCHANGE'
ROUTING_KEY = ''
QUEUE_NAME = 'PURCHASING_CONTRACT_CONSUMER'


class Rabbit():
    """Blocking RabbitMQ consumer that passes each message to a callback.

    No prefetch limit (``basic_qos``) is configured on the channel in this
    version of the listing.
    """

    def __init__(self, callback=None) -> None:
        # Called with the JSON-decoded message body; a truthy return value
        # leads to an ack, a falsy one to a nack.
        self.callback = callback

    def message_callback(self, channel, method, properties, body):
        """Handle one delivery: decode it, run the callback, then ack/nack.

        Any exception raised while handling the message is logged, mailed
        out via ``send_mail`` and answered with a nack.
        """
        try:
            logger.info(f'receive message: {body}')
            message = json.loads(body)
            if self.callback:
                result = self.callback(message)
                # time.sleep(5)
                # result = False
            else:
                # NOTE(review): `result` is never assigned on this path, so
                # the `if result` below raises NameError and the message is
                # nacked by the except handler.
                logger.info('self.callback is None')
            if result:
                channel.basic_ack(delivery_tag=method.delivery_tag)
            else:
                channel.basic_nack(delivery_tag=method.delivery_tag)
        except Exception:
            logger.error(traceback.format_exc())
            send_mail('purchasing-contract-consumer error',
                      traceback.format_exc())
            # This nack runs on the same channel; if the failure above was a
            # lost connection, it raises again ("Channel is closed.").
            channel.basic_nack(delivery_tag=method.delivery_tag)
            time.sleep(3)

    def start_listen(self):
        """Connect to RabbitMQ, bind the queue and block in the consume loop."""
        config = get_rabbit_config()
        credentials = pika.PlainCredentials(
            username=config['username'], password=config['password'])
        parameters = pika.ConnectionParameters(
            # host=config['host'], port=config['port'], credentials=credentials, heartbeat=300, stack_timeout=300, socket_timeout=300, blocked_connection_timeout=300)
            host=config['host'], port=config['port'], credentials=credentials, heartbeat=120, socket_timeout=60)
        connection = pika.BlockingConnection(parameters=parameters)
        channel = connection.channel()
        # channel.exchange_declare(exchange=EXCHANGE,exchange_type='topic',durable=True)
        channel.queue_declare(queue=QUEUE_NAME, durable=True)
        channel.queue_bind(queue=QUEUE_NAME, exchange=EXCHANGE,
                           routing_key=ROUTING_KEY)
        # auto_ack=False: message_callback acknowledges each delivery itself.
        channel.basic_consume(
            queue=QUEUE_NAME, on_message_callback=self.message_callback, auto_ack=False)
        logger.info('start listening...')
        channel.start_consuming()
        # Only reached once start_consuming() returns.
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
My application scenario is as follows: the consumer reads messages from RabbitMQ and reports each one to a web service through a RESTful API call. However, the web service responds very slowly (each call takes more than 1 minute), so a large number of messages accumulate in RabbitMQ.
Cause analysis
At first I thought the RabbitMQ connection was timing out, so I raised the heartbeat and the connection timeouts to 300 s, as follows:
host=config['host'], port=config['port'], credentials=credentials, heartbeat=300, stack_timeout=300, socket_timeout=300, blocked_connection_timeout=300)
The connection reset still occurred, so I suspected that the result = self.callback(message) call was simply taking too long (more than 1 minute). I commented it out and replaced it with:
# result = self.callback(message)
time.sleep(5)
result = False
The error still occurred, which ruled out long blocking inside the callback as the cause.
After reading up on the issue, I found that the cause was that prefetch_count was not set. If this parameter is not set, it defaults to 0, which means the broker may push an unlimited number of unacknowledged messages to the consumer. If the consumer processes messages very slowly while a large backlog has built up in RabbitMQ, the client socket's receive buffer fills up, and the client then advertises a TCP receive window of 0 to the server. Because the client's buffer stays full, the server cannot send any data for a long time, not even heartbeat frames, and the connection is eventually reset. When the client finally finishes the RESTful API call and tries to call channel.basic_ack(delivery_tag=method.delivery_tag), the connection has already been reset, so a connection reset exception is raised.
Solution
Set prefetch_count on the channel with channel.basic_qos. The complete code is as follows:
import pika
from import logger
import traceback
import json
from import get_rabbit_config
from import send_mail
import time
EXCHANGE = 'DEFAULT_EXCHANGE'
ROUTING_KEY = ''
QUEUE_NAME = 'PURCHASING_CONTRACT_CONSUMER'


class Rabbit():
    """Blocking RabbitMQ consumer that passes each message to a callback.

    The channel is limited to one unacknowledged message at a time
    (``prefetch_count=1``).
    """

    def __init__(self, callback=None) -> None:
        # Called with the JSON-decoded message body; a truthy return value
        # leads to an ack, a falsy one to a nack.
        self.callback = callback

    def message_callback(self, channel, method, properties, body):
        """Handle one delivery: decode it, run the callback, then ack/nack.

        Any exception raised while handling the message is logged, mailed
        out via ``send_mail`` and answered with a nack.
        """
        try:
            logger.info(f'receive message: {body}')
            message = json.loads(body)
            if self.callback:
                result = self.callback(message)
                # time.sleep(5)
                # result = False
            else:
                # NOTE(review): `result` is never assigned on this path, so
                # the `if result` below raises NameError and the message is
                # nacked by the except handler.
                logger.info('self.callback is None')
            if result:
                channel.basic_ack(delivery_tag=method.delivery_tag)
            else:
                channel.basic_nack(delivery_tag=method.delivery_tag)
        except Exception:
            logger.error(traceback.format_exc())
            send_mail('purchasing-contract-consumer error',
                      traceback.format_exc())
            channel.basic_nack(delivery_tag=method.delivery_tag)
            time.sleep(3)

    def start_listen(self):
        """Connect to RabbitMQ, bind the queue and block in the consume loop."""
        config = get_rabbit_config()
        credentials = pika.PlainCredentials(
            username=config['username'], password=config['password'])
        parameters = pika.ConnectionParameters(
            # host=config['host'], port=config['port'], credentials=credentials, heartbeat=300, stack_timeout=300, socket_timeout=300, blocked_connection_timeout=300)
            host=config['host'], port=config['port'], credentials=credentials, heartbeat=120, socket_timeout=60)
        connection = pika.BlockingConnection(parameters=parameters)
        channel = connection.channel()
        # Restrict how many unacknowledged messages the consumer receives at
        # once, so the client-side buffer is not filled by a large backlog.
        channel.basic_qos(prefetch_count=1)
        # channel.exchange_declare(exchange=EXCHANGE,exchange_type='topic',durable=True)
        channel.queue_declare(queue=QUEUE_NAME, durable=True)
        channel.queue_bind(queue=QUEUE_NAME, exchange=EXCHANGE,
                           routing_key=ROUTING_KEY)
        # auto_ack=False: message_callback acknowledges each delivery itself.
        channel.basic_consume(
            queue=QUEUE_NAME, on_message_callback=self.message_callback, auto_ack=False)
        logger.info('start listening...')
        channel.start_consuming()
        # Only reached once start_consuming() returns.
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
With this setting in place, messages are consumed normally and the connection reset exception no longer occurs.