diff --git a/gotime/Makefile b/gotime/Makefile index 0df5341..901be7d 100644 --- a/gotime/Makefile +++ b/gotime/Makefile @@ -1,14 +1,23 @@ +CLEAN_GOPATH := $(shell echo $(GOPATH) | tr ":" "\n" | grep -v '^$$' | grep -v $(PWD) | tr "\n" ":") +GOPATH := $(PWD):$(CLEAN_GOPATH) PACKAGES := $(foreach pkg,\ $(shell ls src/meatballhat.com/gotour-artifacts),\ $(patsubst %,meatballhat.com/gotour-artifacts/%,$(pkg))\ ) +PACKAGES += meatballhat.com/amqp-fun -all: +test: build + go test $(PACKAGES) + +build: deps fmt go install $(PACKAGES) fmt: go fmt $(PACKAGES) +deps: + go list -f '{{range .Imports}}{{.}} {{end}}' $(PACKAGES) | xargs go get + clean: rm -v bin/* @@ -20,4 +29,4 @@ env: @echo GOPATH=$(GOPATH) @echo PACKAGES=$(PACKAGES) -.PHONY: all clean publish env fmt +.PHONY: test build clean publish env fmt diff --git a/gotime/src/meatballhat.com/amqp-fun/main.go b/gotime/src/meatballhat.com/amqp-fun/main.go new file mode 100644 index 0000000..639c7d4 --- /dev/null +++ b/gotime/src/meatballhat.com/amqp-fun/main.go @@ -0,0 +1,193 @@ +package main + +import ( + "log" + "os" + "os/signal" + "runtime" + "time" +) + +import ( + "github.com/streadway/amqp" +) + +func publish() { + // Connects opens an AMQP connection from the credentials in the URL. + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + log.Fatalf("connection.open: %s", err) + } + + // This waits for a server acknowledgment which means the sockets will have + // flushed all outbound publishings prior to returning. It's important to + // block on Close to not lose any publishings. + defer conn.Close() + + c, err := conn.Channel() + if err != nil { + log.Fatalf("channel.open: %s", err) + } + + // We declare our topology on both the publisher and consumer to ensure they + // are the same. This is part of AMQP being a programmable messaging model. + // + // See the Channel.Consume example for the complimentary declare. + err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil) + if err != nil { + log.Fatal("exchange.declare: %s", err) + } + + // Prepare this message to be persistent. Your publishing requirements may + // be different. + msg := amqp.Publishing{ + DeliveryMode: amqp.Persistent, + Timestamp: time.Now(), + ContentType: "text/plain", + Body: []byte("Go Go AMQP!"), + } + + // This is not a mandatory delivery, so it will be dropped if there are no + // queues bound to the logs exchange. + err = c.Publish("logs", "info", false, false, msg) + if err != nil { + // Since publish is asynchronous this can happen if the network connection + // is reset or if the server has run out of resources. + log.Fatal("basic.publish: %s", err) + } +} + +func consume() { + // Connects opens an AMQP connection from the credentials in the URL. + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + log.Fatalf("connection.open: %s", err) + } + defer conn.Close() + + c, err := conn.Channel() + if err != nil { + log.Fatalf("channel.open: %s", err) + } + + // We declare our topology on both the publisher and consumer to ensure they + // are the same. This is part of AMQP being a programmable messaging model. + // + // See the Channel.Publish example for the complimentary declare. + err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil) + if err != nil { + log.Fatal("exchange.declare: %s", err) + } + + // Establish our queue topologies that we are responsible for + type bind struct { + queue string + key string + } + + bindings := []bind{ + bind{"page", "alert"}, + bind{"email", "info"}, + bind{"firehose", "#"}, + } + + for _, b := range bindings { + _, err = c.QueueDeclare(b.queue, true, false, false, false, nil) + if err != nil { + log.Fatal("queue.declare: %s", err) + } + + err = c.QueueBind(b.queue, b.key, "logs", false, nil) + if err != nil { + log.Fatal("queue.bind: %s", err) + } + } + + // Set our quality of service. Since we're sharing 3 consumers on the same + // channel, we want at least 3 messages in flight. + err = c.Qos(3, 0, false) + if err != nil { + log.Fatal("basic.qos: %s", err) + } + + // Establish our consumers that have different responsibilities. Our first + // two queues do not ack the messages on the server, so require to be acked + // on the client. + + pages, err := c.Consume("page", "pager", false, false, false, false, nil) + if err != nil { + log.Fatal("basic.consume: %s", err) + } + + go func() { + for page := range pages { + // ... this consumer is responsible for sending pages per log + log.Printf("Processing page: %+v\n", page) + page.Ack(false) + } + }() + + // Notice how the concern for which messages arrive here are in the AMQP + // topology and not in the queue. We let the server pick a consumer tag this + // time. + + emails, err := c.Consume("email", "", false, false, false, false, nil) + if err != nil { + log.Fatal("basic.consume: %s", err) + } + + go func() { + for email := range emails { + // ... this consumer is responsible for sending emails per log + log.Printf("Processing email: %+v\n", email) + email.Ack(false) + } + }() + + // This consumer requests that every message is acknowledged as soon as it's + // delivered. + + firehose, err := c.Consume("firehose", "", true, false, false, false, nil) + if err != nil { + log.Fatal("basic.consume: %s", err) + } + + // To show how to process the items in parallel, we'll use a work pool. + for i := 0; i < runtime.NumCPU(); i++ { + go func(work <-chan amqp.Delivery) { + for drop := range work { + // ... this consumer pulls from the firehose and doesn't need to acknowledge + log.Printf("Processing firehose drop: %+v\n", drop) + } + }(firehose) + } + + // // Wait until you're ready to finish, could be a signal handler here. + // time.Sleep(10 * time.Second) + q := make(chan os.Signal) + signal.Notify(q, os.Interrupt) + + log.Println("Waiting...") + <-q + log.Println("Shutting down...") + + // Cancelling a consumer by name will finish the range and gracefully end the + // goroutine + err = c.Cancel("pager", false) + if err != nil { + log.Fatal("basic.cancel: %s", err) + } + + // deferred closing the Connection will also finish the consumer's ranges of + // their delivery chans. If you need every delivery to be processed, make + // sure to wait for all consumers goroutines to finish before exiting your + // process. +} + +func main() { + if os.Args[1] == "publish" { + publish() + } else if os.Args[1] == "consume" { + consume() + } +}