Implementing the producer side and breaking out common topology bits
This commit is contained in:
parent
46933f0611
commit
3fed0c2e4e
@ -2,7 +2,8 @@ CLEAN_GOPATH := $(shell echo $(GOPATH) | tr ":" "\n" | grep -v '^$$' | grep -v $
|
|||||||
GOPATH := $(PWD):$(CLEAN_GOPATH)
|
GOPATH := $(PWD):$(CLEAN_GOPATH)
|
||||||
PACKAGES := \
|
PACKAGES := \
|
||||||
meatballhat.com/sylvilagus-conntest \
|
meatballhat.com/sylvilagus-conntest \
|
||||||
meatballhat.com/sylvilagus-chapter02-hello-world-consumer
|
meatballhat.com/sylvilagus-chapter02-hello-world-consumer \
|
||||||
|
meatballhat.com/sylvilagus-chapter02-hello-world-producer
|
||||||
|
|
||||||
export GOPATH
|
export GOPATH
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/streadway/amqp"
|
"github.com/streadway/amqp"
|
||||||
|
"meatballhat.com/sylvilagus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -16,24 +17,9 @@ func main() {
|
|||||||
|
|
||||||
defer connection.Close()
|
defer connection.Close()
|
||||||
|
|
||||||
channel, err := connection.Channel()
|
channel, err := sylvilagus.CreateHelloTopology(connection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Failed to get channel!: ", err)
|
log.Fatal("Failed to build topology!: ", err)
|
||||||
}
|
|
||||||
|
|
||||||
err = channel.ExchangeDeclare("hello-exchange", "direct", true, false, false, false, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal("Failed to declare exchange!: ", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = channel.QueueDeclare("hello-queue", false, false, false, false, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal("Failed to declare queue!: ", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = channel.QueueBind("hello-queue", "hola", "hello-exchange", false, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal("Failed to bind to queue!: ", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
hellos, err := channel.Consume("hello-queue", "hello-consumer", false, false, false, false, nil)
|
hellos, err := channel.Consume("hello-queue", "hello-consumer", false, false, false, false, nil)
|
||||||
|
@ -0,0 +1,46 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
"meatballhat.com/sylvilagus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
if len(os.Args) < 2 {
|
||||||
|
log.Fatal("You must provide a message as first arg!")
|
||||||
|
}
|
||||||
|
|
||||||
|
msgBody := string(os.Args[1])
|
||||||
|
|
||||||
|
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to connect!: ", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer connection.Close()
|
||||||
|
|
||||||
|
channel, err := sylvilagus.CreateHelloTopology(connection)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to build topology!: ", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := amqp.Publishing{
|
||||||
|
DeliveryMode: amqp.Persistent,
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
ContentType: "text/plain",
|
||||||
|
Body: []byte(msgBody),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = channel.Publish("hello-exchange", "hola", false, false, msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to publish message!: ", err)
|
||||||
|
} else {
|
||||||
|
log.Printf("Published '%v'\n", msgBody)
|
||||||
|
}
|
||||||
|
}
|
37
sylvilagus/go/src/meatballhat.com/sylvilagus/hello.go
Normal file
37
sylvilagus/go/src/meatballhat.com/sylvilagus/hello.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package sylvilagus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
)
|
||||||
|
|
||||||
|
func CreateHelloTopology(connection *amqp.Connection) (*amqp.Channel, error) {
|
||||||
|
channel, err := connection.Channel()
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Failed to get channel!: ", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = channel.ExchangeDeclare("hello-exchange", "direct", true, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Failed to declare exchange!: ", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = channel.QueueDeclare("hello-queue", false, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Failed to declare queue!: ", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = channel.QueueBind("hello-queue", "hola", "hello-exchange", false, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Failed to bind to queue!: ", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return channel, nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user