Home modules.gotpike.org
Username: Password: [Create Account]
[Forgot Password?]

Modules

ADT
Database
GTK2
GUI
IP
PiJAX
Public
Sql
Stdio
Subversion
System
Tools
Xosd
lua
v4l2
wx

Recent Changes

Public.Parser.XML2 1.50
Public.ZeroMQ 1.1
Public.Template.Mustache 1.0
Public.Protocols.XMPP 1.4
Sql.Provider.jdbc 1.0

Popular Downloads

Public.Parser.JSON2 1.0
Public.Parser.JSON 0.2
GTK2 2.23
Public.Web.FCGI 1.8
Public.Parser.XML2 1.48


Module Information
Public.ZeroMQ
Viewing contents of Public_ZeroMQ-1.0/zeromq.cmod

/*! @module Public
 */

/*! @module ZeroMQ
 */

#define _GNU_SOURCE

#include "zeromq_config.h"
#include "util.h"
#include 
#define CHECK_CONTEXT() do{if((OBJ2_ZEROMQ_CONTEXT(THIS->context)==NULL)) Pike_error("ZeroMQ Context is not set. Destroyed?\n");}while(0)
#define SVALUE_WRAP(X, Y) SET_SVAL(X, T_SVALUE_PTR, 0, ptr, Y)



#if defined(ZEROMQ_DEBUG)
#define DEBUG(X...) printf(X)
#else
#define DEBUG(X...)	
#endif
DECLARATIONS

/*! @class Context
 */
 
PIKECLASS Context {

CVAR void * context;

/*! @decl void create()
 *!   Creates a new Public.ZeroMQ.Context object
 *!
 */
PIKEFUN void create()
{
  pop_n_elems(args);
}

PIKEFUN void set(int option, int value) {
  zmq_ctx_set(THIS->context, option, value);
  pop_n_elems(args);
}

PIKEFUN int get(int option) {
  int val;
  val = zmq_ctx_get(THIS->context, option);
  pop_n_elems(args);
  push_int(val);
}

// TODO create new_socket() as a convenience function.

  INIT {
    THIS->context = zmq_ctx_new();  
  }
  
  EXIT 
  {
    printf("destroy\n");
    if(THIS->context)
      zmq_ctx_term(THIS->context);
  }
}

void free_message(void *data, void *obj) {
  free(data);
}

PIKECLASS Message {

  PIKEVAR string dta;
  CVAR zmq_msg_t * msg;
  
  PIKEFUN void create(string data) {
    int retval;
    if(!data) Pike_error("ZeroMQ.Message.create(): No data provided.\n");
	
	add_ref(data);
	THIS->dta = data;
  }
  
  PIKEFUN int more() {
	if(!THIS->msg) Pike_error("ZeroMQ.Message.more(): Can only get property from received message.\n");
  	RETURN(zmq_msg_more(THIS->msg));
  }
  
  PIKEFUN string gets(string field) {
    const char * v;
	char * f;
	if(!THIS->msg) Pike_error("ZeroMQ.Message.gets(): Can only get property from received message.\n");

    f_string_to_utf8(1);
	v = zmq_msg_gets(THIS->msg, Pike_sp[-args].u.string->str);
	pop_stack();
	
	if(!v) Pike_error("Unable to fetch property.\n");
	
	push_text(v);
	f_utf8_to_string(1);
  }
  
  PIKEFUN string _sprintf(int t, mixed a) {
    push_text("Message(%O)");
	ref_push_string(THIS->dta);
	f_sprintf(2);
  }

  INIT {
  	THIS->dta = NULL;
	THIS->msg = NULL;
  }
  
  EXIT {
   if(THIS->dta) {
		free_string(THIS->dta);
		THIS->dta = NULL;
	}
   if(THIS->msg)
   {
   		zmq_msg_close(THIS->msg);
		free(THIS->msg);
		THIS->msg = NULL;
   }
  }
}


