4Manuals

  • PDF Cloud HOME

使用Rabbit的pika确认消息 Download

    如何将文本边界框与pyplot.Rectangle对齐? 导入类问题 在Python数据框中选择列时出错 如何在按住键的同时暂停VideoStream? Python OpenCV TypeError:无法处理此数据类型 使用buildozer不会下载sdl2_image SMTPSenderRefused,421,超出超时 Tensorflow多线程推理比单线程推理慢 关于python中变量的困惑。 python如何使用变量? 当使用pyodbc时,SQL Server DateTimeOffset将可识别tz的日期时间的偏移更改为系统偏移

我正在使用RabbitMQ在不同的Docker容器之间进行消息交换。在客户端,我使用pika与RabbitMQ进行通信。现在出现以下问题:

使用BlockingConnection,我在使用方中收到一条消息,该消息触发了一项非常耗时的任务。在处理此任务期间,RabbitMQ关闭了连接,因为在忙于处理任务时并未从使用方连接发送心跳。

我要解决的解决方案是,在将回调函数附加到channel.basic_consume(queue='myqueue', on_message_callback=my_callback, auto_ack=False)这样的使用者队列时,我会关闭pika中的自动确认。另外,我将耗时的处理移到了守护进程python线程中,以允许BlockingConnection发送RabbitMQ所需的心跳。

现在,一旦处理完成,我需要从线程确认RabbitMQ消息。为此,我从pika提供的delivery_tag属性中提取了method和channel_number属性中的channel。

channel_number = channel.channel_number
delivery_tag = method.delivery_tag

现在在线程中我创建了一个新的连接,因为pika不是线程安全的

conn = BlockingConnection(ConnectionParameters(host="rabbitmq"))
channel = conn.channel(channel_number=channel_number)
channel.queue_declare(queue='myqueue', durable=False)
channel.basic_ack(delivery_tag=delivery_tag)
conn.close()

确认消息。

但是由于pika会产生以下警告,因此确认不起作用:

WARNING:pika.channel:Received remote Channel.Close (406): 'PRECONDITION_FAILED - unknown delivery tag 1' on <Channel number=1 CLOSING conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f94f1dad9e8> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>>
WARNING:pika.adapters.blocking_connection:Got ChannelClosed while closing channel from connection.close: ChannelClosedByBroker: (406) 'PRECONDITION_FAILED - unknown delivery tag 1'

Q1:是否可以确认来自连接B的消息,该消息是由连接A收到的?

Q2:知道为什么这行不通吗?

我了解了rabbitpy和AMQPStorm并立即尝试

1 个答案:

答案 0 :(得分:0)

在this示例之后,使用add_callback_threadsafe函数解决了该问题。



Similar searches
    python:捕获设备按钮单击–Powermic 我需要Excel VBA代码才能将工作簿另存为非vba Excel文件 解析错误:语法错误,意外的':',期望为')' 在熊猫中如果我们通过平均将1分钟间隔的数据重新采样为15分钟间隔,我们是否可以选择如何重新采样和分配数据 SQLite:group_concat似乎只拉第一行?