订阅队列,接收1条消息,然后取消订阅

| 我有一个场景,我需要非常快速地分发和处理作业。我将快速在队列中填充约45个作业,并且我可以同时处理约20个(5台计算机,每个4核)。每个作业花费的时间是可变的,使垃圾收集变得复杂是一个问题,因此我需要使消费者脱机进行垃圾收集。 目前,我可以使用pop进行所有操作(每个消费者每5毫秒会弹出一次)。这似乎是不可取的,因为它将每秒600个pop请求转换为rabbitmq。 如果有一个pop命令像订阅一样,但仅发送一条消息,我很喜欢。 (该过程将阻塞,等待来自RabbitMQ连接的输入,类似于与Kernel.select相似的内容) 我试图欺骗AMQP gem做这样的事情,但是它不起作用:在队列为空并且没有更多消息发送给使用者之前,我似乎无法取消订阅。我担心其他取消订阅的方法会丢失消息。 消耗_1.rb:
require \"amqp\"

EventMachine.run do
  puts \"connecting...\"
  connection = AMQP.connect(:host => \"localhost\", :user => \"guest\", :pass => \"guest\", :vhost => \"/\")
  puts \"Connected to AMQP broker\"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue(\"tasks\", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts \"Received a message: #{payload}.\"
    queue.unsubscribe { puts \"unbound\" }
    sleep 3
  end
end
Consumer_many.rb:
require \"amqp\"

# Imagine the command is something CPU - intensive like image processing.
command = \"sleep 0.1\"

EventMachine.run do
  puts \"connecting...\"
  connection = AMQP.connect(:host => \"localhost\", :user => \"guest\", :pass => \"guest\", :vhost => \"/\")
  puts \"Connected to AMQP broker\"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue(\"tasks\", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts \"Received a message: #{payload}.\"
  end
end
producer.rb:
require \"amqp\"

i = 0
EventMachine.run do
  connection = AMQP.connect(:host => \"localhost\", :user => \"guest\", :pass => \"guest\", :vhost => \"/\")
  puts \"Connected to AMQP broker\"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue(\"tasks\", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  EM.add_periodic_timer(1) do
    msg = \"Message #{i}\"
    i+=1
    puts \"~ publishing #{msg}\"
  end
end
我将启动consumpt_many.rb和producer.rb。消息将按预期方式流动。 当我启动consump_1.rb时,它会收到所有其他消息(按预期)。但是它永远不会退订,因为它永远不会完成对所有消息的处理……这样就可以了。 我如何让consumer_1.rb订阅队列,获得一条消息,然后将自己从负载平衡器环中取出,使其能够完成工作,而又不会丢失任何可能在服务器中待处理的作业。队列,否则将被安排发送到进程? 提姆     
已邀请:
这是AMQP gem的一个简单但记录很少的功能,您需要的是: 在您的消费者中:
channel = AMQP::Channel.new(connection, :prefetch => 1)
然后使用您的订阅块,执行以下操作:
queue.subscribe(:ack => true) do |queue_header, payload|
  puts \"Received a message: #{payload}.\"
  # Do long running work here

  # Acknowledge message
  queue_header.ack
end
它的作用是告诉RabbitMQ一次只发送给消费者1条消息,而不会处理另一条消息,直到您在长时间运行任务后在队列头上调用
ack
为止。 我可能需要对此进行更正,但我相信6英镑的兑换会更适合此任务。     
有了我的环境 RabbitMQ版本:3.3.3 amqp gem版本:1.5.0 Ivan的解决方案仍然导致从队列中提取所有消息。 而是可以通过设置通道的QoS来限制订阅队列的未确认消息的数量。 根据AMQP :: Channel的API文档,
#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object
该方法的一个注意事项是,如果您在版本2.3.6之后运行RabbitMQ服务器,则不建议使用prefetch_size。
channel = AMQP::Channel.new(connection, :prefetch => 1)
channel.qos(0, 1)

queue = channel.queue(queue_name, :auto_delete => false)
queue.subscribe(:ack => true) do |metadata, payload|
  puts \"Received a message: #{payload}.\"

  # Do long running work here

  # Acknowledge message
  metadata.ack
end
希望解决方案可以帮助某人。 干杯。     

要回复问题请先登录注册