PIKECLASS Socket
{
  CVAR void * socket;
  PIKEVAR object context;
  PIKEVAR string destination;
  
  PIKEFUN void create(object context, int socket_type) {
	THIS->context = context;
    add_ref(context);
	THIS->socket = zmq_socket(OBJ2_ZEROMQ_CONTEXT(THIS->context)->context, socket_type);
	if(!THIS->socket) {
	  Pike_error("ZeroMQ.Socket: Failed to create the socket.\n");
	}
    pop_n_elems(args);
  }


// TODO handle wildcard
  PIKEFUN int connect(string endpoint) {
    int retval;
	void * socket;
    CHECK_CONTEXT();
	THIS->destination = endpoint;
	socket = THIS->socket;
	add_ref(endpoint);
    THREADS_ALLOW();
	retval = zmq_connect(socket, endpoint->str);
    THREADS_DISALLOW();
	RETURN(retval);
  }

  
// TODO handle wildcard
  PIKEFUN int disconnect(string endpoint) {
    int retval;
	void * socket;
	
    CHECK_CONTEXT();
	THIS->destination = endpoint;
	socket = THIS->socket;
	add_ref(endpoint);
    THREADS_ALLOW();
	retval = zmq_disconnect(socket, endpoint->str);
	THREADS_DISALLOW();
	RETURN(retval);
  }
  
  
// TODO handle wildcard
  PIKEFUN int bind(string endpoint) {
    int retval;
	void * socket;
	
    CHECK_CONTEXT();
	THIS->destination = endpoint;
	socket = THIS->socket;
	add_ref(endpoint);
    THREADS_ALLOW();
	retval = zmq_bind(socket, endpoint->str);
	THREADS_DISALLOW();
	RETURN(retval);
  }
  
// TODO handle wildcard
  PIKEFUN int unbind(string endpoint) {
    int retval;
	void * socket;
	
    CHECK_CONTEXT();
	THIS->destination = endpoint;
	socket = THIS->socket;
	add_ref(endpoint);
    THREADS_ALLOW();
	retval = zmq_unbind(socket, endpoint->str);
    THREADS_DISALLOW();
	RETURN(retval);
  }
  
  PIKEFUN string get_option_string(int option) {
    int rv;
	struct pike_string * str;
	size_t sz = 256;
	
	void * val = malloc(sz);
	
	if(val == NULL) Pike_error("ZeroMQ.Socket.get_option_string(): unable to allocate memory.\n");
	
	rv = zmq_getsockopt(THIS->socket, option, val, &sz);
	
	if(rv != 0) Pike_error("ZeroMQ.Socket.get_option(): unable to get socket option value.\n");
	
    str = make_shared_binary_string(val, sz);
	
	pop_stack();
	push_string(str);	
	
	free(val);
  }
  
  PIKEFUN int get_option_int(int option) {
    int rv;
	uint64_t val;
	size_t sz;

    sz = sizeof(val);
		
	rv = zmq_getsockopt(THIS->socket, option, &val, &sz);
	
	if(rv != 0) Pike_error("ZeroMQ.Socket.get_option_int(): unable to get socket option value.\n");
	
	pop_stack();
	push_int(val);	
  }
  
  PIKEFUN int set_option(int option, mixed value) {
    int rv;
     if(TYPEOF(*value) == PIKE_T_INT){
		 rv = zmq_setsockopt(THIS->socket, option, (void *)value->u.integer, sizeof(value->u.integer));
	 }
     else if(TYPEOF(*value) == PIKE_T_STRING){
		 rv = zmq_setsockopt(THIS->socket, option, (void *)value->u.string->str, value->u.string->len);	 
	 } else {
	   Pike_error("ZeroMQ.Socket.set_option(): invalid value type.\n");
	 }
     RETURN rv;
  }
  
  PIKEFUN int send(object message, int options) {
    int retval;
	char * mdata;
	zmq_msg_t msg;
	void * socket;
	
    CHECK_CONTEXT();
	mdata = malloc(OBJ2_ZEROMQ_MESSAGE(message)->dta->len);
	if(mdata == NULL) {
	  Pike_error("ZeroMQ.Message.create(): unable to allocate message data.\n");	
	}
	memcpy(mdata, OBJ2_ZEROMQ_MESSAGE(message)->dta->str, OBJ2_ZEROMQ_MESSAGE(message)->dta->len);
	retval = zmq_msg_init_data(&msg, mdata, OBJ2_ZEROMQ_MESSAGE(message)->dta->len, free_message, NULL);
	if(retval) {
	  Pike_error("ZeroMQ.Message.create(): unable to allocate message.\n");
	}

    socket = THIS->socket;
	
    THREADS_ALLOW();
	retval = zmq_sendmsg(socket, &msg, options);
    THREADS_DISALLOW();
	if(retval <= 0) {
	  printf("errno: %d, %s\n", errno, zmq_strerror(errno));
	}
	
    RETURN(retval);
  }
  
  PIKEFUN int send(array messages) {
    int retval;
	char * mdata;
	zmq_msg_t msg;
	int cnt = 0;
	int size = 0;
	void * socket;
	
    CHECK_CONTEXT();
	size = messages->size;
    socket = THIS->socket;
	
	for(cnt = 0; cnt < size; cnt++) {
	  struct object * message = messages->item[cnt].u.object;
	  mdata = malloc(OBJ2_ZEROMQ_MESSAGE(message)->dta->len);
	  if(mdata == NULL) {
	    Pike_error("ZeroMQ.Message.create(): unable to allocate message data.\n");	
	  }
	  memcpy(mdata, OBJ2_ZEROMQ_MESSAGE(message)->dta->str, OBJ2_ZEROMQ_MESSAGE(message)->dta->len);
	  retval = zmq_msg_init_data(&msg, mdata, OBJ2_ZEROMQ_MESSAGE(message)->dta->len, free_message, NULL);
	  if(retval) {
	    Pike_error("ZeroMQ.Message.create(): unable to allocate message.\n");
	  }
	  
	
//printf("sending %d, %d\n", cnt, (cnt == (size - 1))?0:ZMQ_SNDMORE);
	  retval = zmq_sendmsg(socket, &msg, (cnt == (size - 1))?0:ZMQ_SNDMORE);
	  if(retval < 0) {
	    printf("errno: %d, %s\n", errno, zmq_strerror(errno));
	    RETURN(retval);
	  }
	
	}
	
    RETURN(retval);  
  }
  
  PIKEFUN string receive(int flags) {
	struct pike_string * mdata;
	zmq_msg_t * msg;
	void * data;
	int rc;
    void * socket;
		
    CHECK_CONTEXT();
	msg = malloc(sizeof(zmq_msg_t));
	if(!msg) Pike_error("ZeroMQ.Socket.receive(): Unable to allocate message.\n");
	
	rc = zmq_msg_init (msg);
	if(rc)
	  Pike_error("ZeroMQ.Socket.receive(): Error initializing message.\n");

    socket = THIS->socket;

    THREADS_ALLOW();
    rc = zmq_msg_recv(msg, socket, flags);
    THREADS_DISALLOW();

	if(rc<0) {
	  printf("errno: %d, %s\n", errno, zmq_strerror(errno));
	  zmq_msg_close(msg);
	  free(msg);
	  Pike_error("ZeroMQ.Socket.receive(): Error receiving message.\n");
	}
	
	data = zmq_msg_data(msg);
	
	mdata = make_shared_binary_string(data, rc);
	
//	if(sizeof(data)) {
//	  printf("received %d bytes: %s\n", rc, data);
//	}
	
	zmq_msg_close(msg);
	free(msg);
	RETURN(mdata);
  }
  
  PIKEFUN object receive_message(int flags) {
	struct pike_string * mdata;
	zmq_msg_t * msg;
	struct object * message;
	void * data;
	int rc;
	void * socket;
	
    CHECK_CONTEXT();
	msg = malloc(sizeof(zmq_msg_t));
	if(!msg) Pike_error("ZeroMQ.Socket.receive(): Unable to allocate message.\n");
	
	rc = zmq_msg_init (msg);
	if(rc)
	  Pike_error("ZeroMQ.Socket.receive(): Error initializing message.\n");

	socket = THIS->socket;

    THREADS_ALLOW();
    rc = zmq_msg_recv(msg, socket, flags);
	THREADS_DISALLOW();

	if(rc<0) {
	  printf("errno: %d, %s\n", errno, zmq_strerror(errno));
	  zmq_msg_close(msg);
	  free(msg);
	  Pike_error("ZeroMQ.Socket.receive(): Error receiving message.\n");
	}
	
	data = zmq_msg_data(msg);
	
	mdata = make_shared_binary_string(data, rc);
	
	message = fast_clone_object(ZeroMQ_Message_program);
	OBJ2_ZEROMQ_MESSAGE(message)->dta = mdata;
	add_ref(mdata);
	OBJ2_ZEROMQ_MESSAGE(message)->msg = msg;
	
	RETURN(message);
  }
  

  PIKEFUN string _sprintf(int t, mixed a) {
    push_text("Socket(%O)");
	if(THIS->destination != NULL)
  	  ref_push_string(THIS->destination);
	else
	  push_text("UNCONNECTED");
	f_sprintf(2);
  }

  INIT {
    THIS->destination = NULL;
  }
  
  EXIT 
  {
    if(THIS->destination) {
	  free_string(THIS->destination);
	  THIS->destination = NULL;
	}
    if(THIS->socket) {
	  zmq_close(THIS->socket);
	  }
	if(THIS->context) {
	  free_object(THIS->context);
	}

	THIS->socket = NULL;
	THIS->context = NULL;
  }

}

