[quagga-dev 16435] [PATCH 02/23] lib: ZeroMQ read event

Philippe Guibert philippe.guibert at 6wind.com
Fri Dec 2 16:29:51 GMT 2016


From: David Lamparter <equinox at opensourcerouting.org>

This commit adds two new files, lib/qzmq.c and lib/qzmq.h, which hook
ZeroMQ socket reads into the libzebra thread (event) loop, and adds the
compiler flags needed to build against the ZeroMQ and Cap'n Proto
dependencies.

Signed-off-by: David Lamparter <equinox at opensourcerouting.org>
Signed-off-by: Philippe Guibert <philippe.guibert at 6wind.com>
---
 lib/Makefile.am |   7 ++-
 lib/memtypes.c  |   1 +
 lib/qzmq.c      | 157 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 lib/qzmq.h      |  55 ++++++++++++++++++++
 4 files changed, 219 insertions(+), 1 deletion(-)
 create mode 100644 lib/qzmq.c
 create mode 100644 lib/qzmq.h

diff --git a/lib/Makefile.am b/lib/Makefile.am
index be8495f140e4..7b6c34dcabb5 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -1,7 +1,7 @@
 ## Process this file with automake to produce Makefile.in.
 
 AM_CPPFLAGS = -I.. -I$(top_srcdir) -I$(top_srcdir)/lib -I$(top_builddir)/lib
-AM_CFLAGS = $(WERROR)
+AM_CFLAGS = $(WERROR) @CAPN_C_CFLAGS@ @ZEROMQ_CFLAGS@
 DEFS = @DEFS@ -DSYSCONFDIR=\"$(sysconfdir)/\"
 
 lib_LTLIBRARIES = libzebra.la
@@ -35,6 +35,11 @@ pkginclude_HEADERS = \
 noinst_HEADERS = \
 	plist_int.h
 
+if HAVE_ZEROMQ
+libzebra_la_SOURCES += qzmq.c
+pkginclude_HEADERS += qzmq.h
+endif
+
 EXTRA_DIST = \
 	regex.c regex-gnu.h \
 	queue.h \
diff --git a/lib/memtypes.c b/lib/memtypes.c
index 75f77a8e13fb..bfda5eb7b14e 100644
--- a/lib/memtypes.c
+++ b/lib/memtypes.c
@@ -74,6 +74,7 @@ struct memory_list memory_list_lib[] =
   { MTYPE_VRF_BITMAP,		"VRF bit-map"			},
   { MTYPE_IF_LINK_PARAMS,       "Informational Link Parameters" },
   { MTYPE_LIB_NEXTHOP,		"Nexthop"			},
+  { MTYPE_ZEROMQ_CB,	        "ZEROMQ_CB"	},
   { -1, NULL },
 };
 
