box-o-sand/gotime/amqpfun/workers.go

186 lines
5.1 KiB
Go
Raw Normal View History

package amqpfun
2012-11-10 18:41:02 +00:00
import (
"log"
"os"
"os/signal"
"runtime"
"time"
)
import (
"github.com/streadway/amqp"
)
func Publish() {
2012-11-10 18:41:02 +00:00
// Connects opens an AMQP connection from the credentials in the URL.
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("connection.open: %s", err)
}
// This waits for a server acknowledgment which means the sockets will have
// flushed all outbound publishings prior to returning. It's important to
// block on Close to not lose any publishings.
defer conn.Close()
c, err := conn.Channel()
if err != nil {
log.Fatalf("channel.open: %s", err)
}
// We declare our topology on both the publisher and consumer to ensure they
// are the same. This is part of AMQP being a programmable messaging model.
//
// See the Channel.Consume example for the complimentary declare.
err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
if err != nil {
log.Fatal("exchange.declare: %s", err)
}
// Prepare this message to be persistent. Your publishing requirements may
// be different.
msg := amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: "text/plain",
Body: []byte("Go Go AMQP!"),
}
// This is not a mandatory delivery, so it will be dropped if there are no
// queues bound to the logs exchange.
err = c.Publish("logs", "info", false, false, msg)
if err != nil {
// Since publish is asynchronous this can happen if the network connection
// is reset or if the server has run out of resources.
log.Fatal("basic.publish: %s", err)
}
}
func Consume() {
2012-11-10 18:41:02 +00:00
// Connects opens an AMQP connection from the credentials in the URL.
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("connection.open: %s", err)
}
defer conn.Close()
c, err := conn.Channel()
if err != nil {
log.Fatalf("channel.open: %s", err)
}
// We declare our topology on both the publisher and consumer to ensure they
// are the same. This is part of AMQP being a programmable messaging model.
//
// See the Channel.Publish example for the complimentary declare.
err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
if err != nil {
log.Fatal("exchange.declare: %s", err)
}
// Establish our queue topologies that we are responsible for
type bind struct {
queue string
key string
}
bindings := []bind{
bind{"page", "alert"},
bind{"email", "info"},
bind{"firehose", "#"},
}
for _, b := range bindings {
_, err = c.QueueDeclare(b.queue, true, false, false, false, nil)
if err != nil {
log.Fatal("queue.declare: %s", err)
}
err = c.QueueBind(b.queue, b.key, "logs", false, nil)
if err != nil {
log.Fatal("queue.bind: %s", err)
}
}
// Set our quality of service. Since we're sharing 3 consumers on the same
// channel, we want at least 3 messages in flight.
err = c.Qos(3, 0, false)
if err != nil {
log.Fatal("basic.qos: %s", err)
}
// Establish our consumers that have different responsibilities. Our first
// two queues do not ack the messages on the server, so require to be acked
// on the client.
pages, err := c.Consume("page", "pager", false, false, false, false, nil)
if err != nil {
log.Fatal("basic.consume: %s", err)
}
go func() {
for page := range pages {
// ... this consumer is responsible for sending pages per log
log.Printf("Processing page: %+v\n", page)
page.Ack(false)
}
}()
// Notice how the concern for which messages arrive here are in the AMQP
// topology and not in the queue. We let the server pick a consumer tag this
// time.
emails, err := c.Consume("email", "", false, false, false, false, nil)
if err != nil {
log.Fatal("basic.consume: %s", err)
}
go func() {
for email := range emails {
// ... this consumer is responsible for sending emails per log
log.Printf("Processing email: %+v\n", email)
email.Ack(false)
}
}()
// This consumer requests that every message is acknowledged as soon as it's
// delivered.
firehose, err := c.Consume("firehose", "", true, false, false, false, nil)
if err != nil {
log.Fatal("basic.consume: %s", err)
}
// To show how to process the items in parallel, we'll use a work pool.
for i := 0; i < runtime.NumCPU(); i++ {
go func(work <-chan amqp.Delivery) {
for drop := range work {
// ... this consumer pulls from the firehose and doesn't need to acknowledge
log.Printf("Processing firehose drop: %+v\n", drop)
}
}(firehose)
}
// // Wait until you're ready to finish, could be a signal handler here.
// time.Sleep(10 * time.Second)
q := make(chan os.Signal)
signal.Notify(q, os.Interrupt)
log.Println("Waiting...")
<-q
log.Println("Shutting down...")
// Cancelling a consumer by name will finish the range and gracefully end the
// goroutine
err = c.Cancel("pager", false)
if err != nil {
log.Fatal("basic.cancel: %s", err)
}
// deferred closing the Connection will also finish the consumer's ranges of
// their delivery chans. If you need every delivery to be processed, make
// sure to wait for all consumers goroutines to finish before exiting your
// process.
}