package amqpfun

import (
	"log"
	"os"
	"os/signal"
	"runtime"
	"time"

	"github.com/streadway/amqp"
)

func Publish() {
	// Dial opens an AMQP connection from the credentials in the URL.
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("connection.open: %s", err)
	}

	// Closing the connection waits for a server acknowledgment, which means the
	// socket will have flushed all outbound publishings before returning. It's
	// important to block on Close so no publishings are lost.
	defer conn.Close()

	c, err := conn.Channel()
	if err != nil {
		log.Fatalf("channel.open: %s", err)
	}

	// We declare our topology on both the publisher and consumer to ensure they
	// are the same. This is part of AMQP being a programmable messaging model.
	//
	// See the Channel.Consume example for the complementary declare.
	err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
	if err != nil {
		log.Fatalf("exchange.declare: %s", err)
	}

	// Prepare this message to be persistent. Your publishing requirements may
	// be different.
	msg := amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		Timestamp:    time.Now(),
		ContentType:  "text/plain",
		Body:         []byte("Go Go AMQP!"),
	}

	// This is not a mandatory delivery, so it will be dropped if there are no
	// queues bound to the logs exchange.
	err = c.Publish("logs", "info", false, false, msg)
	if err != nil {
		// Since publishing is asynchronous, this can happen if the network
		// connection is reset or if the server has run out of resources.
		log.Fatalf("basic.publish: %s", err)
	}
}
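
// publishWithConfirm is a minimal sketch added alongside the example above; it
// is not part of the original code. It shows one way to use publisher confirms
// (Channel.Confirm and Channel.NotifyPublish) so the caller learns whether the
// broker actually accepted a publishing, rather than relying only on Close to
// flush. The URL, exchange, and routing key are the same assumptions Publish uses.
func publishWithConfirm() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("connection.open: %s", err)
	}
	defer conn.Close()

	c, err := conn.Channel()
	if err != nil {
		log.Fatalf("channel.open: %s", err)
	}

	err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
	if err != nil {
		log.Fatalf("exchange.declare: %s", err)
	}

	// Put the channel into confirm mode; the broker will ack or nack every
	// publishing sent on it from now on.
	if err := c.Confirm(false); err != nil {
		log.Fatalf("confirm.select: %s", err)
	}
	confirms := c.NotifyPublish(make(chan amqp.Confirmation, 1))

	err = c.Publish("logs", "info", false, false, amqp.Publishing{
		ContentType: "text/plain",
		Timestamp:   time.Now(),
		Body:        []byte("confirmed publish"),
	})
	if err != nil {
		log.Fatalf("basic.publish: %s", err)
	}

	// Block until the broker confirms (or rejects) the publishing.
	if confirmed := <-confirms; !confirmed.Ack {
		log.Printf("publishing %d was nacked by the broker", confirmed.DeliveryTag)
	}
}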

func Consume() {
	// Dial opens an AMQP connection from the credentials in the URL.
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("connection.open: %s", err)
	}
	defer conn.Close()

	c, err := conn.Channel()
	if err != nil {
		log.Fatalf("channel.open: %s", err)
	}

	// We declare our topology on both the publisher and consumer to ensure they
	// are the same. This is part of AMQP being a programmable messaging model.
	//
	// See the Channel.Publish example for the complementary declare.
	err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
	if err != nil {
		log.Fatalf("exchange.declare: %s", err)
	}

	// Establish the queue topologies that we are responsible for.
	type bind struct {
		queue string
		key   string
	}

	bindings := []bind{
		{"page", "alert"},
		{"email", "info"},
		{"firehose", "#"},
	}

	for _, b := range bindings {
		_, err = c.QueueDeclare(b.queue, true, false, false, false, nil)
		if err != nil {
			log.Fatalf("queue.declare: %s", err)
		}

		err = c.QueueBind(b.queue, b.key, "logs", false, nil)
		if err != nil {
			log.Fatalf("queue.bind: %s", err)
		}
	}

	// Set our quality of service. Since we're sharing 3 consumers on the same
	// channel, we want at least 3 messages in flight.
	err = c.Qos(3, 0, false)
	if err != nil {
		log.Fatalf("basic.qos: %s", err)
	}

	// Establish our consumers, which have different responsibilities. Our first
	// two consumers do not auto-ack messages on the server, so deliveries must
	// be acked on the client.
	pages, err := c.Consume("page", "pager", false, false, false, false, nil)
	if err != nil {
		log.Fatalf("basic.consume: %s", err)
	}

	go func() {
		for page := range pages {
			// ... this consumer is responsible for sending pages per log
			log.Printf("Processing page: %+v\n", page)
			page.Ack(false)
		}
	}()

	// Notice how the concern for which messages arrive here is in the AMQP
	// topology and not in the queue. We let the server pick a consumer tag this
	// time.
	emails, err := c.Consume("email", "", false, false, false, false, nil)
	if err != nil {
		log.Fatalf("basic.consume: %s", err)
	}

	go func() {
		for email := range emails {
			// ... this consumer is responsible for sending emails per log
			log.Printf("Processing email: %+v\n", email)
			email.Ack(false)
		}
	}()

	// This consumer requests that every message is acknowledged as soon as it's
	// delivered.
	firehose, err := c.Consume("firehose", "", true, false, false, false, nil)
	if err != nil {
		log.Fatalf("basic.consume: %s", err)
	}

	// To show how to process the items in parallel, we'll use a work pool.
	for i := 0; i < runtime.NumCPU(); i++ {
		go func(work <-chan amqp.Delivery) {
			for drop := range work {
				// ... this consumer pulls from the firehose and doesn't need to
				// acknowledge deliveries
				log.Printf("Processing firehose drop: %+v\n", drop)
			}
		}(firehose)
	}

	// Wait until you're ready to finish; here we block on an interrupt signal.
	// signal.Notify requires a buffered channel.
	q := make(chan os.Signal, 1)
	signal.Notify(q, os.Interrupt)

	log.Println("Waiting...")
	<-q
	log.Println("Shutting down...")

	// Cancelling a consumer by name closes its deliveries channel, which
	// finishes the range and gracefully ends the goroutine.
	err = c.Cancel("pager", false)
	if err != nil {
		log.Fatalf("basic.cancel: %s", err)
	}

	// The deferred Connection.Close will also end the consumers' ranges over
	// their delivery chans. If you need every delivery to be processed, make
	// sure to wait for all consumer goroutines to finish before exiting your
	// process (one approach is sketched in consumeAndWait below).
}
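
// consumeAndWait is a minimal, hypothetical sketch added after the example; it
// is not part of the original code. It illustrates the advice above: workers
// range over a shared delivery channel and signal on done when that channel
// closes, so the caller returns only after every in-flight delivery has been
// handled. The function name and the workers parameter are assumptions made
// for illustration.
func consumeAndWait(deliveries <-chan amqp.Delivery, workers int) {
	done := make(chan struct{})
	for i := 0; i < workers; i++ {
		go func() {
			for d := range deliveries {
				log.Printf("Processing delivery: %+v\n", d)
				d.Ack(false)
			}
			// The delivery channel closes when the consumer is cancelled or the
			// connection closes; signal that this worker has drained its share.
			done <- struct{}{}
		}()
	}

	// Block until every worker has observed the closed delivery channel.
	for i := 0; i < workers; i++ {
		<-done
	}
}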