From 1a278e5ae381a292da4d82497ae4221904f43863 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Sun, 18 Mar 2012 17:13:17 -0400 Subject: [PATCH] Filling in taskwork and tasksink parts of PUSH/PULL example --- zeromq/.gitignore | 2 ++ zeromq/Makefile | 2 +- zeromq/tasksink.c | 32 ++++++++++++++++++++++++++++++++ zeromq/taskwork.c | 27 +++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 zeromq/tasksink.c create mode 100644 zeromq/taskwork.c diff --git a/zeromq/.gitignore b/zeromq/.gitignore index 9a403d2..e36b962 100644 --- a/zeromq/.gitignore +++ b/zeromq/.gitignore @@ -3,3 +3,5 @@ hwclient wuserver wuclient taskvent +taskwork +tasksink diff --git a/zeromq/Makefile b/zeromq/Makefile index 92246e3..8ac202d 100644 --- a/zeromq/Makefile +++ b/zeromq/Makefile @@ -1,7 +1,7 @@ CFLAGS += -I. -I/usr/local/include LDFLAGS += -lstdc++ -lpthread -luuid -lrt LIBZMQ := /usr/local/lib/libzmq.a -TARGETS := hwserver hwclient wuserver wuclient taskvent +TARGETS := hwserver hwclient wuserver wuclient taskvent taskwork tasksink %:%.c diff --git a/zeromq/tasksink.c b/zeromq/tasksink.c new file mode 100644 index 0000000..4c26bea --- /dev/null +++ b/zeromq/tasksink.c @@ -0,0 +1,32 @@ +#include "zhelpers.h" + +int main(void) +{ + void *context = zmq_init(1); + void *receiver = zmq_socket(context, ZMQ_PULL); + zmq_bind(receiver, "tcp://*:5558"); + + char *string = s_recv(receiver); + free(string); + + int64_t start_time = s_clock(); + + int task_nbr; + for (task_nbr = 0; task_nbr < 100; task_nbr++) { + char *string = s_recv(receiver); + free(string); + if ((task_nbr / 10) * 10 == task_nbr) { + printf(":"); + } else { + printf("."); + } + fflush(stdout); + } + + printf("Total elapsed time: %d msec\n", + (int) (s_clock() - start_time)); + + zmq_close(receiver); + zmq_term(context); + return 0; +} diff --git a/zeromq/taskwork.c b/zeromq/taskwork.c new file mode 100644 index 0000000..547691b --- /dev/null +++ b/zeromq/taskwork.c @@ -0,0 +1,27 @@ +#include "zhelpers.h" + +int main(void) +{ + void *context = zmq_init(1); + + void *receiver = zmq_socket(context, ZMQ_PULL); + zmq_connect(receiver, "tcp://localhost:5557"); + + void *sender = zmq_socket(context, ZMQ_PUSH); + zmq_connect(sender, "tcp://localhost:5558"); + + while (1) { + char *string = s_recv(receiver); + fflush(stdout); + printf("%s.", string); + + s_sleep(atoi(string)); + free(string); + + s_send(sender, ""); + } + zmq_close(receiver); + zmq_close(sender); + zmq_term(context); + return 0; +}