From 3fed0c2e4ee73760d37c166506158858bde468d7 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Thu, 15 Nov 2012 00:05:44 -0500 Subject: [PATCH] Implementing the producer side and breaking out common topology bits --- sylvilagus/go/Makefile | 3 +- .../main.go | 20 ++------ .../main.go | 46 +++++++++++++++++++ .../src/meatballhat.com/sylvilagus/hello.go | 37 +++++++++++++++ 4 files changed, 88 insertions(+), 18 deletions(-) create mode 100644 sylvilagus/go/src/meatballhat.com/sylvilagus-chapter02-hello-world-producer/main.go create mode 100644 sylvilagus/go/src/meatballhat.com/sylvilagus/hello.go diff --git a/sylvilagus/go/Makefile b/sylvilagus/go/Makefile index 3d33f75..1d32acc 100644 --- a/sylvilagus/go/Makefile +++ b/sylvilagus/go/Makefile @@ -2,7 +2,8 @@ CLEAN_GOPATH := $(shell echo $(GOPATH) | tr ":" "\n" | grep -v '^$$' | grep -v $ GOPATH := $(PWD):$(CLEAN_GOPATH) PACKAGES := \ meatballhat.com/sylvilagus-conntest \ - meatballhat.com/sylvilagus-chapter02-hello-world-consumer + meatballhat.com/sylvilagus-chapter02-hello-world-consumer \ + meatballhat.com/sylvilagus-chapter02-hello-world-producer export GOPATH diff --git a/sylvilagus/go/src/meatballhat.com/sylvilagus-chapter02-hello-world-consumer/main.go b/sylvilagus/go/src/meatballhat.com/sylvilagus-chapter02-hello-world-consumer/main.go index 315c197..e5afe2c 100644 --- a/sylvilagus/go/src/meatballhat.com/sylvilagus-chapter02-hello-world-consumer/main.go +++ b/sylvilagus/go/src/meatballhat.com/sylvilagus-chapter02-hello-world-consumer/main.go @@ -6,6 +6,7 @@ import ( import ( "github.com/streadway/amqp" + "meatballhat.com/sylvilagus" ) func main() { @@ -16,24 +17,9 @@ func main() { defer connection.Close() - channel, err := connection.Channel() + channel, err := sylvilagus.CreateHelloTopology(connection) if err != nil { - log.Fatal("Failed to get channel!: ", err) - } - - err = channel.ExchangeDeclare("hello-exchange", "direct", true, false, false, false, nil) - if err != nil { - log.Fatal("Failed to declare exchange!: ", err) - } - - _, err = channel.QueueDeclare("hello-queue", false, false, false, false, nil) - if err != nil { - log.Fatal("Failed to declare queue!: ", err) - } - - err = channel.QueueBind("hello-queue", "hola", "hello-exchange", false, nil) - if err != nil { - log.Fatal("Failed to bind to queue!: ", err) + log.Fatal("Failed to build topology!: ", err) } hellos, err := channel.Consume("hello-queue", "hello-consumer", false, false, false, false, nil) diff --git a/sylvilagus/go/src/meatballhat.com/sylvilagus-chapter02-hello-world-producer/main.go b/sylvilagus/go/src/meatballhat.com/sylvilagus-chapter02-hello-world-producer/main.go new file mode 100644 index 0000000..9672aae --- /dev/null +++ b/sylvilagus/go/src/meatballhat.com/sylvilagus-chapter02-hello-world-producer/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "log" + "os" + "time" +) + +import ( + "github.com/streadway/amqp" + "meatballhat.com/sylvilagus" +) + +func main() { + if len(os.Args) < 2 { + log.Fatal("You must provide a message as first arg!") + } + + msgBody := string(os.Args[1]) + + connection, err := amqp.Dial("amqp://guest:guest@localhost:5672") + if err != nil { + log.Fatal("Failed to connect!: ", err) + } + + defer connection.Close() + + channel, err := sylvilagus.CreateHelloTopology(connection) + if err != nil { + log.Fatal("Failed to build topology!: ", err) + } + + msg := amqp.Publishing{ + DeliveryMode: amqp.Persistent, + Timestamp: time.Now(), + ContentType: "text/plain", + Body: []byte(msgBody), + } + + err = channel.Publish("hello-exchange", "hola", false, false, msg) + if err != nil { + log.Fatal("Failed to publish message!: ", err) + } else { + log.Printf("Published '%v'\n", msgBody) + } +} diff --git a/sylvilagus/go/src/meatballhat.com/sylvilagus/hello.go b/sylvilagus/go/src/meatballhat.com/sylvilagus/hello.go new file mode 100644 index 0000000..04c2f71 --- /dev/null +++ b/sylvilagus/go/src/meatballhat.com/sylvilagus/hello.go @@ -0,0 +1,37 @@ +package sylvilagus + +import ( + "log" +) + +import ( + "github.com/streadway/amqp" +) + +func CreateHelloTopology(connection *amqp.Connection) (*amqp.Channel, error) { + channel, err := connection.Channel() + if err != nil { + log.Println("Failed to get channel!: ", err) + return nil, err + } + + err = channel.ExchangeDeclare("hello-exchange", "direct", true, false, false, false, nil) + if err != nil { + log.Println("Failed to declare exchange!: ", err) + return nil, err + } + + _, err = channel.QueueDeclare("hello-queue", false, false, false, false, nil) + if err != nil { + log.Println("Failed to declare queue!: ", err) + return nil, err + } + + err = channel.QueueBind("hello-queue", "hola", "hello-exchange", false, nil) + if err != nil { + log.Println("Failed to bind to queue!: ", err) + return nil, err + } + + return channel, nil +}