Changeset: bdf6c7251142 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bdf6c7251142
Modified Files:
        monetdb5/scheduler/srvpool.c
        monetdb5/scheduler/srvpool.h
        monetdb5/scheduler/srvpool.mal
Branch: default
Log Message:

The server pool code base
This module maintains a list of remote servers and
arranges for remote execution of partitioned queries.


diffs (truncated from 504 to 300 lines):

diff --git a/monetdb5/scheduler/srvpool.c b/monetdb5/scheduler/srvpool.c
new file mode 100644
--- /dev/null
+++ b/monetdb5/scheduler/srvpool.c
@@ -0,0 +1,402 @@
+/*
+ * The contents of this file are subject to the MonetDB Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.monetdb.org/Legal/MonetDBLicense
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+ * License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * The Original Code is the MonetDB Database System.
+ *
+ * The Initial Developer of the Original Code is CWI.
+ * Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
+ * Copyright August 2008-2011 MonetDB B.V.
+ * All Rights Reserved.
+ */
+
+/*
+ * @a M. Kersten
+ * The octopus/centipede modules enable compute cloud based processing
+ * of SQL queries.
+ * Their scheduler takes over control of a MAL execution by
+ * re-directing requests to multiple sites. If there are no sites known,
+ * then the code is executed locally as is.
+ * The scheduler runs all subqueries asynchronously if possible.
+ *
+ * To make our life easier, we assume that all subqueries are
+ * grouped together in a guarded block as follows:
+ *
+ * @verbatim
+ * barrier parallel:= scheduler.srvpool();
+ * a:= srvpool.exec(sitename,fcnname,arg...);
+ * ...
+ * b:= srvpool.exec(sitename,fcnname,arg...);
+ * z:= mat.pack(a,...,b);
+ * exit parallel;
+ * @end verbatim
+ *
+ * A dummy site name is generated by the compilers.
+ * It is replaced by an actual name upon first call.
+ * If the site went down, a new site is selected automatically.
+ *
+ * We assume that the database versions are synchronised.
+ *
+ * To make the scheduler work, it needs a list of database instances.
+ * This list it gets from Merovingian by resolving
+ * all with the property 'shared=octopus' or 'shared=centipede'
+ * (set by monetdb).
+ * The default is to use the local database as a target.
+ */
+#include "monetdb_config.h"
+#include "mal_interpreter.h"
+#include "mat.h"
+#include "srvpool.h"
+#include "optimizer.h"
+#include <mapi.h>
+#include "remote.h"
+#include "mal_sabaoth.h"
+
+/*
+ * All remotely known query blocks are listed here.
+ * The list is singly linked; one such list hangs off every Server entry.
+ */
+typedef struct REGMAL{
+       str fcn;                /* function name, compared with strcmp and released with GDKfree */
+       struct REGMAL *nxt;     /* next list node, NULL terminates the list */
+} *Registry;
+
+/*
+ * All open connections are grouped here.
+ * The string fields name, uri, usr and pwd are filled with GDKstrdup-ed
+ * copies when an entry is created.
+ */
+typedef struct {
+       str name;       /* local unique name */
+       str uri;        /* uri associated with remote server */
+       str usr;        /* user credentials for using the server */
+       str pwd;
+       Registry nxt; /* list of registered mal functions */
+       bte active;     /* flag set when the server is active */
+       str conn;       /* remote handle, NULL as long as no connection is open */
+} Server;
+
+/* Upper bound on the number of entries the server registry can hold. */
+#define MAXSITES 2048  /* should become dynamic at some point */
+static Server servers[MAXSITES];       /* registry of servers */
+/* Number of slots of servers[] in use; valid entries are 0..nrservers-1. */
+static int nrservers=0;
+/* NOTE(review): not referenced in the part of the file visible here; confirm its use. */
+static int localExecution= FALSE;
+
+/*
+ * Map a local server name onto its slot in the registry.
+ * Returns the index of the first entry whose name equals dbalias,
+ * or -1 when no such entry exists.
+ */
+static int
+SRVPOOLfindServer(str dbalias)
+{
+       int slot = 0;
+
+       while (slot < nrservers) {
+               if ( strcmp(dbalias, servers[slot].name) == 0 )
+                       return slot;
+               slot++;
+       }
+       return -1;
+}
+
+/*
+ * Add a server with the given uri to the registry and give it a local
+ * unique name of the form srv_<n>.  The default credentials
+ * monetdb/monetdb are used and the entry is marked active.
+ * Returns the index of the new entry in the registry.
+ */
+static int serverid=0; /* counter behind the generated srv_<n> names */
+
+static int
+SRVPOOLnewServer(str uri)
+{
+       int i;
+       char buf[BUFSIZ];
+
+       /* the registry is a fixed size array, never write past its end */
+       assert(nrservers < MAXSITES);
+       i = nrservers;
+       /* use default settings */
+       snprintf(buf,BUFSIZ,"srv_%d",serverid++);
+       servers[i].name = GDKstrdup(buf);
+       servers[i].usr = GDKstrdup("monetdb");
+       servers[i].uri = GDKstrdup(uri);
+       servers[i].pwd = GDKstrdup("monetdb");
+       servers[i].active = 1;
+       servers[i].nxt = NULL;
+       servers[i].conn = NULL; /* no remote handle until a connection is made */
+       nrservers++;
+       return i;
+}
+
+/*
+ * Locate the registry entry whose local name or uri equals the
+ * argument.  When there is none, the argument is added as the uri of
+ * a new server.  Returns the index in the registry.
+ */
+static int
+SRVPOOLgetServer(str uri)
+{
+       int slot = 0;
+
+       while (slot < nrservers) {
+               if ( strcmp(uri, servers[slot].name) == 0 )
+                       return slot;
+               if ( strcmp(uri, servers[slot].uri) == 0 )
+                       return slot;
+               slot++;
+       }
+       return SRVPOOLnewServer(uri);
+}
+
+/*
+ * Clean the function registry of server i: every list node and the
+ * function name it holds are released, the list is left empty.
+ */
+static void SRVPOOLcleanup(int i)
+{
+       Registry cur, next;
+
+       for (cur = servers[i].nxt; cur != NULL; cur = next) {
+               /* pick up the successor before the node is freed */
+               next = cur->nxt;
+               GDKfree(cur->fcn);
+               GDKfree(cur);
+       }
+       servers[i].nxt = NULL;
+}
+
+/*
+ * Logically disconnect from all servers.
+ * Every active server with a remote handle is disconnected and its
+ * handle is released.  The message of the first failing disconnect is
+ * returned, MAL_SUCCEED when none failed.  Messages of later failures
+ * are freed, so that no exception is leaked or silently replaced by
+ * the outcome of a later call.
+ */
+static str
+SRVPOOLdisconnect(void)
+{
+       int i, ret;
+       str msg = MAL_SUCCEED, err;
+
+       for ( i=0; i< nrservers; i++)
+       if ( servers[i].active && servers[i].conn != NULL ) {
+               err = RMTdisconnect(&ret,&servers[i].conn);
+               if ( err != MAL_SUCCEED ) {
+                       if ( msg == MAL_SUCCEED )
+                               msg = err;
+                       else
+                               GDKfree(err);
+               }
+               GDKfree(servers[i].conn);
+               servers[i].conn = NULL;
+       }
+       return msg;
+}
+/*
+ * Restart the server pool.
+ * All servers are disconnected, their function registries are
+ * emptied and the strings owned by each entry are released before
+ * the registry is wiped.  The argument is not used.
+ * Returns the outcome of the disconnect step.
+ */
+str SRVPOOLreset(int *ret)
+{
+       int i;
+       str msg;
+
+       (void) ret;
+       msg = SRVPOOLdisconnect();
+       for ( i=0; i< nrservers; i++){
+               SRVPOOLcleanup(i);
+               GDKfree(servers[i].name);
+               /* uri, usr and pwd are GDKstrdup-ed together with the name */
+               GDKfree(servers[i].uri);
+               GDKfree(servers[i].usr);
+               GDKfree(servers[i].pwd);
+       }
+       memset((char*)servers, 0, sizeof(Server) * nrservers);
+       nrservers = 0;
+       return msg;
+}
+
+/*
+ * Hand out a copy of the remote handle of a registered server.
+ * *uri may hold the local name of the server or its uri.
+ * A server without a handle is connected first (scenario "msql") and
+ * the handle is kept in the registry.
+ * *c is NULL when an exception is returned.
+ */
+str
+SRVPOOLconnect(str *c, str *uri)
+{
+       int srv;
+       str msg = MAL_SUCCEED;
+       str conn = NULL, scen = "msql";
+
+       *c = NULL;
+       srv = SRVPOOLfindServer(*uri);
+       if ( srv < 0 ){
+               /* not a local name, fall back to a search on uri */
+               srv = 0;
+               while ( srv < nrservers && strcmp(*uri, servers[srv].uri) != 0 )
+                       srv++;
+               if ( srv == nrservers)
+                       return createException(MAL, "srvpool.connect", "Server %s is not registered", *uri);
+       }
+
+       if ( servers[srv].conn == NULL ) {
+               msg = RMTconnectScen(&conn, &servers[srv].uri, &servers[srv].usr, &servers[srv].pwd, &scen);
+               if ( msg != MAL_SUCCEED )
+                       return msg;
+               servers[srv].conn = GDKstrdup(conn);
+       }
+
+       *c = GDKstrdup(servers[srv].conn);
+       return msg;
+}
+
+/*
+ * Look up the servers available for processing, guarantee a minimum
+ * number of servers.
+ *
+ * The pattern is resolved with RMTresolve; every uri found is entered
+ * in the registry and a connection attempt is made.  Afterwards local
+ * connections are added until the counter reaches minservers.
+ * Connection failures are ignored.  A failure to resolve is printed on
+ * the output stream of the client and is otherwise ignored as well.
+ *
+ * Every message that is freed is reset to MAL_SUCCEED right away, so
+ * the value returned never refers to released memory.
+ */
+static str
+SRVPOOLdiscover(Client cntxt, str pattern, int minservers)
+{
+       bat bid = 0;
+       BAT *b;
+       BUN p,q;
+       str msg = MAL_SUCCEED, conn = NULL, scen = "msql";
+       BATiter bi;
+       int i = 0, j;
+       char buf[BUFSIZ], *s= buf;
+
+       /* snprintf always terminates the copy, strncpy does not */
+       snprintf(buf, BUFSIZ, "%s", pattern);
+
+       msg = RMTresolve(&bid,&s);
+       if ( msg == MAL_SUCCEED) {
+               b = BATdescriptor(bid);
+               if ( b != NULL && BATcount(b) > 0 ) {
+                       bi = bat_iterator(b);
+                       BATloop(b,p,q){
+                               str t= (str) BUNtail(bi,p);
+
+                               j = SRVPOOLgetServer(t);
+                               conn = NULL;    /* keeps the debug print below well defined */
+                               msg = RMTconnectScen(&conn, &servers[j].uri, &servers[j].usr, &servers[j].pwd, &scen);
+                               if ( msg == MAL_SUCCEED )
+                                       servers[j].conn = GDKstrdup(conn);
+                               else {
+                                       /* ignore failure */
+                                       GDKfree(msg);
+                                       msg = MAL_SUCCEED;
+                               }
+
+#ifdef DEBUG_RUN_SRVPOOL
+                               mnstr_printf(cntxt->fdout,"#Worker site %d alias %s %s\n", i, (conn?conn:""), t);
+#endif
+                               /* NOTE(review): i is not advanced in this loop, so the
+                                * loop further down always adds minservers local
+                                * connections; confirm that this is intended */
+                               assert(i <MAXSITES);
+                       }
+               }
+               BBPreleaseref(bid);
+       }
+       if( msg) {
+               /* ignore merovingian complaints */
+               mnstr_printf(cntxt->fdout,"#%s\n", msg);
+               GDKfree(msg);
+               msg = MAL_SUCCEED;
+       }
+
+       while (i < minservers) {
+               /* there is a last resort, use local execution */
+               /* make sure you have enough connections */
+               SABAOTHgetLocalConnection(&s);
+
+               j = SRVPOOLnewServer(s); /*ref to servers registry*/
+               conn = NULL;
+               msg = RMTconnectScen(&conn, &servers[j].uri, &servers[j].usr, &servers[j].pwd, &scen);
+               if ( msg == MAL_SUCCEED )
+                       servers[j].conn = GDKstrdup(conn);
+               else {
+                       GDKfree(msg);
+                       msg = MAL_SUCCEED;
+               }
+#ifdef DEBUG_RUN_SRVPOOL
+               mnstr_printf(cntxt->fdout,"#Worker site %d %s\n", j, s);
+#endif
+               i++;
+       }
+
+#ifdef DEBUG_RUN_SRVPOOL
+       mnstr_printf(cntxt->fdout,"#Servers available %d\n", nrservers);
+#else
+               (void) cntxt;
+#endif
+       return msg;
+}
+
+/*
+ * We first need to register the code at the worker sites and keep
+ * a list of those already sent.
+ * SRVPOOLfind returns 1 when qry is on the list of server srv,
+ * 0 otherwise.
+ */
+static int
+SRVPOOLfind(int srv, str qry){
+       Registry entry = servers[srv].nxt;
+
+       while ( entry != NULL ) {
+               if ( strcmp(qry, entry->fcn) == 0 )
+                       return 1;
+               entry = entry->nxt;
+       }
+       return 0;
+}
+
+
+static str
+SRVPOOLregisterInternal(Client cntxt, str dbalias, str fname)
+{
+       int srv;
+       str msg = MAL_SUCCEED, con;
+       Registry r;
+
+#ifdef DEBUG_RUN_SRVPOOL
+               mnstr_printf(GDKout,"#register %s at site %s\n", fname, 
dbalias);
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to