订阅队列,接收1条消息,然后取消订阅
|
我有一个场景,我需要非常快速地分发和处理作业。我将快速在队列中填充约45个作业,并且我可以同时处理约20个(5台计算机,每个4核)。每个作业花费的时间是可变的,使垃圾收集变得复杂是一个问题,因此我需要使消费者脱机进行垃圾收集。
目前,我可以使用pop进行所有操作(每个消费者每5毫秒会弹出一次)。这似乎是不可取的,因为它将每秒600个pop请求转换为rabbitmq。
如果有一个pop命令像订阅一样,但仅发送一条消息,我很喜欢。 (该过程将阻塞,等待来自RabbitMQ连接的输入,类似于与Kernel.select相似的内容)
我试图欺骗AMQP gem做这样的事情,但是它不起作用:在队列为空并且没有更多消息发送给使用者之前,我似乎无法取消订阅。我担心其他取消订阅的方法会丢失消息。
消耗_1.rb:
require \"amqp\"
EventMachine.run do
puts \"connecting...\"
connection = AMQP.connect(:host => \"localhost\", :user => \"guest\", :pass => \"guest\", :vhost => \"/\")
puts \"Connected to AMQP broker\"
channel = AMQP::Channel.new(connection)
queue = channel.queue(\"tasks\", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts \"Received a message: #{payload}.\"
queue.unsubscribe { puts \"unbound\" }
sleep 3
end
end
Consumer_many.rb:
require \"amqp\"
# Imagine the command is something CPU - intensive like image processing.
command = \"sleep 0.1\"
EventMachine.run do
puts \"connecting...\"
connection = AMQP.connect(:host => \"localhost\", :user => \"guest\", :pass => \"guest\", :vhost => \"/\")
puts \"Connected to AMQP broker\"
channel = AMQP::Channel.new(connection)
queue = channel.queue(\"tasks\", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts \"Received a message: #{payload}.\"
end
end
producer.rb:
require \"amqp\"
i = 0
EventMachine.run do
connection = AMQP.connect(:host => \"localhost\", :user => \"guest\", :pass => \"guest\", :vhost => \"/\")
puts \"Connected to AMQP broker\"
channel = AMQP::Channel.new(connection)
queue = channel.queue(\"tasks\", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
EM.add_periodic_timer(1) do
msg = \"Message #{i}\"
i+=1
puts \"~ publishing #{msg}\"
end
end
我将启动consumpt_many.rb和producer.rb。消息将按预期方式流动。
当我启动consump_1.rb时,它会收到所有其他消息(按预期)。但是它永远不会退订,因为它永远不会完成对所有消息的处理……这样就可以了。
我如何让consumer_1.rb订阅队列,获得一条消息,然后将自己从负载平衡器环中取出,使其能够完成工作,而又不会丢失任何可能在服务器中待处理的作业。队列,否则将被安排发送到进程?
提姆
没有找到相关结果
已邀请:
2 个回复
凄挡
然后使用您的订阅块,执行以下操作:
它的作用是告诉RabbitMQ一次只发送给消费者1条消息,而不会处理另一条消息,直到您在长时间运行任务后在队列头上调用
为止。 我可能需要对此进行更正,但我相信6英镑的兑换会更适合此任务。
校勒魏寡
该方法的一个注意事项是,如果您在版本2.3.6之后运行RabbitMQ服务器,则不建议使用prefetch_size。
希望解决方案可以帮助某人。 干杯。