Implementing rpc example in Go

This commit is contained in:
Dan Buch 2012-12-02 08:41:05 -05:00
parent b0553c1b8d
commit 9fd03cab2f
4 changed files with 122 additions and 3 deletions

View File

@ -3,7 +3,8 @@ PACKAGES := \
$(REPO_BASE)/sylvilagus-conntest \ $(REPO_BASE)/sylvilagus-conntest \
$(REPO_BASE)/sylvilagus-ch02-hello-world-consumer \ $(REPO_BASE)/sylvilagus-ch02-hello-world-consumer \
$(REPO_BASE)/sylvilagus-ch02-hello-world-producer \ $(REPO_BASE)/sylvilagus-ch02-hello-world-producer \
$(REPO_BASE)/sylvilagus-ch04-rpc-server $(REPO_BASE)/sylvilagus-ch04-rpc-server \
$(REPO_BASE)/sylvilagus-ch04-rpc-client
test: build test: build
go test -x -v $(PACKAGES) go test -x -v $(PACKAGES)

View File

@ -0,0 +1,63 @@
package main
import (
"encoding/json"
"log"
"time"
)
import (
"github.com/meatballhat/box-o-sand/sylvilagus/go/sylvilagus"
"github.com/streadway/amqp"
)
func main() {
connection, err := amqp.Dial(sylvilagus.AMQP_URI)
if err != nil {
log.Fatal("Failed to connect!:", err)
}
defer connection.Close()
channel, err := sylvilagus.CreateRPCTopology(connection)
if err != nil {
log.Fatal("Failed to build topology!:", err)
}
qResult, err := channel.QueueDeclare("", false, false, true, false, nil)
if err != nil {
log.Fatal("Failed to build topology!:", err)
}
msgBody := &sylvilagus.Ping{
ClientName: "RPC Client 1.0",
Time: time.Now(),
}
msgBytes, err := json.Marshal(msgBody)
if err != nil {
log.Fatal("Failed to json.Marshal the ping!:", err)
}
msg := amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Body: msgBytes,
ReplyTo: qResult.Name,
}
channel.Publish("rpc", "ping", false, false, msg)
log.Println("Sent 'ping' RPC call:", string(msg.Body))
log.Println("Waiting for reply...")
pongs, err := channel.Consume(qResult.Name, qResult.Name, false, false, false, false, nil)
for pong := range pongs {
log.Println("RPC Reply ---", string(pong.Body))
if err = channel.Close(); err != nil {
log.Fatal("Failed to close channel!:", err)
} else {
return
}
}
}

View File

@ -1,14 +1,63 @@
package main package main
import ( import (
"encoding/json"
"fmt"
"log" "log"
"time"
) )
import ( import (
"github.com/meatballhat/box-o-sand/sylvilagus/go/sylvilagus" "github.com/meatballhat/box-o-sand/sylvilagus/go/sylvilagus"
"github.com/streadway/amqp"
) )
func main() { func main() {
log.Println("Would be using AMQP_URI =", sylvilagus.AMQP_URI) connection, err := amqp.Dial(sylvilagus.AMQP_URI)
log.Println("OH HAI RPC") if err != nil {
log.Fatal("Failed to connect!:", err)
}
defer connection.Close()
channel, err := sylvilagus.CreateRPCTopology(connection)
if err != nil {
log.Fatal("Failed to build topology!:", err)
}
pings, err := channel.Consume("ping", "ping", false, false, false, false, nil)
if err != nil {
log.Fatal("Failed to start consuming!:", err)
}
quit := make(chan bool)
go func(quit chan bool) {
log.Println("Waiting for RPC calls...")
for ping := range pings {
if err = ping.Ack(false); err == nil {
log.Println("Received API call... replying...")
pingInst := &sylvilagus.Ping{}
err = json.Unmarshal(ping.Body, pingInst)
if err == nil {
msg := amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: "text/plain",
Body: []byte(fmt.Sprintf("Pong! %s", pingInst.Time)),
}
channel.Publish("", ping.ReplyTo, false, false, msg)
} else {
log.Println("Failed to json.Unmarshal the ping!:", err)
}
} else {
log.Println("Failed to ACK the ping!:", err)
}
}
}(quit)
<-quit
} }

View File

@ -2,12 +2,18 @@ package sylvilagus
import ( import (
"log" "log"
"time"
) )
import ( import (
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
type Ping struct {
ClientName string `json:"client_name"`
Time time.Time `json:"time"`
}
func CreateRPCTopology(connection *amqp.Connection) (channel *amqp.Channel, err error) { func CreateRPCTopology(connection *amqp.Connection) (channel *amqp.Channel, err error) {
if channel, err = connection.Channel(); err != nil { if channel, err = connection.Channel(); err != nil {
log.Println("Failed to get channel!: ", err) log.Println("Failed to get channel!: ", err)