Changeset: 293807560d4c for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=293807560d4c Modified Files: sql/src/backends/monet5/merovingian/Makefile.ag sql/src/backends/monet5/merovingian/merovingian.c sql/src/backends/monet5/merovingian/merovingian_controlrunner.c sql/src/backends/monet5/merovingian/merovingian_discoveryrunner.c sql/src/backends/monet5/merovingian/merovingian_peering.c sql/src/backends/monet5/merovingian/merovingian_peering.h Branch: default Log Message:
Implement peering client/server support diffs (truncated from 307 to 300 lines): diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/Makefile.ag --- a/sql/src/backends/monet5/merovingian/Makefile.ag Mon Sep 20 14:53:10 2010 +0200 +++ b/sql/src/backends/monet5/merovingian/Makefile.ag Mon Sep 20 14:54:10 2010 +0200 @@ -38,6 +38,7 @@ glob.h \ database.h \ control.h \ + merovingian_peering.h \ $(MEROVINGIAN_SUBS) lib_meroutil = { @@ -50,11 +51,19 @@ control.c } +lib_merodaemon = { + NOINST + SOURCES = \ + merovingian.c \ + merovingian_peering.c +} + bin_merovingian = { # hack: include merovingian.1.in here to get it expanded - SOURCES = merovingian.c merovingian.1.in + SOURCES = merovingian.1.in LIBS = \ libmeroutil \ + libmerodaemon \ $(MONETDB5_LIBS) -lmonetdb5 \ $(MONETDB_LIBS) -lbat -lstream \ $(SOCKET_LIBS) \ diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/merovingian.c --- a/sql/src/backends/monet5/merovingian/merovingian.c Mon Sep 20 14:53:10 2010 +0200 +++ b/sql/src/backends/monet5/merovingian/merovingian.c Mon Sep 20 14:54:10 2010 +0200 @@ -409,6 +409,8 @@ return(ret); } +#include "merovingian_peering.h" + #include "merovingian_forkmserver.c" #include "merovingian_proxy.c" #include "merovingian_client.c" diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/merovingian_controlrunner.c --- a/sql/src/backends/monet5/merovingian/merovingian_controlrunner.c Mon Sep 20 14:53:10 2010 +0200 +++ b/sql/src/backends/monet5/merovingian/merovingian_controlrunner.c Mon Sep 20 14:54:10 2010 +0200 @@ -241,13 +241,16 @@ len = snprintf(buf2, sizeof(buf2), "OK\n"); send(msgsock, buf2, len, 0); } else if (strcmp(q, "peer") == 0) { - Mfprintf(_mero_ctlout, "%s: peering not yet implemented\n", - origin); - len = snprintf(buf2, sizeof(buf2), - "peering not yet implemented\n"); + pthread_t ptid; /* FIXME: register global */ + len = snprintf(buf2, sizeof(buf2), "OK\n"); send(msgsock, buf2, len, 0); - close(msgsock); - continue; + /* start a separate thread to handle the peering */ + if (pthread_create(&ptid, NULL, + (void *(*)(void *))peeringServerThread, + (void *)&msgsock) < 0) + { + /* FAIL */ + } } else { Mfprintf(_mero_ctlout, "%s: invalid mode " "(%s)\n", origin, q); diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/merovingian_discoveryrunner.c --- a/sql/src/backends/monet5/merovingian/merovingian_discoveryrunner.c Mon Sep 20 14:53:10 2010 +0200 +++ b/sql/src/backends/monet5/merovingian/merovingian_discoveryrunner.c Mon Sep 20 14:54:10 2010 +0200 @@ -109,6 +109,54 @@ return(1); } +typedef struct _disc_message_tap { + int fd; + struct _disc_message_tap *next; +} *disc_message_tap; + +/* list of hooks for incoming messages */ +static disc_message_tap _mero_disc_msg_taps = NULL; + +void +registerMessageTap(int fd) +{ + disc_message_tap h = _mero_disc_msg_taps; + /* make sure we never block in the main loop below because we can't + * write to the pipe */ + fcntl(fd, F_SETFD, O_NONBLOCK); + pthread_mutex_lock(&_mero_remotedb_lock); + if (h == NULL) { + h = GDKmalloc(sizeof(struct _disc_message_tap)); + } else { + for (; h->next != NULL; h = h->next) + ; + h = h->next = GDKmalloc(sizeof(struct _disc_message_tap)); + } + h->next = NULL; + h->fd = fd; + pthread_mutex_unlock(&_mero_remotedb_lock); +} + +void +unregisterMessageTap(int fd) +{ + disc_message_tap h = _mero_disc_msg_taps; + disc_message_tap lasth; + pthread_mutex_lock(&_mero_remotedb_lock); + for (lasth = NULL; h != NULL; lasth = h, h = h->next) { + if (h->fd == fd) { + if (lasth == NULL) { + _mero_disc_msg_taps = h->next; + } else { + lasth->next = h->next; + } + GDKfree(h); + break; + } + } + pthread_mutex_unlock(&_mero_remotedb_lock); +} + static void discoveryRunner(void *d) { @@ -252,6 +300,13 @@ /* ignore messages from broadcast interface */ if (strcmp(host, "0.0.0.0") == 0) continue; + /* forward messages not coming from ourself to all routes that + * are active */ + if (strcmp(host, _mero_hostname) != 0) { + disc_message_tap h = _mero_disc_msg_taps; + for (; h != NULL; h = h->next) + write(h->fd, buf, nread); + } if (strncmp(buf, "HELO ", 5) == 0) { /* HELLO message, respond with current databases */ diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/merovingian_peering.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sql/src/backends/monet5/merovingian/merovingian_peering.c Mon Sep 20 14:54:10 2010 +0200 @@ -0,0 +1,137 @@ +/* + * The contents of this file are subject to the MonetDB Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the + * License for the specific language governing rights and limitations + * under the License. + * + * The Original Code is the MonetDB Database System. + * + * The Initial Developer of the Original Code is CWI. + * Portions created by CWI are Copyright (C) 1997-July 2008 CWI. + * Copyright August 2008-2010 MonetDB B.V. + * All Rights Reserved. + */ + +#include "sql_config.h" +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <gdk.h> + +extern void broadcast(char *); +extern void registerMessageTap(int); +extern void unregisterMessageTap(int); +extern char *_mero_hostname; +extern unsigned short _mero_port; +extern char _mero_keep_listening; + +void +peeringServerThread(void *d) +{ + int s = *(int *)d; + int msock; + ssize_t len; + char data[1024]; + char *masquerade; + int discreader[2]; + struct timeval tv; + fd_set fds; + + /* start speaking the peering initialisation language, client tells + * what it wants, we reply + * ritual is as follows: + * for a bi-directional tunnel over which all traffic is routed + * (with masquerading of the discovery announcements such that + * traffic from both networks is directed over the two border + * hosts as advertised by the request and response: + * > tunnel host:port + * < tunnel myhost:myport + * for one-sided proxying of traffic, where the network from the + * client connects to the border host advertised in the response, + * and the network from the server connects to each host from the + * client's network individually (as typically in a NAT + * situation): + * > proxy + * < proxy myhost:myport + * for full connectable networks, were masquerading is not + * necessary on any side and all hosts from the one network + * directly connect any of the hosts from the other network: + * > direct + * < direct + * after this (on error, the server disconnects), the regular + * discovery protocol (HELO, ANNC, LEAV) is spoken on the line until + * disconnected by either party (typically a shutdown). */ + + masquerade = NULL; + len = read(s, data, sizeof(data)); + if (len > 0 && strncmp(data, "tunnel ", 7) == 0) { + /* tunnel mode */ + masquerade = GDKstrdup(data + 7); + snprintf(data, sizeof(data), + "tunnel %s:%hu\n", _mero_hostname, _mero_port); + write(s, data, strlen(data)); + } else if (len > 0 && strcmp(data, "proxy") == 0) { + /* proxy mode */ + snprintf(data, sizeof(data), + "proxy %s:%hu\n", _mero_hostname, _mero_port); + write(s, data, strlen(data)); + } else if (len > 0 && strcmp(data, "direct") == 0) { + /* direct mode */ + snprintf(data, sizeof(data), "direct\n"); + write(s, data, strlen(data)); + } else { + /* invalid, abort here */ + snprintf(data, sizeof(data), "invalid request\n"); + write(s, data, strlen(data)); + close(s); + return; + } + + if (pipe(discreader) == -1) { + /* bla error */ + close(s); + return; + } + registerMessageTap(discreader[0]); + + /* now just forward and inject announce messages, doing the + * masquerading if necessary */ + while (_mero_keep_listening == 1) { + FD_ZERO(&fds); + FD_SET(s, &fds); + FD_SET(discreader[1], &fds); + msock = s > discreader[1] ? s : discreader[1]; + /* wait up to 5 seconds. */ + tv.tv_sec = 5; + tv.tv_usec = 0; + len = select(msock + 1, &fds, NULL, NULL, &tv); + /* nothing interesting has happened */ + if (len == 0) + continue; + if (FD_ISSET(s, &fds)) { + /* from client, forward to our network */ + if ((len = read(s, data, sizeof(data) - 1)) == -1) + break; + data[len] = '\0'; + /* FIXME: simple form, no masquerading */ + broadcast(data); + } else if (FD_ISSET(discreader[1], &fds)) { + /* from our network, forward to client */ + len = read(discreader[1], data, sizeof(data)); + /* FIXME: simple form, no masquerading */ + if (write(s, data, len) == -1) + break; + } + } + + unregisterMessageTap(discreader[0]); + close(discreader[0]); + close(discreader[1]); + close(s); +} diff -r d185d074e3f4 -r 293807560d4c sql/src/backends/monet5/merovingian/merovingian_peering.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sql/src/backends/monet5/merovingian/merovingian_peering.h Mon Sep 20 14:54:10 2010 +0200 @@ -0,0 +1,20 @@ +/* + * The contents of this file are subject to the MonetDB Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the + * License for the specific language governing rights and limitations + * under the License. + * + * The Original Code is the MonetDB Database System. + * _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list