From 9fd03cab2fd094b00b5638f3dd9ffd0d68cd368c Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Sun, 2 Dec 2012 08:41:05 -0500 Subject: [PATCH] Implementing rpc example in Go --- sylvilagus/go/Makefile | 3 +- .../go/sylvilagus-ch04-rpc-client/main.go | 63 +++++++++++++++++++ .../go/sylvilagus-ch04-rpc-server/main.go | 53 +++++++++++++++- sylvilagus/go/sylvilagus/rpc.go | 6 ++ 4 files changed, 122 insertions(+), 3 deletions(-) create mode 100644 sylvilagus/go/sylvilagus-ch04-rpc-client/main.go diff --git a/sylvilagus/go/Makefile b/sylvilagus/go/Makefile index f6efbf4..93fe5f8 100644 --- a/sylvilagus/go/Makefile +++ b/sylvilagus/go/Makefile @@ -3,7 +3,8 @@ PACKAGES := \ $(REPO_BASE)/sylvilagus-conntest \ $(REPO_BASE)/sylvilagus-ch02-hello-world-consumer \ $(REPO_BASE)/sylvilagus-ch02-hello-world-producer \ - $(REPO_BASE)/sylvilagus-ch04-rpc-server + $(REPO_BASE)/sylvilagus-ch04-rpc-server \ + $(REPO_BASE)/sylvilagus-ch04-rpc-client test: build go test -x -v $(PACKAGES) diff --git a/sylvilagus/go/sylvilagus-ch04-rpc-client/main.go b/sylvilagus/go/sylvilagus-ch04-rpc-client/main.go new file mode 100644 index 0000000..a1b953d --- /dev/null +++ b/sylvilagus/go/sylvilagus-ch04-rpc-client/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "encoding/json" + "log" + "time" +) + +import ( + "github.com/meatballhat/box-o-sand/sylvilagus/go/sylvilagus" + "github.com/streadway/amqp" +) + +func main() { + connection, err := amqp.Dial(sylvilagus.AMQP_URI) + if err != nil { + log.Fatal("Failed to connect!:", err) + } + + defer connection.Close() + + channel, err := sylvilagus.CreateRPCTopology(connection) + if err != nil { + log.Fatal("Failed to build topology!:", err) + } + + qResult, err := channel.QueueDeclare("", false, false, true, false, nil) + if err != nil { + log.Fatal("Failed to build topology!:", err) + } + + msgBody := &sylvilagus.Ping{ + ClientName: "RPC Client 1.0", + Time: time.Now(), + } + + msgBytes, err := json.Marshal(msgBody) + if err != nil { + log.Fatal("Failed to json.Marshal the ping!:", err) + } + + msg := amqp.Publishing{ + ContentType: "application/json", + DeliveryMode: amqp.Persistent, + Timestamp: time.Now(), + Body: msgBytes, + ReplyTo: qResult.Name, + } + + channel.Publish("rpc", "ping", false, false, msg) + log.Println("Sent 'ping' RPC call:", string(msg.Body)) + log.Println("Waiting for reply...") + + pongs, err := channel.Consume(qResult.Name, qResult.Name, false, false, false, false, nil) + for pong := range pongs { + log.Println("RPC Reply ---", string(pong.Body)) + if err = channel.Close(); err != nil { + log.Fatal("Failed to close channel!:", err) + } else { + return + } + } +} diff --git a/sylvilagus/go/sylvilagus-ch04-rpc-server/main.go b/sylvilagus/go/sylvilagus-ch04-rpc-server/main.go index 4b32cb0..069d05c 100644 --- a/sylvilagus/go/sylvilagus-ch04-rpc-server/main.go +++ b/sylvilagus/go/sylvilagus-ch04-rpc-server/main.go @@ -1,14 +1,63 @@ package main import ( + "encoding/json" + "fmt" "log" + "time" ) import ( "github.com/meatballhat/box-o-sand/sylvilagus/go/sylvilagus" + "github.com/streadway/amqp" ) func main() { - log.Println("Would be using AMQP_URI =", sylvilagus.AMQP_URI) - log.Println("OH HAI RPC") + connection, err := amqp.Dial(sylvilagus.AMQP_URI) + if err != nil { + log.Fatal("Failed to connect!:", err) + } + + defer connection.Close() + + channel, err := sylvilagus.CreateRPCTopology(connection) + if err != nil { + log.Fatal("Failed to build topology!:", err) + } + + pings, err := channel.Consume("ping", "ping", false, false, false, false, nil) + if err != nil { + log.Fatal("Failed to start consuming!:", err) + } + + quit := make(chan bool) + + go func(quit chan bool) { + log.Println("Waiting for RPC calls...") + + for ping := range pings { + if err = ping.Ack(false); err == nil { + log.Println("Received API call... replying...") + + pingInst := &sylvilagus.Ping{} + err = json.Unmarshal(ping.Body, pingInst) + if err == nil { + msg := amqp.Publishing{ + DeliveryMode: amqp.Persistent, + Timestamp: time.Now(), + ContentType: "text/plain", + Body: []byte(fmt.Sprintf("Pong! %s", pingInst.Time)), + } + + channel.Publish("", ping.ReplyTo, false, false, msg) + } else { + log.Println("Failed to json.Unmarshal the ping!:", err) + } + } else { + log.Println("Failed to ACK the ping!:", err) + } + } + }(quit) + + <-quit } diff --git a/sylvilagus/go/sylvilagus/rpc.go b/sylvilagus/go/sylvilagus/rpc.go index 0b9bb64..c14f6ae 100644 --- a/sylvilagus/go/sylvilagus/rpc.go +++ b/sylvilagus/go/sylvilagus/rpc.go @@ -2,12 +2,18 @@ package sylvilagus import ( "log" + "time" ) import ( "github.com/streadway/amqp" ) +type Ping struct { + ClientName string `json:"client_name"` + Time time.Time `json:"time"` +} + func CreateRPCTopology(connection *amqp.Connection) (channel *amqp.Channel, err error) { if channel, err = connection.Channel(); err != nil { log.Println("Failed to get channel!: ", err)