package amqpfun import ( "log" "os" "os/signal" "runtime" "time" ) import ( "github.com/streadway/amqp" ) func Publish() { // Connects opens an AMQP connection from the credentials in the URL. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("connection.open: %s", err) } // This waits for a server acknowledgment which means the sockets will have // flushed all outbound publishings prior to returning. It's important to // block on Close to not lose any publishings. defer conn.Close() c, err := conn.Channel() if err != nil { log.Fatalf("channel.open: %s", err) } // We declare our topology on both the publisher and consumer to ensure they // are the same. This is part of AMQP being a programmable messaging model. // // See the Channel.Consume example for the complimentary declare. err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil) if err != nil { log.Fatal("exchange.declare: %s", err) } // Prepare this message to be persistent. Your publishing requirements may // be different. msg := amqp.Publishing{ DeliveryMode: amqp.Persistent, Timestamp: time.Now(), ContentType: "text/plain", Body: []byte("Go Go AMQP!"), } // This is not a mandatory delivery, so it will be dropped if there are no // queues bound to the logs exchange. err = c.Publish("logs", "info", false, false, msg) if err != nil { // Since publish is asynchronous this can happen if the network connection // is reset or if the server has run out of resources. log.Fatal("basic.publish: %s", err) } } func Consume() { // Connects opens an AMQP connection from the credentials in the URL. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("connection.open: %s", err) } defer conn.Close() c, err := conn.Channel() if err != nil { log.Fatalf("channel.open: %s", err) } // We declare our topology on both the publisher and consumer to ensure they // are the same. This is part of AMQP being a programmable messaging model. // // See the Channel.Publish example for the complimentary declare. err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil) if err != nil { log.Fatal("exchange.declare: %s", err) } // Establish our queue topologies that we are responsible for type bind struct { queue string key string } bindings := []bind{ bind{"page", "alert"}, bind{"email", "info"}, bind{"firehose", "#"}, } for _, b := range bindings { _, err = c.QueueDeclare(b.queue, true, false, false, false, nil) if err != nil { log.Fatal("queue.declare: %s", err) } err = c.QueueBind(b.queue, b.key, "logs", false, nil) if err != nil { log.Fatal("queue.bind: %s", err) } } // Set our quality of service. Since we're sharing 3 consumers on the same // channel, we want at least 3 messages in flight. err = c.Qos(3, 0, false) if err != nil { log.Fatal("basic.qos: %s", err) } // Establish our consumers that have different responsibilities. Our first // two queues do not ack the messages on the server, so require to be acked // on the client. pages, err := c.Consume("page", "pager", false, false, false, false, nil) if err != nil { log.Fatal("basic.consume: %s", err) } go func() { for page := range pages { // ... this consumer is responsible for sending pages per log log.Printf("Processing page: %+v\n", page) page.Ack(false) } }() // Notice how the concern for which messages arrive here are in the AMQP // topology and not in the queue. We let the server pick a consumer tag this // time. emails, err := c.Consume("email", "", false, false, false, false, nil) if err != nil { log.Fatal("basic.consume: %s", err) } go func() { for email := range emails { // ... this consumer is responsible for sending emails per log log.Printf("Processing email: %+v\n", email) email.Ack(false) } }() // This consumer requests that every message is acknowledged as soon as it's // delivered. firehose, err := c.Consume("firehose", "", true, false, false, false, nil) if err != nil { log.Fatal("basic.consume: %s", err) } // To show how to process the items in parallel, we'll use a work pool. for i := 0; i < runtime.NumCPU(); i++ { go func(work <-chan amqp.Delivery) { for drop := range work { // ... this consumer pulls from the firehose and doesn't need to acknowledge log.Printf("Processing firehose drop: %+v\n", drop) } }(firehose) } // // Wait until you're ready to finish, could be a signal handler here. // time.Sleep(10 * time.Second) q := make(chan os.Signal) signal.Notify(q, os.Interrupt) log.Println("Waiting...") <-q log.Println("Shutting down...") // Cancelling a consumer by name will finish the range and gracefully end the // goroutine err = c.Cancel("pager", false) if err != nil { log.Fatal("basic.cancel: %s", err) } // deferred closing the Connection will also finish the consumer's ranges of // their delivery chans. If you need every delivery to be processed, make // sure to wait for all consumers goroutines to finish before exiting your // process. }