A bit of cleanup, hopefully shutting down consumer in a relatively sane way

This commit is contained in:
Dan Buch 2012-11-17 08:55:37 -05:00
parent c9c964d7c4
commit 74bd6aeaf0
3 changed files with 27 additions and 12 deletions

View File

@ -6,15 +6,16 @@ import com.rabbitmq.client.ConnectionFactory
module Sylvilagus::Ch02::HelloWorld module Sylvilagus::Ch02::HelloWorld
module ClassMethods module ClassMethods
def with_hello_world_conn(&block) def with_hello_world_channel(&block)
factory = ConnectionFactory.new factory = ConnectionFactory.new
factory.uri = ENV.fetch('SYLVILAGUS_AMQP_URI') factory.uri = ENV.fetch('SYLVILAGUS_AMQP_URI')
conn = factory.new_connection conn = factory.new_connection
channel = conn.create_channel channel = conn.create_channel
channel.exchange_declare('hello-exchange', 'direct', true) channel.exchange_declare('hello-exchange', 'direct', true)
channel.queue_declare('hello-queue', false, false, false, nil) channel.queue_declare('hello-queue', false, false, false, nil)
block.call(conn, channel) block.call(channel)
ensure ensure
conn.close conn.close
end end

View File

@ -8,21 +8,34 @@ class Sylvilagus::Ch02::HelloWorldConsumer < DefaultConsumer
include Sylvilagus::Ch02::HelloWorld::ClassMethods include Sylvilagus::Ch02::HelloWorld::ClassMethods
def main def main
with_hello_world_conn do |conn,channel| with_hello_world_channel do |channel|
channel.basic_consume('hello-queue', false, new(channel)) consumer = new(channel)
STDERR.puts 'Starting consume loop...'
channel.basic_consume('hello-queue', false, consumer)
loop do loop do
sleep 3 sleep 1
puts "Still waiting..." break if consumer.done?
end end
end end
end end
end end
def done?
@done ||= false
end
def handleDelivery(consumer_tag, envelope, properties, body) def handleDelivery(consumer_tag, envelope, properties, body)
delivery_tag = envelope.get_delivery_tag
body_string = RubyString.bytes_to_string(body) body_string = RubyString.bytes_to_string(body)
puts "Consumed #{body_string.inspect}" puts "Consumed #{body_string.inspect}"
get_channel.basic_ack(delivery_tag, false) channel.basic_ack(envelope.delivery_tag, false)
if body_string == 'quit'
STDERR.puts 'Quitting...'
@done = true
end
end end
end end

View File

@ -4,9 +4,10 @@ class Sylvilagus::Ch02::HelloWorldProducer
class << self class << self
include Sylvilagus::Ch02::HelloWorld::ClassMethods include Sylvilagus::Ch02::HelloWorld::ClassMethods
def main def main(args = ARGV.clone)
message = ARGV.first || 'snorg' raise 'Missing message arg!' if args.empty?
with_hello_world_conn do |conn,channel| message = args.fetch(0)
with_hello_world_channel do |channel|
channel.basic_publish('hello-exchange', 'hola', nil, message.to_java_bytes) channel.basic_publish('hello-exchange', 'hola', nil, message.to_java_bytes)
puts "Published #{message.inspect}" puts "Published #{message.inspect}"
end end
@ -15,5 +16,5 @@ class Sylvilagus::Ch02::HelloWorldProducer
end end
if $0 == __FILE__ if $0 == __FILE__
Sylvilagus::Ch02::HelloWorldProducer.main Sylvilagus::Ch02::HelloWorldProducer.main(ARGV)
end end