Playing with streadway/amqp's examples

cat-town
Dan Buch 12 years ago
parent 19b17354e0
commit ab9db7837d

@ -1,14 +1,23 @@
CLEAN_GOPATH := $(shell echo $(GOPATH) | tr ":" "\n" | grep -v '^$$' | grep -v $(PWD) | tr "\n" ":")
GOPATH := $(PWD):$(CLEAN_GOPATH)
PACKAGES := $(foreach pkg,\ PACKAGES := $(foreach pkg,\
$(shell ls src/meatballhat.com/gotour-artifacts),\ $(shell ls src/meatballhat.com/gotour-artifacts),\
$(patsubst %,meatballhat.com/gotour-artifacts/%,$(pkg))\ $(patsubst %,meatballhat.com/gotour-artifacts/%,$(pkg))\
) )
PACKAGES += meatballhat.com/amqp-fun
all: test: build
go test $(PACKAGES)
build: deps fmt
go install $(PACKAGES) go install $(PACKAGES)
fmt: fmt:
go fmt $(PACKAGES) go fmt $(PACKAGES)
deps:
go list -f '{{range .Imports}}{{.}} {{end}}' $(PACKAGES) | xargs go get
clean: clean:
rm -v bin/* rm -v bin/*
@ -20,4 +29,4 @@ env:
@echo GOPATH=$(GOPATH) @echo GOPATH=$(GOPATH)
@echo PACKAGES=$(PACKAGES) @echo PACKAGES=$(PACKAGES)
.PHONY: all clean publish env fmt .PHONY: test build clean publish env fmt

@ -0,0 +1,193 @@
package main
import (
"log"
"os"
"os/signal"
"runtime"
"time"
)
import (
"github.com/streadway/amqp"
)
func publish() {
// Connects opens an AMQP connection from the credentials in the URL.
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("connection.open: %s", err)
}
// This waits for a server acknowledgment which means the sockets will have
// flushed all outbound publishings prior to returning. It's important to
// block on Close to not lose any publishings.
defer conn.Close()
c, err := conn.Channel()
if err != nil {
log.Fatalf("channel.open: %s", err)
}
// We declare our topology on both the publisher and consumer to ensure they
// are the same. This is part of AMQP being a programmable messaging model.
//
// See the Channel.Consume example for the complimentary declare.
err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
if err != nil {
log.Fatal("exchange.declare: %s", err)
}
// Prepare this message to be persistent. Your publishing requirements may
// be different.
msg := amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: "text/plain",
Body: []byte("Go Go AMQP!"),
}
// This is not a mandatory delivery, so it will be dropped if there are no
// queues bound to the logs exchange.
err = c.Publish("logs", "info", false, false, msg)
if err != nil {
// Since publish is asynchronous this can happen if the network connection
// is reset or if the server has run out of resources.
log.Fatal("basic.publish: %s", err)
}
}
func consume() {
// Connects opens an AMQP connection from the credentials in the URL.
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("connection.open: %s", err)
}
defer conn.Close()
c, err := conn.Channel()
if err != nil {
log.Fatalf("channel.open: %s", err)
}
// We declare our topology on both the publisher and consumer to ensure they
// are the same. This is part of AMQP being a programmable messaging model.
//
// See the Channel.Publish example for the complimentary declare.
err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
if err != nil {
log.Fatal("exchange.declare: %s", err)
}
// Establish our queue topologies that we are responsible for
type bind struct {
queue string
key string
}
bindings := []bind{
bind{"page", "alert"},
bind{"email", "info"},
bind{"firehose", "#"},
}
for _, b := range bindings {
_, err = c.QueueDeclare(b.queue, true, false, false, false, nil)
if err != nil {
log.Fatal("queue.declare: %s", err)
}
err = c.QueueBind(b.queue, b.key, "logs", false, nil)
if err != nil {
log.Fatal("queue.bind: %s", err)
}
}
// Set our quality of service. Since we're sharing 3 consumers on the same
// channel, we want at least 3 messages in flight.
err = c.Qos(3, 0, false)
if err != nil {
log.Fatal("basic.qos: %s", err)
}
// Establish our consumers that have different responsibilities. Our first
// two queues do not ack the messages on the server, so require to be acked
// on the client.
pages, err := c.Consume("page", "pager", false, false, false, false, nil)
if err != nil {
log.Fatal("basic.consume: %s", err)
}
go func() {
for page := range pages {
// ... this consumer is responsible for sending pages per log
log.Printf("Processing page: %+v\n", page)
page.Ack(false)
}
}()
// Notice how the concern for which messages arrive here are in the AMQP
// topology and not in the queue. We let the server pick a consumer tag this
// time.
emails, err := c.Consume("email", "", false, false, false, false, nil)
if err != nil {
log.Fatal("basic.consume: %s", err)
}
go func() {
for email := range emails {
// ... this consumer is responsible for sending emails per log
log.Printf("Processing email: %+v\n", email)
email.Ack(false)
}
}()
// This consumer requests that every message is acknowledged as soon as it's
// delivered.
firehose, err := c.Consume("firehose", "", true, false, false, false, nil)
if err != nil {
log.Fatal("basic.consume: %s", err)
}
// To show how to process the items in parallel, we'll use a work pool.
for i := 0; i < runtime.NumCPU(); i++ {
go func(work <-chan amqp.Delivery) {
for drop := range work {
// ... this consumer pulls from the firehose and doesn't need to acknowledge
log.Printf("Processing firehose drop: %+v\n", drop)
}
}(firehose)
}
// // Wait until you're ready to finish, could be a signal handler here.
// time.Sleep(10 * time.Second)
q := make(chan os.Signal)
signal.Notify(q, os.Interrupt)
log.Println("Waiting...")
<-q
log.Println("Shutting down...")
// Cancelling a consumer by name will finish the range and gracefully end the
// goroutine
err = c.Cancel("pager", false)
if err != nil {
log.Fatal("basic.cancel: %s", err)
}
// deferred closing the Connection will also finish the consumer's ranges of
// their delivery chans. If you need every delivery to be processed, make
// sure to wait for all consumers goroutines to finish before exiting your
// process.
}
func main() {
if os.Args[1] == "publish" {
publish()
} else if os.Args[1] == "consume" {
consume()
}
}
Loading…
Cancel
Save