PIKECLASS Poll
{
  CVAR zmq_pollitem_t * pollitems;
  CVAR struct object ** sockets;
  CVAR struct svalue * read_callbacks;
  CVAR struct svalue * write_callbacks;
  CVAR int allocation_size;
  CVAR int slots_used;
  CVAR short is_polling;
  
  INIT {
    THIS->pollitems = malloc(INITIAL_ARRAY_ALLOC * sizeof(zmq_pollitem_t));
	memset(THIS->pollitems, 0, INITIAL_ARRAY_ALLOC * sizeof(zmq_pollitem_t
	));
    THIS->sockets = malloc(INITIAL_ARRAY_ALLOC * sizeof(struct object *));
    THIS->read_callbacks = malloc(INITIAL_ARRAY_ALLOC * sizeof(struct svalue));
	memset(THIS->read_callbacks, 0, INITIAL_ARRAY_ALLOC * sizeof(struct svalue));
    THIS->write_callbacks = malloc(INITIAL_ARRAY_ALLOC * sizeof(struct svalue));
	memset(THIS->write_callbacks, 0, INITIAL_ARRAY_ALLOC * sizeof(struct svalue));
    THIS->allocation_size = INITIAL_ARRAY_ALLOC;
	THIS->slots_used = 0;
	THIS->is_polling = 0;
  }

  PIKEFUN void add_socket(object socket, function|void read_callback, function|void write_callback) {
    void * storage = get_storage(socket, ZeroMQ_Socket_program);
    void * socket_handle;
	struct svalue * sv;
	int polltype = 0;
	int slots_used = THIS->slots_used;
	
	// TODO perhaps we should use a mutex to prevent modifications to the poll list while polling.
	if(THIS->is_polling) {
	  pop_n_elems(args);
	  Pike_error("ZeroMQ.Poll.add_socket(): Unable to change polled socket list when actively polling.\n");
	}
	
	if(storage == NULL) {
	  pop_n_elems(args);
	  Pike_error("ZeroMQ.Poll.add_socket(): socket must be a ZeroMQ.Socket.\n");
	}
	
	if(read_callback == NULL && write_callback == NULL) {
	  pop_n_elems(args); 
	  Pike_error("ZeroMQ.Poll.add_socket(): either read or write callback must be provided.\n");
	}

    // if we don't have enough space for the new socket, let's resize the arrays.
    if(THIS->allocation_size < (slots_used+1)) {
	  zmq_pollitem_t * pollitems;
	  struct object ** sockets;
	  struct svalue * read_callbacks;
	  struct svalue * write_callbacks;
	  int new_allocation = THIS->allocation_size + INITIAL_ARRAY_ALLOC;
	  
	  // if realloc fails to expand the array, the existing allocation is still valid.
	  pollitems = realloc(THIS->pollitems, new_allocation * sizeof(zmq_pollitem_t));
	  if(pollitems == NULL) Pike_error("ZeroMQ.Poll.add_socket(): Unable to resize storage.\n");
	  memset(pollitems, 0, new_allocation * sizeof(zmq_pollitem_t));

	  sockets = realloc(THIS->sockets, new_allocation * sizeof(struct object *));
	  if(sockets == NULL) Pike_error("ZeroMQ.Poll.add_socket(): Unable to resize storage.\n");

	  read_callbacks = realloc(THIS->read_callbacks, new_allocation * sizeof(struct svalue));
	  if(read_callbacks == NULL) Pike_error("ZeroMQ.Poll.add_socket(): Unable to resize storage.\n");
	  memset(read_callbacks, 0, new_allocation * sizeof(struct svalue));
	  
	  write_callbacks = realloc(THIS->write_callbacks, new_allocation * sizeof(struct svalue));
	  if(write_callbacks == NULL) Pike_error("ZeroMQ.Poll.add_socket(): Unable to resize storage.\n");
	  memset(write_callbacks, 0, new_allocation * sizeof(struct svalue));

	  THIS->pollitems = pollitems;
	  THIS->sockets = sockets;
	  THIS->read_callbacks = read_callbacks;
	  THIS->write_callbacks = write_callbacks;
      THIS->allocation_size = new_allocation;
	}
	

    if(write_callback)
  	  assign_svalue(&(THIS->write_callbacks)[slots_used], write_callback);
    else 
	  memset(&(THIS->write_callbacks)[slots_used], 0, sizeof(struct svalue));
	  
	if(read_callback)
	  assign_svalue(&(THIS->read_callbacks[slots_used]), read_callback);
    else 
	  memset(&(THIS->read_callbacks)[slots_used], 0, sizeof(struct svalue));

	add_ref(socket);
	(THIS->sockets)[slots_used] = socket;
	
	if(read_callback != NULL)
	  polltype = ZMQ_POLLIN;
	if(write_callback != NULL)
      polltype |= ZMQ_POLLOUT;

	(THIS->pollitems + slots_used)->socket = OBJ2_ZEROMQ_SOCKET(socket)->socket;
	(THIS->pollitems + slots_used)->fd = 0;
	
	(THIS->pollitems + slots_used)->events = polltype;
	(THIS->pollitems + slots_used)->revents = 0;

    THIS->slots_used++;

	pop_n_elems(args);
  }
  
  void clear_item(int i) {
    if(THIS->read_callbacks + i) {
      free_svalue(&(THIS->read_callbacks[i]));
    }
    if(THIS->write_callbacks + i) {
      free_svalue(&(THIS->write_callbacks[i]));
    }
    free_object(THIS->sockets[i]);
   memset(&(THIS->pollitems[i]), 0, sizeof(zmq_pollitem_t));
  }
  
  PIKEFUN void remove_socket(object socket) {
    void * storage = get_storage(socket, ZeroMQ_Socket_program);
    void * socket_handle;
	struct svalue * sv;
	int i;
	int found_item = 0;
	//printf("remove socket\n");
	
	// TODO perhaps we should use a mutex to prevent modifications to the poll list while polling.
	if(THIS->is_polling) {
	  pop_n_elems(args);
	  Pike_error("ZeroMQ.Poll.add_socket(): Unable to change polled socket list when actively polling.\n");
	}
	
	if(storage == NULL) {
	  pop_n_elems(args);
	  Pike_error("ZeroMQ.Poll.add_socket(): socket must be a ZeroMQ.Socket.\n");
	}

    for(i = 0; i < THIS->slots_used; i++) {
	//printf("checking slot [%d]\n", i);
	  if(THIS->sockets[i] == socket) {
	    DEBUG("Found socket to remove at position %d\n", i);
		
		clear_item(i);
	
		found_item = 1;
	  } else if(found_item) {
        if(THIS->read_callbacks+i)
  	      assign_svalue(&(THIS->read_callbacks[i-1]), &(THIS->read_callbacks[i]));
        if(THIS->write_callbacks+i)
  	      assign_svalue(&(THIS->write_callbacks[i-1]), &(THIS->write_callbacks[i]));
		THIS->sockets[i-1] = THIS->sockets[i];
		memcpy(&(THIS->pollitems[i-1]), &(THIS->pollitems[i]), sizeof(zmq_pollitem_t));
		  
	    clear_item(i);
	  }
	}
	
	THIS->slots_used--;

	pop_n_elems(args);
  }

  PIKEFUN int poll(float timeout) {
    long to = (long)(timeout * 1000);
	int hits = 0;
	int loop = 0;
    int have_hit = 0;
	int res = 0;
	int slots_used;
	zmq_pollitem_t * pollitems;
		
	do {
//	printf("socket: %p\n", THIS->pollitems->socket);
    if(THIS->slots_used == 0) {
	  pop_n_elems(args);
	  Pike_error("ZeroMQ.Poll.poll(): No sockets to poll.\n");
	}
	THIS->is_polling = 1;
	pollitems = THIS->pollitems;
	slots_used = THIS->slots_used;	
    THREADS_ALLOW();
	if(loop > 0) to = 0;
	
	DEBUG("poll...");
	hits = zmq_poll(pollitems, slots_used, to);
DEBUG(" done: %d\n", hits);

	THREADS_DISALLOW();
	THIS->is_polling = 0;
	if(hits < 0) {
	  pop_n_elems(args);
	  DEBUG("zmq_poll error: %d => %s\n", errno, zmq_strerror(errno));
	  Pike_error("ZeroMQ.Poll.poll(): returned error.\n");
	}
    res = hits;
	if(hits > 0) {
	  int i = 0;
	  DEBUG("have hit\n");
	  for(i = 0; i < THIS->slots_used; i++) {
	    have_hit = 0;
	    
	    zmq_pollitem_t * pollitem = (THIS->pollitems + i);
		  
		  if(pollitem->revents & ZMQ_POLLIN) {
	  		struct pike_string * mdata;
	  		zmq_msg_t * msg;
	  		struct object * message;
	  	 	void * data;
  	  		int rc;
			int msg_count = 0;
			int more = 1;
		    void * socket = pollitem->socket;

			ref_push_object(THIS->sockets[i]);
			while(more) {
		  	DEBUG("poll returned an event on %d.\n", i);
			msg = malloc(sizeof(zmq_msg_t));
			if(!msg) Pike_error("ZeroMQ.Socket.receive(): Unable to allocate message.\n");
	
			rc = zmq_msg_init(msg);
			if(rc)
			  Pike_error("ZeroMQ.Socket.receive(): Error initializing message.\n");

//		    THREADS_ALLOW();
		    rc = zmq_msg_recv(msg, socket, 0);
//			THREADS_DISALLOW();
			DEBUG("received message\n");
			if(rc == -1) {
			  printf("errno: %p, %d, %s\n", OBJ2_ZEROMQ_SOCKET(THIS->sockets[i])->socket, errno, zmq_strerror(errno));
			  zmq_msg_close(msg);
			  free(msg);
			  pop_n_elems(args + msg_count + 1);
			  Pike_error("ZeroMQ.Socket.receive(): Error receiving message.\n");
			}
	
			data = zmq_msg_data(msg);
	
			mdata = make_shared_binary_string(data, rc);
	
			message = fast_clone_object(ZeroMQ_Message_program);
			OBJ2_ZEROMQ_MESSAGE(message)->dta = mdata;
			//add_ref(mdata);
			OBJ2_ZEROMQ_MESSAGE(message)->msg = msg;
			msg_count++;
			push_object(message);
			
			//printf("msg_count: %d\n", msg_count);
			more = (zmq_msg_get(msg, ZMQ_MORE));
			//printf("recv msg_count: %d %d\n", msg_count, more);
			}
//			printf("msg_count: %d\n", msg_count);
			DEBUG("calling callback.\n");
			safe_apply_svalue(THIS->read_callbacks + i, msg_count +1, 1);
			pop_stack();
		 
		  have_hit = 1;
		}
		
	    if(pollitem->revents & ZMQ_POLLOUT) {
		  		struct pike_string * mdata;
		  		zmq_msg_t * msg;
				struct object * message;
			   	void * data;
		  		int rc;
			    void * socket = pollitem->socket;
				ref_push_object(THIS->sockets[i]);
				safe_apply_svalue(THIS->write_callbacks + i, 1, 1);
				pop_stack();
		 		  
				have_hit = 1;
		}
		
		if(have_hit > 0) hits--;
		pollitem->revents = 0;
		
		if(hits == 0) { loop++; break; }
	  }
	  
	} else {
	DEBUG("hit break\n");
	  break;
	}
	DEBUG("outer\n");
	} while(have_hit && loop <=  1);
	DEBUG("exit\n");
	  RETURN(res);
  } 

  EXIT {
  
    int i = 0;
	
	for(i = 0; i < THIS->slots_used; i++) {
	  clear_item(i);
	}
    
	if(THIS->pollitems)
	  free(THIS->pollitems);
	  
	THIS->pollitems = NULL;

    if(THIS->sockets)
	  free(THIS->sockets);
	  
	THIS->sockets = NULL;

    if(THIS->read_callbacks)
	  free(THIS->read_callbacks);
	  
	THIS->read_callbacks = NULL;

    if(THIS->write_callbacks)
	  free(THIS->write_callbacks);
	  
	THIS->write_callbacks = NULL;
  }
}

  PIKEFUN int socket_monitor(object socket, string endpoint, int events) {
  int retval;

	  void * sm;
	  
      sm = get_storage(socket, ZeroMQ_Socket_program);
	  	  
  	  if(sm == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.socket_monitor(): socket must be a ZeroMQ.Socket.\n");
  	  }
  
  	  if(endpoint == NULL || endpoint->len < 1) {
	    pop_n_elems(args);
		Pike_error("ZeroMQ.socket_monitor(): endpoint must be a non-empty string.\n");
	  }
	  
	  retval = zmq_socket_monitor(sm, endpoint->str, events);
	  
	  RETURN(retval);
  }

  PIKEFUN int proxy(object frontend, object backend) {
      int retval;
	  
	  void * fe;
	  void * be;
	  
      fe = get_storage(frontend, ZeroMQ_Socket_program);
      be = get_storage(backend, ZeroMQ_Socket_program);
	  	  
  	  if(fe == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): frontend must be a ZeroMQ.Socket.\n");
  	  }
	  
  	  if(be == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): backend must be a ZeroMQ.Socket.\n");
  	  }

	  THREADS_ALLOW();
	  retval = zmq_proxy(fe, be, NULL);
	  THREADS_DISALLOW();
	  
	  RETURN(retval);
  }
  
  PIKEFUN int proxy(object frontend, object backend, object capture) {
      int retval;
	  
	  void * fe;
	  void * be;
	  void * cp;
	  
      fe = get_storage(frontend, ZeroMQ_Socket_program);
      be = get_storage(backend, ZeroMQ_Socket_program);
      cp = get_storage(capture, ZeroMQ_Socket_program);
	  	  
  	  if(fe == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): frontend must be a ZeroMQ.Socket.\n");
  	  }
	  
  	  if(be == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): backend must be a ZeroMQ.Socket.\n");
  	  }
	  
  	  if(cp == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): backend must be a ZeroMQ.Socket.\n");
  	  }
	  

	  THREADS_ALLOW();
	  retval = zmq_proxy(fe, be, cp);
	  THREADS_DISALLOW();
	  
	  RETURN(retval);
  }
  
  PIKEFUN int proxy_steerable(object frontend, object backend, object control) {
      int retval;
	  
	  void * fe;
	  void * be;
	  void * cn;
	  
      fe = get_storage(frontend, ZeroMQ_Socket_program);
      be = get_storage(backend, ZeroMQ_Socket_program);
      cn = get_storage(control, ZeroMQ_Socket_program);
	  	  
  	  if(fe == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): frontend must be a ZeroMQ.Socket.\n");
  	  }
	  
  	  if(be == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): backend must be a ZeroMQ.Socket.\n");
  	  }

  	  if(cn == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): control must be a ZeroMQ.Socket.\n");
  	  }
	  
	  THREADS_ALLOW();
	  retval = zmq_proxy_steerable(fe, be, NULL, cn);
	  THREADS_DISALLOW();
	  
	  RETURN(retval);
  }

  
  PIKEFUN int proxy_steerable(object frontend, object backend, object capture, object control) {
      int retval;
	  
	  void * fe;
	  void * be;
	  void * cp;
	  void * cn;
	  
      fe = get_storage(frontend, ZeroMQ_Socket_program);
      be = get_storage(backend, ZeroMQ_Socket_program);
      cp = get_storage(capture, ZeroMQ_Socket_program);
      cn = get_storage(control, ZeroMQ_Socket_program);
	  	  
  	  if(fe == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): frontend must be a ZeroMQ.Socket.\n");
  	  }
	  
  	  if(be == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): backend must be a ZeroMQ.Socket.\n");
  	  }

  	  if(cp == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): capture must be a ZeroMQ.Socket.\n");
  	  }
	  
  	  if(cn == NULL) {
  	    pop_n_elems(args);
  	    Pike_error("ZeroMQ.proxy(): control must be a ZeroMQ.Socket.\n");
  	  }
	  
	  THREADS_ALLOW();
	  retval = zmq_proxy_steerable(fe, be, cp, cn);
	  THREADS_DISALLOW();
	  
	  RETURN(retval);
  }

  PIKEFUN string z85_encode(string(8bit) bare_string) {
    size_t sz;
	char * val;
	char * rv;
	struct pike_string * str;
	
	if(bare_string == NULL || bare_string->len < 1) {
	  pop_n_elems(args);
	  Pike_error("ZeroMQ.z85_encode(): bare_string must be a non-empty string.\n");
	}
	
	sz = (int)(bare_string->len * 1.25) + 2;
	
	val = malloc(sz);
	
	rv = zmq_z85_encode(val, (const uint8_t *)bare_string->str, bare_string->len);
	
	if(rv == NULL) {
	  free(val);
	  pop_n_elems(args);
	  push_int(0);
	  return;
	}
	
    str = make_shared_binary_string((const char *)val, strlen(val));
	
	free(val);
    pop_n_elems(args);
	push_string(str);
  }

  PIKEFUN string z85_decode(string(8bit) encoded_string) {
    size_t sz;
	uint8_t * val;
	uint8_t * rv;
	struct pike_string * str;
	
	if(encoded_string == NULL || encoded_string->len < 1) {
	  pop_n_elems(args);
	  Pike_error("ZeroMQ.z85_decode(): encoded_string must be a non-empty string.\n");
	}
	
	if(encoded_string->len % 5 != 0) {
  		pop_n_elems(args);	
  	  	Pike_error("ZeroMQ.z85_decode(): length of encoded_string must be divisible by 5.\n");
	}
	
	sz = (int)(encoded_string->len * 0.8);
	
	val = malloc(sz);
	
	rv = zmq_z85_decode(val, encoded_string->str);
	
	if(rv == NULL) {
	  free(val);
	  pop_n_elems(args);
	  push_int(0);
	  return;
	}
	
    str = make_shared_binary_string((const char *)val, sz);
	
	free(val);
    pop_n_elems(args);
	push_string(str);
  }


  PIKEFUN int has(string(8bit) capability) {
    int retval;
	retval = zmq_has(capability->str);
	RETURN(retval);
  }
  
  PIKEFUN int errno() {
    RETURN(zmq_errno());
  }
  
  PIKEFUN string strerror(int err) {
	int e = err;
	pop_stack();
    push_text(zmq_strerror(e));
  }

