28 September, 2013

Google Protobuf + ZeroMQ -- C++

In the last series of posts we demonstrated ZeroMQ as a technology that provides 'sockets on steroids' across multiple platforms and languages.  The examples to date have transmitted plain strings between senders and receivers.  While interesting, an effective distributed heterogeneous system needs to transmit meaningful messages, preferably complex data structures rather than just strings.  That's where Google's Protocol Buffers (Protobuf) come into play: http://code.google.com/p/protobuf/

Building on our previously created Ubuntu 12.04 32-bit VM, let's start by installing the additional packages:


$ sudo apt-get install libprotoc-dev protobuf-compiler

With the developer libraries installed, we can now extend our previous C++ example to transmit a Protobuf message.

We'll extend our Makefile to add the necessary libraries and a target (e.g. msgs) to generate the C++ files for the message.


$ cat Makefile 
CC=g++
SRCS=main.cpp Messages.pb.cc
OBJS=$(subst .cpp,.o,$(SRCS))
INCLUDES += -I.
LIBS += -lpthread -lrt -lzmq -lprotobuf
# Recipe lines below must start with a tab character.
.cpp.o:
	$(CC) $(CFLAGS) $(INCLUDES) -c $<
main: msgs ${OBJS}
	${CC} ${CFLAGS} -o $@ ${OBJS} ${LIBS}
msgs:
	protoc -I. --cpp_out=. Messages.proto
clean:
	${RM} ${OBJS} main *.pb.*

Oh, we should take a look at our simple Protobuf message file:

$ cat Messages.proto 
message Person {
  required int32 id=1;
  required string name=2;
}
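
To get a feel for what actually goes over the wire, here is a minimal sketch that hand-encodes a Person the way the Protobuf wire format lays it out: each field is a tag of (field number << 3 | wire type) followed by a varint value or a length-prefixed byte run.  The helper names appendVarint and encodePersonByHand are hypothetical and exist only for illustration; real code should use the classes that protoc generates.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Append an unsigned value as a base-128 varint: 7 payload bits per byte,
// with the high bit set on every byte except the last.
static void appendVarint(std::string& out, uint64_t value)
{
  while (value >= 0x80)
  {
    out.push_back(static_cast<char>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out.push_back(static_cast<char>(value));
}

// Hypothetical helper (not part of the Protobuf library): encodes
// Person{id, name} by hand, following the field numbers in Messages.proto.
std::string encodePersonByHand(int32_t id, const std::string& name)
{
  std::string out;
  appendVarint(out, (1u << 3) | 0u);  // tag: field 1, wire type 0 (varint)
  // int32 values are sign-extended to 64 bits before varint encoding.
  appendVarint(out, static_cast<uint64_t>(static_cast<int64_t>(id)));
  appendVarint(out, (2u << 3) | 2u);  // tag: field 2, wire type 2 (length-delimited)
  appendVarint(out, name.size());     // byte length of the string
  out.append(name);                   // raw string bytes, no terminator
  return out;
}
```

For id=1 and name="fatslowkid" this yields 14 bytes: 08 01 12 0A followed by the ten characters of the name.  Note that an id of 0 puts a zero byte in the second position, so the encoded form is binary data rather than a C string.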

Finally, our extended main file:

$ cat main.cpp 
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <pthread.h>
#include <zmq.h>
#include "Messages.pb.h"
void* ctx=zmq_init(1);
const char* EndPoint="tcp://127.0.0.1:8000";
static const int N=100;
static const int BufferSize=128;
void* sender(void*)
{
  printf("(%s:%d) running\n",__FILE__,__LINE__);
  void* pub=zmq_socket(ctx, ZMQ_PUB);
  assert(pub);
  int rc=zmq_bind(pub,EndPoint);
  assert(rc==0);
  Person p;
  p.set_name("fatslowkid");
  p.set_id(1);
  for(int i=0; i<N; ++i)
  {
    zmq_msg_t msg;
    // Serialized Protobuf data is binary, so take its length from size(), not strlen().
    std::string S=p.SerializeAsString();
    // Copy the bytes into the message; S is destroyed at the end of each iteration.
    rc=zmq_msg_init_size(&msg, S.size());
    assert(rc==0);
    memcpy(zmq_msg_data(&msg), S.data(), S.size());
    rc=zmq_send(pub, &msg, 0);  // ZeroMQ 2.x signature
    assert(rc==0);
    zmq_msg_close(&msg);
    ::usleep(100000);
  }
  return 0;
}
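void* receiver(void*)
{
  printf("(%s:%d) running\n",__FILE__,__LINE__);
  void* sub=zmq_socket(ctx, ZMQ_SUB);
  assert(sub);
  int rc=zmq_connect(sub,EndPoint);
  assert(rc==0);
  const char* filter="";  // an empty filter subscribes to every message
  rc=zmq_setsockopt(sub, ZMQ_SUBSCRIBE, filter, strlen(filter));
  assert(rc==0);
  // Messages published before this subscriber connects are dropped,
  // in which case this loop never sees all N of them.
  for(int i=0; i<N; ++i)
  {
    zmq_msg_t msg;
    zmq_msg_init(&msg);  // start empty; zmq_recv fills it in
    rc=zmq_recv(sub, &msg, 0);  // ZeroMQ 2.x signature
    assert(rc==0);
    Person p;
    // Parse with the received byte count, since the payload may contain NUL bytes.
    bool ok=p.ParseFromArray(zmq_msg_data(&msg), zmq_msg_size(&msg));
    assert(ok);
    printf("(%s:%d) received: '%s'\n",__FILE__,__LINE__,p.name().c_str());
    zmq_msg_close(&msg);
  }
  return 0;
}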
int main(int argc, char* argv[])
{
  printf("(%s:%d) main process initializing\n",__FILE__,__LINE__);
  int major, minor, patch;
  zmq_version (&major, &minor, &patch);
  printf("(%s:%d) zmq version: %d.%d.%d\n",__FILE__,__LINE__,major,minor,patch);
  pthread_t rId;
  pthread_create(&rId, 0, receiver, 0);
  pthread_t sId;
  pthread_create(&sId, 0, sender, 0);
  pthread_join(rId,0);
  pthread_join(sId,0);
  printf("(%s:%d) main process terminating\n",__FILE__,__LINE__);
}

Notice that we now transmit and receive Protobuf messages, serialized into byte strings.  The value of this is that the serialization mechanism is supported across multiple platforms and languages.
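
One caveat with treating the serialized form as a string: it is binary data and may contain zero bytes, so its length has to come from the std::string itself rather than from C string functions.  The sketch below illustrates the difference using a hard-coded payload, the bytes a Person with id=0 and name="bob" would serialize to; samplePayload and lengthSeenByStrlen are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical sample: wire bytes for Person{id=0, name="bob"}.
// The second byte is the id value 0, which looks like a C string terminator.
std::string samplePayload()
{
  const char bytes[] = { 0x08, 0x00, 0x12, 0x03, 'b', 'o', 'b' };
  return std::string(bytes, sizeof(bytes));
}

// Length as a C string function sees it: counting stops at the first zero byte.
std::size_t lengthSeenByStrlen(const std::string& payload)
{
  return std::strlen(payload.c_str());
}
```

Here samplePayload().size() reports all 7 bytes, while lengthSeenByStrlen() stops after the first one, so a message sized with strlen() would arrive truncated and fail to parse.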

Cheers.