diff --git a/lib/qzmq.c b/lib/qzmq.c
new file mode 100644
index 000000000000..ffa086d403a0
--- /dev/null
+++ b/lib/qzmq.c
@@ -0,0 +1,157 @@
+/*
+ * libzebra ZeroMQ bindings
+ * Copyright (C) 2015  David Lamparter, for NetDEF, Inc.
+ *
+ * Quagga is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Library General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Quagga is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with Quagga; see the file COPYING.LIB.  If not,
+ * write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <zebra.h>
+#include <zmq.h>
+
+#include "thread.h"
+#include "memory.h"
+#include "qzmq.h"
+#include "log.h"
+
+/* libzmq's context */
+void *qzmq_context = NULL;
+
+/*
+ * Create the process-wide libzmq context (qzmq_context) and enable IPv6
+ * on it.
+ *
+ * Failures are logged rather than silently ignored.  If the context cannot
+ * be created, qzmq_context is left NULL.
+ */
+void qzmq_init (void)
+{
+  int err;
+
+  qzmq_context = zmq_ctx_new ();
+  if (!qzmq_context)
+    {
+      err = zmq_errno ();
+      zlog_err ("ZeroMQ error: cannot create context: %s(%d)",
+                zmq_strerror (err), err);
+      return;
+    }
+
+  if (zmq_ctx_set (qzmq_context, ZMQ_IPV6, 1))
+    {
+      err = zmq_errno ();
+      zlog_err ("ZeroMQ error: cannot enable IPv6: %s(%d)",
+                zmq_strerror (err), err);
+    }
+}
+
+/*
+ * Terminate the libzmq context created by qzmq_init() and reset
+ * qzmq_context to NULL.  Does nothing when no context exists, so it is
+ * safe to call after a failed qzmq_init() or more than once.
+ */
+void qzmq_finish (void)
+{
+  if (!qzmq_context)
+    return;
+
+  if (zmq_ctx_term (qzmq_context))
+    {
+      int err = zmq_errno ();
+
+      zlog_err ("ZeroMQ error: cannot terminate context: %s(%d)",
+                zmq_strerror (err), err);
+    }
+  qzmq_context = NULL;
+}
+
+/* read callback integration */
+/*
+ * Read callback integration: ties one ZeroMQ socket to a libzebra read
+ * thread and to the user callback invoked for each received message.
+ */
+struct qzmq_cb {
+  /* scheduled read event; set to NULL while qzmq_read_msg is running */
+  struct thread *thread;
+  /* ZeroMQ socket the messages are received from */
+  void *zmqsock;
+  /* opaque pointer passed back to the callback as its first argument */
+  void *arg;
+
+#if 0
+  /* disabled: callback variant taking a raw buffer instead of a message */
+  void (*cb_buf)(void *arg, void *zmqsock, const uint8_t *buf, size_t len);
+#endif
+  /* called once per received message; qzmq_read_msg closes msg afterwards */
+  void (*cb_msg)(void *arg, void *zmqsock, zmq_msg_t *msg);
+};
+
+
+/*
+ * Read-event handler: drain every message currently queued on the ZeroMQ
+ * socket, hand each one to cb->cb_msg, then schedule the read event again.
+ *
+ * t is the fired read thread; its argument is the struct qzmq_cb.
+ * Always returns 0.
+ *
+ * On a ZeroMQ failure the error is logged and the read event is NOT
+ * scheduled again; cb->thread stays NULL in that case.  Interrupted calls
+ * (EINTR) are retried instead of being treated as fatal.
+ *
+ * NOTE: cb is used again after cb->cb_msg returns, so the callback must not
+ * release it (e.g. through qzmq_thread_cancel) while it is running.
+ */
+static int qzmq_read_msg (struct thread *t)
+{
+  struct qzmq_cb *cb = THREAD_ARG (t);
+  zmq_msg_t msg;
+  int err = 0;
+
+  /* this event has already fired; nothing is pending until it is
+   * scheduled again below */
+  cb->thread = NULL;
+
+  while (1)
+    {
+      zmq_pollitem_t polli = { .socket = cb->zmqsock, .events = ZMQ_POLLIN };
+
+      /* zero timeout: only ask whether a message is ready right now */
+      if (zmq_poll (&polli, 1, 0) < 0)
+        {
+          err = zmq_errno ();
+          if (err == EINTR)
+            continue;
+          goto out_err;
+        }
+      if (!(polli.revents & ZMQ_POLLIN))
+        break;
+
+      if (zmq_msg_init (&msg))
+        {
+          err = zmq_errno ();
+          goto out_err;
+        }
+      if (zmq_msg_recv (&msg, cb->zmqsock, ZMQ_DONTWAIT) < 0)
+        {
+          /* save the error first: closing the message may overwrite it */
+          err = zmq_errno ();
+          /* the message was initialised, so release it on every path */
+          zmq_msg_close (&msg);
+
+          if (err == EAGAIN)
+            break;
+          if (err == EINTR)
+            continue;
+          goto out_err;
+        }
+      cb->cb_msg (cb->arg, cb->zmqsock, &msg);
+      zmq_msg_close (&msg);
+    }
+
+  cb->thread = funcname_thread_add_read (t->master, qzmq_read_msg, cb,
+        t->u.fd, t->funcname, t->schedfrom, t->schedfrom_line);
+  return 0;
+
+out_err:
+  /* zmq_strerror also understands ZeroMQ-specific error numbers */
+  zlog_err ("ZeroMQ error: %s(%d)", zmq_strerror (err), err);
+  return 0;
+}
+
+/*
+ * Disabled raw-buffer variant of the read integration.  It is compiled out:
+ * qzmq_read_buf is only a stub that returns 0, and the cb_buf member it
+ * relies on is likewise disabled in struct qzmq_cb.
+ */
+#if 0
+static int qzmq_read_buf (struct thread *t)
+{
+  return 0;
+}
+
+struct qzmq_cb *funcname_qzmq_thread_read_buf (
+        struct thread_master *master,
+        void (*func)(void *arg, void *zmqsock, const uint8_t *buf, size_t len),
+        void *arg, void *zmqsock, debugargdef)
+{
+  int fd;
+  size_t fd_len = sizeof (fd);
+  struct qzmq_cb *cb;
+
+  if (zmq_getsockopt (zmqsock, ZMQ_FD, &fd, &fd_len))
+    return NULL;
+
+  cb = XCALLOC (MTYPE_ZEROMQ_CB, sizeof (struct qzmq_cb));
+  if (!cb)
+    return NULL;
+
+  cb->arg = arg;
+  cb->zmqsock = zmqsock;
+  cb->cb_buf = func;
+  cb->thread = funcname_thread_add_read (master, qzmq_read_buf, cb, fd,
+                                         funcname, schedfrom, fromln);
+  return cb;
+}
+#endif
+
+/*
+ * Arrange for func to be called, with arg and zmqsock, for every message
+ * received on zmqsock.  The socket's file descriptor (ZMQ_FD) is registered
+ * as a read event on master; funcname, schedfrom and fromln are passed
+ * through to the thread library.
+ *
+ * Returns the new callback handle, or NULL if the file descriptor cannot
+ * be queried or the handle cannot be allocated.
+ */
+struct qzmq_cb *funcname_qzmq_thread_read_msg (
+        struct thread_master *master,
+        void (*func)(void *arg, void *zmqsock, zmq_msg_t *msg),
+        void *arg, void *zmqsock, debugargdef)
+{
+  struct qzmq_cb *handle;
+  int sock_fd;
+  size_t optlen = sizeof (sock_fd);
+
+  /* ask libzmq for the descriptor to watch */
+  if (zmq_getsockopt (zmqsock, ZMQ_FD, &sock_fd, &optlen) != 0)
+    return NULL;
+
+  handle = XCALLOC (MTYPE_ZEROMQ_CB, sizeof (*handle));
+  if (handle == NULL)
+    return NULL;
+
+  handle->zmqsock = zmqsock;
+  handle->arg = arg;
+  handle->cb_msg = func;
+  handle->thread = funcname_thread_add_read (master, qzmq_read_msg, handle,
+                                             sock_fd, funcname, schedfrom,
+                                             fromln);
+  return handle;
+}
+
+/*
+ * Stop delivering messages for cb and free it.  cb must not be used after
+ * this call.  A NULL cb is ignored.
+ */
+void qzmq_thread_cancel (struct qzmq_cb *cb)
+{
+  if (!cb)
+    return;
+
+  /* cb->thread is NULL when no read event is pending, e.g. after
+   * qzmq_read_msg stopped on an error without scheduling it again */
+  if (cb->thread)
+    thread_cancel (cb->thread);
+  XFREE (MTYPE_ZEROMQ_CB, cb);
+}
diff --git a/lib/qzmq.h b/lib/qzmq.h
new file mode 100644
index 000000000000..c5ef0cb9e0e3
--- /dev/null
+++ b/lib/qzmq.h
@@ -0,0 +1,55 @@
+/*
+ * libzebra ZeroMQ bindings
+ * Copyright (C) 2015  David Lamparter, for NetDEF, Inc.
+ *
+ * Quagga is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Library General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Quagga is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with Quagga; see the file COPYING.LIB.  If not,
+ * write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef _QUAGGA_QZMQ_H
+#define _QUAGGA_QZMQ_H
+
+#include "thread.h"
+#include <zmq.h>
+
+/* libzmq's context; set by qzmq_init(), reset to NULL by qzmq_finish() */
+extern void *qzmq_context;
+
+/* create qzmq_context and set its ZMQ_IPV6 option to 1 */
+extern void qzmq_init (void);
+/* terminate qzmq_context and reset it to NULL */
+extern void qzmq_finish (void);
+
+/* trailing debug parameters: callback name, plus file and line of the caller */
+#define debugargdef const char *funcname, const char *schedfrom, int fromln
+
+/*
+ * Convenience wrapper around funcname_qzmq_thread_read_msg():
+ *   m - thread master, f - message callback, a - callback argument,
+ *   z - ZeroMQ socket.
+ * Fills in the debug parameters from the callback's name and the call site.
+ */
+#define qzmq_thread_read_msg(m,f,a,z) funcname_qzmq_thread_read_msg( \
+                             m,f,a,z,#f,__FILE__,__LINE__)
+
+/* opaque callback handle; defined in qzmq.c */
+struct qzmq_cb;
+
+/*
+ * Register func to be called with (arg, zmqsock, msg) for each message
+ * received on zmqsock.  Returns a handle for qzmq_thread_cancel(), or NULL
+ * on failure.
+ */
+extern struct qzmq_cb *funcname_qzmq_thread_read_msg (
+        struct thread_master *master,
+        void (*func)(void *arg, void *zmqsock, zmq_msg_t *msg),
+        void *arg, void *zmqsock, debugargdef);
+
+/* cancel the read thread of cb and free cb */
+extern void qzmq_thread_cancel (struct qzmq_cb *cb);
+
+/* disabled: raw-buffer variant of the read callback registration */
+#if 0
+#define qzmq_thread_read_buf(m,f,a,z) funcname_qzmq_thread_read_buf(m,f,a,z,#f,__FILE__,__LINE__)
+extern struct qzmq_cb *funcname_qzmq_thread_read_buf (
+        struct thread_master *master,
+        void (*func)(void *arg, void *zmqsock, const uint8_t *buf, size_t len),
+        void *arg, void *zmqsock, debugargdef);
+#endif
+
+#endif
-- 
2.1.4





More information about the Quagga-dev mailing list