PIKEFUN string version() {
  int maj, min, patch;
  zmq_version(&maj, &min, &patch);
  
  push_text("%d.%d.%d");
  push_int(maj);
  push_int(min);
  push_int(patch);
  f_sprintf(4);
}

PIKE_MODULE_INIT {
  INIT;
  
  // socket types
#ifdef HAVE_ZMQ_CLIENT
  add_integer_constant("CLIENT", ZMQ_CLIENT, 0);
#endif
#ifdef HAVE_ZMQ_SERVER
//  add_integer_constant("SERVER", ZMQ_SERVER, 0);
#endif
#ifdef HAVE_ZMQ_RADIO
//  add_integer_constant("RADIO", ZMQ_RADIO, 0);
#endif
#ifdef HAVE_ZMQ_DISH
//  add_integer_constant("DISH", ZMQ_DISH, 0);
#endif
  add_integer_constant("PUB", ZMQ_PUB, 0);
  add_integer_constant("SUB", ZMQ_SUB, 0);
#ifdef HAVE_ZMQ_XPUB
  add_integer_constant("XPUB", ZMQ_XPUB, 0);
#endif
#ifdef HAVE_ZMQ_XSUB
  add_integer_constant("XSUB", ZMQ_XSUB, 0);
#endif
#ifdef HAVE_ZMQ_PUSH
  add_integer_constant("PUSH", ZMQ_PUSH, 0);
#endif
#ifdef HAVE_ZMQ_PULL
  add_integer_constant("PULL", ZMQ_PULL, 0);
#endif
  add_integer_constant("PAIR", ZMQ_PAIR, 0);
#ifdef HAVE_ZMQ_STREAM
  add_integer_constant("STREAM", ZMQ_STREAM, 0);
#endif
  add_integer_constant("REQ", ZMQ_REQ, 0);
  add_integer_constant("REP", ZMQ_REP, 0);
  add_integer_constant("DEALER", ZMQ_DEALER, 0);
  add_integer_constant("ROUTER", ZMQ_ROUTER, 0);
  
  add_integer_constant("SNDMORE", ZMQ_SNDMORE, 0);
  
 // set socket options
  add_integer_constant("AFFINITY", ZMQ_AFFINITY, 0);
  add_integer_constant("BACKLOG", ZMQ_BACKLOG, 0);
  add_integer_constant("CONNECT_RID", ZMQ_CONNECT_RID, 0);
  add_integer_constant("CONFLATE", ZMQ_CONFLATE, 0);
  add_integer_constant("CONNECT_TIMEOUT", ZMQ_CONNECT_TIMEOUT, 0);
  add_integer_constant("CURVE_PUBLICKEY", ZMQ_CURVE_PUBLICKEY, 0);
  add_integer_constant("CURVE_SECRETKEY", ZMQ_CURVE_SECRETKEY, 0);
  add_integer_constant("CURVE_SERVER", ZMQ_CURVE_SERVER, 0);
  add_integer_constant("CURVE_SERVERKEY", ZMQ_CURVE_SERVERKEY, 0);
  add_integer_constant("GSSAPI_PLAINTEXT", ZMQ_GSSAPI_PLAINTEXT, 0);
  add_integer_constant("GSSAPI_PRINCIPAL", ZMQ_GSSAPI_PRINCIPAL, 0);
  add_integer_constant("GSSAPI_SERVER", ZMQ_GSSAPI_SERVER, 0);
  add_integer_constant("GSSAPI_SERVICE_PRINCIPAL", ZMQ_GSSAPI_SERVICE_PRINCIPAL, 0);
  add_integer_constant("HANDSHAKE_IVL", ZMQ_HANDSHAKE_IVL, 0);
  add_integer_constant("HEARTBEAT_IVL", ZMQ_HEARTBEAT_IVL, 0);
  add_integer_constant("HEARTBEAT_TIMEOUT", ZMQ_HEARTBEAT_TIMEOUT, 0);
  add_integer_constant("HEARTBEAT_TTL", ZMQ_HEARTBEAT_TTL, 0);
  add_integer_constant("IDENTITY", ZMQ_IDENTITY, 0);
  add_integer_constant("IMMEDIATE", ZMQ_IMMEDIATE, 0);
  add_integer_constant("INVERT_MATCHING", ZMQ_INVERT_MATCHING, 0);
  add_integer_constant("IPV6", ZMQ_IPV6, 0);
  add_integer_constant("LINGER", ZMQ_LINGER, 0);
  add_integer_constant("MAXMSGSIZE", ZMQ_MAXMSGSIZE, 0);
  add_integer_constant("MULTICAST_HOPS", ZMQ_MULTICAST_HOPS, 0);
  add_integer_constant("MULTICAST_MAXTPDU", ZMQ_MULTICAST_MAXTPDU, 0);
  add_integer_constant("PLAIN_PASSWORD", ZMQ_PLAIN_PASSWORD, 0);
  add_integer_constant("PLAIN_SERVER", ZMQ_PLAIN_SERVER, 0);
  add_integer_constant("PLAIN_USERNAME", ZMQ_PLAIN_USERNAME, 0);
  add_integer_constant("USE_FD", ZMQ_USE_FD, 0);
  add_integer_constant("PROBE_ROUTER", ZMQ_PROBE_ROUTER, 0);
  add_integer_constant("RATE", ZMQ_RATE, 0);
  add_integer_constant("RCVBUF", ZMQ_RCVBUF, 0);
  add_integer_constant("RCVHWM", ZMQ_RCVHWM, 0);
  add_integer_constant("RCVTIMEO", ZMQ_RCVTIMEO, 0);
  add_integer_constant("RECONNECT_IVL", ZMQ_RECONNECT_IVL, 0);
  add_integer_constant("RECONNECT_IVL_MAX", ZMQ_RECONNECT_IVL_MAX, 0);
  add_integer_constant("RECOVERY_IVL", ZMQ_RECOVERY_IVL, 0);
  add_integer_constant("REQ_CORRELATE", ZMQ_REQ_CORRELATE, 0);
  add_integer_constant("REQ_RELAXED", ZMQ_REQ_RELAXED, 0);
  add_integer_constant("ROUTER_HANDOVER", ZMQ_ROUTER_HANDOVER, 0);
  add_integer_constant("ROUTER_MANDATORY", ZMQ_ROUTER_MANDATORY, 0);
  add_integer_constant("ROUTER_RAW", ZMQ_ROUTER_RAW, 0);
  add_integer_constant("SNDBUF", ZMQ_SNDBUF, 0);
  add_integer_constant("SNDHWM", ZMQ_SNDHWM, 0);
  add_integer_constant("SNDTIMEO", ZMQ_SNDTIMEO, 0);
  add_integer_constant("SOCKS_PROXY", ZMQ_SOCKS_PROXY, 0);
  add_integer_constant("STREAM_NOTIFY", ZMQ_STREAM_NOTIFY, 0);
  add_integer_constant("SUBSCRIBE", ZMQ_SUBSCRIBE, 0);
  add_integer_constant("TCP_KEEPALIVE", ZMQ_TCP_KEEPALIVE, 0);
  add_integer_constant("TCP_KEEPALIVE_CNT", ZMQ_TCP_KEEPALIVE_CNT, 0);
  add_integer_constant("TCP_KEEPALIVE_IDLE", ZMQ_TCP_KEEPALIVE_IDLE, 0);
  add_integer_constant("TCP_KEEPALIVE_INTVL", ZMQ_TCP_KEEPALIVE_INTVL, 0);
  add_integer_constant("TCP_MAXRT", ZMQ_TCP_MAXRT, 0);
  add_integer_constant("TOS", ZMQ_TOS, 0);
  add_integer_constant("UNSUBSCRIBE", ZMQ_UNSUBSCRIBE, 0);
  add_integer_constant("XPUB_VERBOSE", ZMQ_XPUB_VERBOSE, 0);
  add_integer_constant("XPUB_VERBOSER", ZMQ_XPUB_VERBOSER, 0);
  add_integer_constant("XPUB_MANUAL", ZMQ_XPUB_MANUAL, 0);
  add_integer_constant("XPUB_NODROP", ZMQ_XPUB_NODROP, 0);
  add_integer_constant("XPUB_WELCOME_MSG", ZMQ_XPUB_WELCOME_MSG, 0);
  add_integer_constant("ZAP_DOMAIN", ZMQ_ZAP_DOMAIN, 0);
  add_integer_constant("TCP_ACCEPT_FILTER", ZMQ_TCP_ACCEPT_FILTER, 0);
  add_integer_constant("IPC_FILTER_GID", ZMQ_IPC_FILTER_GID, 0);
  add_integer_constant("IPC_FILTER_PID", ZMQ_IPC_FILTER_PID, 0);
  add_integer_constant("IPC_FILTER_UID", ZMQ_IPC_FILTER_UID, 0);
  add_integer_constant("IPV4ONLY", ZMQ_IPV4ONLY, 0);
  add_integer_constant("VMCI_BUFFER_SIZE", ZMQ_VMCI_BUFFER_SIZE, 0);
  add_integer_constant("VMCI_BUFFER_MIN_SIZE", ZMQ_VMCI_BUFFER_MIN_SIZE, 0);
  add_integer_constant("VMCI_BUFFER_MAX_SIZE", ZMQ_VMCI_BUFFER_MAX_SIZE, 0);
  add_integer_constant("VMCI_CONNECT_TIMEOUT", ZMQ_VMCI_CONNECT_TIMEOUT, 0);
  
  // get socket options
  add_integer_constant("TYPE", ZMQ_TYPE, 0);
  
  add_integer_constant("RCVMORE", ZMQ_RCVMORE, 0);
  add_integer_constant("FD", ZMQ_FD, 0);
  add_integer_constant("EVENTS", ZMQ_EVENTS, 0);
  
  // add_integer_constant("HWM", ZMQ_HWM, 0);
  // add_integer_constant("SWAP", ZMQ_SWAP, 0);
  // add_integer_constant("RECOVERY_IVL_MSEC", ZMQ_RECOVERY_IVL_MSEC, 0);
  // add_integer_constant("MCAST_LOOP", ZMQ_MCAST_LOOP, 0);
  
  add_integer_constant("POLLIN", ZMQ_POLLIN, 0);
  add_integer_constant("POLLOUT", ZMQ_POLLOUT, 0);
  
  // socket monitor events
  add_integer_constant("EVENT_CONNECTED", ZMQ_EVENT_CONNECTED, 0);
  add_integer_constant("EVENT_CONNECT_DELAYED", ZMQ_EVENT_CONNECT_DELAYED, 0);
  add_integer_constant("EVENT_CONNECT_RETRIED", ZMQ_EVENT_CONNECT_RETRIED, 0);
  add_integer_constant("EVENT_LISTENING", ZMQ_EVENT_LISTENING, 0);
  add_integer_constant("EVENT_BIND_FAILED", ZMQ_EVENT_BIND_FAILED, 0);
  add_integer_constant("EVENT_ACCEPTED", ZMQ_EVENT_ACCEPTED, 0);
  add_integer_constant("EVENT_ACCEPT_FAILED", ZMQ_EVENT_ACCEPT_FAILED, 0);
  add_integer_constant("EVENT_CLOSED", ZMQ_EVENT_CLOSED, 0);
  add_integer_constant("EVENT_CLOSE_FAILED", ZMQ_EVENT_CLOSE_FAILED, 0);
  add_integer_constant("EVENT_DISCONNECTED", ZMQ_EVENT_DISCONNECTED, 0);
  add_integer_constant("EVENT_MONITOR_STOPPED", ZMQ_EVENT_MONITOR_STOPPED, 0);
//  add_integer_constant("EVENT_HANDSHAKE_FAILED", ZMQ_EVENT_HANDSHAKE_FAILED, 0);
//  add_integer_constant("EVENT_HANDSHAKE_SUCCEED", ZMQ_EVENT_HANDSHAKE_SUCCEED, 0);  
}

PIKE_MODULE_EXIT {
 EXIT;
}
/*! @endclass
 */

/*! @endmodule
 */

/*! @endmodule
 */




gotpike.org | Copyright © 2004 - 2019 | Pike is a trademark of Department of Computer and Information Science, Linköping University