Changeset: 293807560d4c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=293807560d4c
Modified Files:
        sql/src/backends/monet5/merovingian/Makefile.ag
        sql/src/backends/monet5/merovingian/merovingian.c
        sql/src/backends/monet5/merovingian/merovingian_controlrunner.c
        sql/src/backends/monet5/merovingian/merovingian_discoveryrunner.c
        sql/src/backends/monet5/merovingian/merovingian_peering.c
        sql/src/backends/monet5/merovingian/merovingian_peering.h
Branch: default
Log Message:

Implement peering client/server support


diffs (truncated from 307 to 300 lines):

diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/Makefile.ag
--- a/sql/src/backends/monet5/merovingian/Makefile.ag   Mon Sep 20 14:53:10 2010 +0200
+++ b/sql/src/backends/monet5/merovingian/Makefile.ag   Mon Sep 20 14:54:10 2010 +0200
@@ -38,6 +38,7 @@
        glob.h \
        database.h \
        control.h \
+       merovingian_peering.h \
        $(MEROVINGIAN_SUBS)
 
 lib_meroutil = {
@@ -50,11 +51,19 @@
                control.c
 }
 
+lib_merodaemon = {
+       # daemon sources grouped in a library; bin_merovingian pulls it in
+       # through its LIBS entry (libmerodaemon)
+       # NOTE(review): NOINST presumably keeps this library from being
+       # installed -- confirm against the autogen documentation
+       NOINST
+       SOURCES = \
+               merovingian.c \
+               merovingian_peering.c
+}
+
 bin_merovingian = {
        # hack: include merovingian.1.in here to get it expanded
-       SOURCES = merovingian.c merovingian.1.in
+       SOURCES = merovingian.1.in
        LIBS = \
                libmeroutil \
+               libmerodaemon \
                $(MONETDB5_LIBS) -lmonetdb5 \
                $(MONETDB_LIBS) -lbat -lstream \
                $(SOCKET_LIBS) \
diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/merovingian.c
--- a/sql/src/backends/monet5/merovingian/merovingian.c Mon Sep 20 14:53:10 2010 +0200
+++ b/sql/src/backends/monet5/merovingian/merovingian.c Mon Sep 20 14:54:10 2010 +0200
@@ -409,6 +409,8 @@
        return(ret);
 }
 
+#include "merovingian_peering.h"
+
 #include "merovingian_forkmserver.c"
 #include "merovingian_proxy.c"
 #include "merovingian_client.c"
diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/merovingian_controlrunner.c
--- a/sql/src/backends/monet5/merovingian/merovingian_controlrunner.c   Mon Sep 20 14:53:10 2010 +0200
+++ b/sql/src/backends/monet5/merovingian/merovingian_controlrunner.c   Mon Sep 20 14:54:10 2010 +0200
@@ -241,13 +241,16 @@
                                len = snprintf(buf2, sizeof(buf2), "OK\n");
                                send(msgsock, buf2, len, 0);
                        } else if (strcmp(q, "peer") == 0) {
-                               Mfprintf(_mero_ctlout, "%s: peering not yet 
implemented\n",
-                                               origin);
-                               len = snprintf(buf2, sizeof(buf2),
-                                               "peering not yet 
implemented\n");
+                               pthread_t ptid; /* FIXME: register global */
+                               len = snprintf(buf2, sizeof(buf2), "OK\n");
                                send(msgsock, buf2, len, 0);
-                               close(msgsock);
-                               continue;
+                               /* start a separate thread to handle the 
peering */
+                               if (pthread_create(&ptid, NULL,
+                                                       (void *(*)(void 
*))peeringServerThread,
+                                                       (void *)&msgsock) < 0)
+                               {
+                                       /* FAIL */
+                               }
                        } else {
                                Mfprintf(_mero_ctlout, "%s: invalid mode "
                                                "(%s)\n", origin, q);
diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/merovingian_discoveryrunner.c
--- a/sql/src/backends/monet5/merovingian/merovingian_discoveryrunner.c Mon Sep 20 14:53:10 2010 +0200
+++ b/sql/src/backends/monet5/merovingian/merovingian_discoveryrunner.c Mon Sep 20 14:54:10 2010 +0200
@@ -109,6 +109,54 @@
        return(1);
 }
 
+typedef struct _disc_message_tap { /* one node of a singly linked list of taps */
+       int fd; /* file descriptor that received discovery messages are written to */
+       struct _disc_message_tap *next; /* following tap, NULL at the end of the list */
+} *disc_message_tap;
+
+/* head of the list of taps for incoming messages, NULL while no tap is
+ * registered; registerMessageTap/unregisterMessageTap change the list
+ * while holding _mero_remotedb_lock */
+static disc_message_tap _mero_disc_msg_taps = NULL;
+
+/**
+ * Appends fd to the list of message taps (_mero_disc_msg_taps), so that
+ * code walking that list can forward incoming discovery messages to it.
+ *
+ * fd is switched to non-blocking mode first.  Ownership of fd stays
+ * with the caller, who removes it again with unregisterMessageTap()
+ * before closing it.  If no memory for the list node can be obtained,
+ * the tap is silently not registered.
+ */
+void
+registerMessageTap(int fd)
+{
+       disc_message_tap n;
+       disc_message_tap h;
+       int flags;
+
+       /* make sure we never block in the discovery main loop because we
+        * can't write to the pipe; O_NONBLOCK is a file status flag, so
+        * it has to be set with F_SETFL (F_SETFD only handles descriptor
+        * flags such as FD_CLOEXEC), keeping the flags already present */
+       flags = fcntl(fd, F_GETFL, 0);
+       if (flags != -1)
+               fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+
+       n = GDKmalloc(sizeof(struct _disc_message_tap));
+       if (n == NULL)
+               return;
+       n->next = NULL;
+       n->fd = fd;
+
+       pthread_mutex_lock(&_mero_remotedb_lock);
+       /* the head must be read while holding the lock, otherwise a
+        * concurrent (un)register could invalidate what we saw */
+       h = _mero_disc_msg_taps;
+       if (h == NULL) {
+               /* first tap: it becomes the head of the list */
+               _mero_disc_msg_taps = n;
+       } else {
+               for (; h->next != NULL; h = h->next)
+                       ;
+               h->next = n;
+       }
+       pthread_mutex_unlock(&_mero_remotedb_lock);
+}
+
+/**
+ * Removes the first tap registered for fd from the list of message taps
+ * (_mero_disc_msg_taps) and frees its list node.  The descriptor itself
+ * is not closed here.  Nothing happens when fd is not in the list.
+ */
+void
+unregisterMessageTap(int fd)
+{
+       disc_message_tap h;
+       disc_message_tap lasth;
+
+       pthread_mutex_lock(&_mero_remotedb_lock);
+       /* start from the head as it is while we hold the lock; a value
+        * read before locking could already have been freed or replaced
+        * by another thread */
+       h = _mero_disc_msg_taps;
+       for (lasth = NULL; h != NULL; lasth = h, h = h->next) {
+               if (h->fd == fd) {
+                       if (lasth == NULL) {
+                               /* removing the head of the list */
+                               _mero_disc_msg_taps = h->next;
+                       } else {
+                               lasth->next = h->next;
+                       }
+                       GDKfree(h);
+                       break;
+               }
+       }
+       pthread_mutex_unlock(&_mero_remotedb_lock);
+}
+
 static void
 discoveryRunner(void *d)
 {
@@ -252,6 +300,13 @@
                /* ignore messages from broadcast interface */
                if (strcmp(host, "0.0.0.0") == 0)
                        continue;
+               /* forward messages not coming from ourself to all routes that
+                * are active */
+               if (strcmp(host, _mero_hostname) != 0) {
+                       disc_message_tap h = _mero_disc_msg_taps;
+                       for (; h != NULL; h = h->next)
+                               write(h->fd, buf, nread);
+               }
 
                if (strncmp(buf, "HELO ", 5) == 0) {
                        /* HELLO message, respond with current databases */
diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/merovingian_peering.c
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/sql/src/backends/monet5/merovingian/merovingian_peering.c Mon Sep 20 14:54:10 2010 +0200
@@ -0,0 +1,137 @@
+/*
+ * The contents of this file are subject to the MonetDB Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+ * License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * The Original Code is the MonetDB Database System.
+ *
+ * The Initial Developer of the Original Code is CWI.
+ * Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
+ * Copyright August 2008-2010 MonetDB B.V.
+ * All Rights Reserved.
+ */
+
+#include "sql_config.h"
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <gdk.h>
+
+extern void broadcast(char *);
+extern void registerMessageTap(int);
+extern void unregisterMessageTap(int);
+extern char *_mero_hostname;
+extern unsigned short _mero_port;
+extern char _mero_keep_listening;
+
+/**
+ * Thread body serving one peering client.
+ *
+ * d points to an int holding the connected client socket; the value is
+ * copied on entry and the socket is closed before returning.
+ * NOTE(review): the caller has to leave that int untouched until the
+ * copy has been made -- confirm at the call site.
+ *
+ * After the handshake described below, data read from the client is
+ * passed to broadcast(), and discovery messages arriving through a
+ * message tap (a pipe registered with registerMessageTap()) are written
+ * to the client.  The function returns when _mero_keep_listening is no
+ * longer 1, the client disconnects, or an I/O error occurs.
+ */
+void
+peeringServerThread(void *d)
+{
+       int s = *(int *)d;
+       int msock;
+       int valid;
+       ssize_t len;
+       char data[1024];
+       char *masquerade;
+       int discreader[2];
+       struct timeval tv;
+       fd_set fds;
+
+       /* start speaking the peering initialisation language, client tells
+        * what it wants, we reply
+        * ritual is as follows:
+        *   for a bi-directional tunnel over which all traffic is routed
+        *   (with masquerading of the discovery announcements such that
+        *   traffic from both networks is directed over the two border
+        *   hosts as advertised by the request and response):
+        * > tunnel host:port
+        * < tunnel myhost:myport
+        *   for one-sided proxying of traffic, where the network from the
+        *   client connects to the border host advertised in the response,
+        *   and the network from the server connects to each host from the
+        *   client's network individually (as typically in a NAT
+        *   situation):
+        * > proxy
+        * < proxy myhost:myport
+        *   for fully connectable networks, where masquerading is not
+        *   necessary on any side and all hosts from the one network
+        *   directly connect to any of the hosts from the other network:
+        * > direct
+        * < direct
+        * after this (on error, the server disconnects), the regular
+        * discovery protocol (HELO, ANNC, LEAV) is spoken on the line until
+        * disconnected by either party (typically a shutdown). */
+
+       masquerade = NULL;
+       /* keep one byte free for the terminator: the request is examined
+        * with the str* functions below, which need a NUL-terminated
+        * string */
+       len = read(s, data, sizeof(data) - 1);
+       if (len < 0)
+               len = 0;
+       data[len] = '\0';
+       /* drop a trailing line end, such that "proxy\n" is recognised and
+        * the host:port of a tunnel request is kept clean */
+       while (len > 0 && (data[len - 1] == '\n' || data[len - 1] == '\r'))
+               data[--len] = '\0';
+
+       valid = 1;
+       if (strncmp(data, "tunnel ", 7) == 0) {
+               /* tunnel mode, remember the peer before data is reused */
+               masquerade = GDKstrdup(data + 7);
+               snprintf(data, sizeof(data),
+                               "tunnel %s:%hu\n", _mero_hostname, _mero_port);
+       } else if (strcmp(data, "proxy") == 0) {
+               /* proxy mode */
+               snprintf(data, sizeof(data),
+                               "proxy %s:%hu\n", _mero_hostname, _mero_port);
+       } else if (strcmp(data, "direct") == 0) {
+               /* direct mode */
+               snprintf(data, sizeof(data), "direct\n");
+       } else {
+               /* invalid (or empty) request, tell so and abort below */
+               valid = 0;
+               snprintf(data, sizeof(data), "invalid request\n");
+       }
+       if (write(s, data, strlen(data)) == -1 || !valid) {
+               if (masquerade != NULL)
+                       GDKfree(masquerade);
+               close(s);
+               return;
+       }
+
+       if (pipe(discreader) == -1) {
+               if (masquerade != NULL)
+                       GDKfree(masquerade);
+               close(s);
+               return;
+       }
+       /* discreader[0] is the read end of the pipe, discreader[1] the
+        * write end: the tap has to be the write end, since the discovery
+        * side writes to it, while we read what arrives from the read end */
+       registerMessageTap(discreader[1]);
+
+       /* now just forward and inject announce messages, doing the
+        * masquerading if necessary */
+       while (_mero_keep_listening == 1) {
+               FD_ZERO(&fds);
+               FD_SET(s, &fds);
+               FD_SET(discreader[0], &fds);
+               msock = s > discreader[0] ? s : discreader[0];
+               /* wait up to 5 seconds. */
+               tv.tv_sec = 5;
+               tv.tv_usec = 0;
+               len = select(msock + 1, &fds, NULL, NULL, &tv);
+               /* nothing interesting has happened */
+               if (len == 0)
+                       continue;
+               if (len == -1) {
+                       /* the fd sets are undefined after a failure, so
+                        * never inspect them; only retry on a signal */
+                       if (errno == EINTR)
+                               continue;
+                       break;
+               }
+               if (FD_ISSET(s, &fds)) {
+                       /* from client, forward to our network; 0 means the
+                        * client disconnected, which would otherwise make
+                        * us spin here forever */
+                       if ((len = read(s, data, sizeof(data) - 1)) <= 0)
+                               break;
+                       data[len] = '\0';
+                       /* FIXME: simple form, no masquerading */
+                       broadcast(data);
+               } else if (FD_ISSET(discreader[0], &fds)) {
+                       /* from our network, forward to client */
+                       if ((len = read(discreader[0], data, sizeof(data))) <= 0)
+                               break;
+                       /* FIXME: simple form, no masquerading */
+                       if (write(s, data, len) == -1)
+                               break;
+               }
+       }
+
+       /* take the tap out of the list before its descriptor is closed */
+       unregisterMessageTap(discreader[1]);
+       close(discreader[0]);
+       close(discreader[1]);
+       close(s);
+       if (masquerade != NULL)
+               GDKfree(masquerade);
+}
diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/merovingian_peering.h
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/sql/src/backends/monet5/merovingian/merovingian_peering.h Mon Sep 20 14:54:10 2010 +0200
@@ -0,0 +1,20 @@
+/*
+ * The contents of this file are subject to the MonetDB Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+ * License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * The Original Code is the MonetDB Database System.
+ *
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to