Changeset: b744137e3963 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b744137e3963
Modified Files:
        monetdb5/scheduler/run_centipede.c
Branch: default
Log Message:

Clean up source
It is still unclear what this code will become.


diffs (truncated from 324 to 300 lines):

diff --git a/monetdb5/scheduler/run_centipede.c 
b/monetdb5/scheduler/run_centipede.c
--- a/monetdb5/scheduler/run_centipede.c
+++ b/monetdb5/scheduler/run_centipede.c
@@ -18,11 +18,10 @@
  */
 
 /*
- * Use a single gauge structure to split the largest table.
  * @f centipede
  * @a M. Kersten
  * @- Centipede scheduling.
- * TBD when the partition manager works in single node.
+ * It is purposely a variation of the octopus scheduler
  */
 #include "monetdb_config.h"
 #include "centipede.h"
@@ -31,7 +30,7 @@
 #include "remote.h"
 #include "mal_sabaoth.h"
 #include "mal_recycle.h"
-
+#include "opt_partition.h"
 #include "mal_interpreter.h"
 
 typedef struct REGMAL{
@@ -62,7 +61,7 @@ typedef       struct{
 } Gauge;
 
 static int
-OPTfindPeer(str uri)
+CENTIPEDEfindPeer(str uri)
 {
     int i;
     for (i = 0; i < nrpeers; i++)
@@ -72,11 +71,11 @@ OPTfindPeer(str uri)
 }
 /* Look for and add a peer with uri in the registry.  Return index in registry 
*/
 int
-OPTgetPeer(str uri)
+CENTIPEDEgetPeer(str uri)
 {
        int i;
 
-       i = OPTfindPeer(uri);
+       i = CENTIPEDEfindPeer(uri);
        if ( i >=0 ) {
                peers[i].active = 1;
                return i;
@@ -96,7 +95,7 @@ OPTgetPeer(str uri)
 
 /* Clean function registry of non-active peers */
 
-void OPTcleanFunReg(int i)
+void CENTIPEDEcleanFunReg(int i)
 {
        Registry r, q;
        mal_set_lock(mal_contextLock,"slicing.cleanFunReg");
@@ -112,7 +111,7 @@ void OPTcleanFunReg(int i)
 }
 
 str
-OPTdiscover(Client cntxt)
+CENTIPEDEdiscover(Client cntxt)
 {
        bat bid = 0;
        BAT *b;
@@ -135,7 +134,7 @@ OPTdiscover(Client cntxt)
                        bi = bat_iterator(b);
                        BATloop(b,p,q){
                                str t= (str) BUNtail(bi,p);
-                               nrworkers += OPTgetPeer(t) >= 0;
+                               nrworkers += CENTIPEDEgetPeer(t) >= 0;
                        }
                }
                BBPreleaseref(bid);
@@ -146,11 +145,11 @@ OPTdiscover(Client cntxt)
                /* there is a last resort, local execution */
                SABAOTHgetLocalConnection(&s);
 
-               nrworkers += OPTgetPeer(s) >= 0;
+               nrworkers += CENTIPEDEgetPeer(s) >= 0;
                slicingLocal = 1;
        }
 
-#ifdef DEBUG_RUN_OPT
+#ifdef DEBUG_RUN_CENTIPEDE
        mnstr_printf(cntxt->fdout,"Active peers discovered %d\n",nrworkers);
        for (i=0; i<nrpeers; i++)
        if ( peers[i].uri )
@@ -161,7 +160,233 @@ OPTdiscover(Client cntxt)
 
        for (i=0; i<nrpeers; i++)
                if ( !peers[i].active )
-                       OPTcleanFunReg(i);
+                       CENTIPEDEcleanFunReg(i);
 
        return MAL_SUCCEED;
 }
+
+/*
+ * We register the code at all worker sites and keep
+ * a list of those already sent.
+ */
+static int
+CENTIPEDEfind(int i, str qry){
+       Registry r;
+       for ( r= peers[i].nxt; r; r= r->nxt)
+       if ( strcmp(qry, r->fcn)==0)
+               return 1;
+       return 0;
+}
+/*
+ * The work division looks at the system opportunities and
+ * replaces all null valued target site references in all instructions.
+ * The first policy is to simply perform round robin.
+ * The more advanced way is to negotiat with the remote sites.
+ */
+
+
+/*
+ * The scheduler runs all tentacles asynchronously.
+ * We should be careful in accessing a site that runs out
+ * of clients or any failure. It may cause the system to
+ * wait forever.
+ *
+ * The version argument indicates the tentacles
+ * if it is time to refresh their caches.
+ * It should be obtained from the recycler where we
+ * know when updates have been taken place.
+ *
+ * The time-out parameter is not used yet.
+ */
+
+static int admitSerialConn(void *cntxt, void *mb, void *stk, void *pci)
+{
+       str dburi;
+       int i, adm = 0;
+       MalStkPtr s = (MalStkPtr) stk;
+       InstrPtr p = (InstrPtr) pci;
+
+       (void) cntxt;
+       (void) mb;
+
+       if ( p->token == NOOPsymbol )
+               return 1;
+/*     if ( strncmp (getFunctionId(p), "exec", 4) == 0 )
+               dburi = *(str*)getArgReference(s, p,2);
+       else dburi = *(str*)getArgReference(s, p,1);
+*/
+       /* peer uri is the first argument */
+       dburi = *(str*)getArgReference(s, p,p->retc);
+
+       MT_lock_set(&s->stklock,"serialConn");
+       i = CENTIPEDEfindPeer(dburi);
+       if ( i >= 0 ) {
+               if ( !peers[i].inuse ) {
+                       adm = peers[i].inuse = 1;
+#ifdef DEBUG_RUN_CENTIPEDE
+                       mnstr_printf( ((Client)cntxt)->fdout,"USING conn. to 
peer %d (%s)\n", i, dburi);
+                       
printInstruction(((Client)cntxt)->fdout,(MalBlkPtr)mb,0, p, LIST_MAL_ALL);
+#endif
+               }
+               else {
+#ifdef DEBUG_RUN_CENTIPEDE
+                       mnstr_printf( ((Client)cntxt)->fdout,"Conn. to peer %d 
is BUSY\n", i);
+                       
printInstruction(((Client)cntxt)->fdout,(MalBlkPtr)mb,0, p, LIST_MAL_ALL);
+#endif
+               }
+       }
+       else {
+#ifdef DEBUG_RUN_CENTIPEDE
+               mnstr_printf( ((Client)cntxt)->fdout,"No peer %s\n", dburi);
+               printInstruction(((Client)cntxt)->fdout,(MalBlkPtr)mb,0, p, 
LIST_MAL_ALL);
+#endif
+       }
+
+       MT_lock_unset(&s->stklock,"serialConn");
+
+       return adm;
+}
+
+static int wrapupSerialConn(void *cntxt, void *mb, void *stk, void *pci)
+{
+       str dburi;
+       int i;
+       MalStkPtr s = (MalStkPtr) stk;
+       InstrPtr p = (InstrPtr) pci;
+
+       (void) cntxt;
+       (void) mb;
+
+       if ( p->token == NOOPsymbol )
+               return 0;
+
+/*     if ( strncmp (getFunctionId(p), "exec", 4) == 0 )
+               dburi = *(str*)getArgReference(s, p,2);
+       else dburi = *(str*)getArgReference(s, p,1); */
+       dburi = *(str*)getArgReference(s, p,p->retc);
+       i = CENTIPEDEfindPeer(dburi);
+
+       MT_lock_set(&s->stklock,"serialConn");
+       if ( i >= 0 ) {
+               peers[i].inuse = 0;
+#ifdef DEBUG_RUN_CENTIPEDE
+               mnstr_printf( ((Client)cntxt)->fdout,"RELEASE conn to peer %d 
(%s)\n", i, dburi);
+               printInstruction(((Client)cntxt)->fdout,(MalBlkPtr)mb,0, p, 
LIST_MAL_ALL);
+#endif
+       }
+       MT_lock_unset(&s->stklock,"serialConn");
+
+       return 0;
+}
+
+
+/*
+ * @-
+ * Discover available workers and register tentacles on them
+ * scheduler.register():bit;
+ */
+str
+CENTIPEDEdiscoverRegister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci)
+{
+       bit *res = (bit*) getArgReference(stk,pci,0);
+       int j, start, stop, k, found, v, pr;
+       InstrPtr p;
+       str msg = MAL_SUCCEED;
+       ValPtr wname;
+
+       *res = 1;       /* execute the block */
+
+       /* Find available peers */
+       msg= CENTIPEDEdiscover(cntxt);
+       if ( msg )
+               return msg;
+
+       /* Logical names "worker_x" in the mal block are replaced with the
+       uri-s of the associated peers in the execution stack. Logical
+       workers that do not have a physical match are replaced with the
+       constant string "NOTworker" to mark the instructions on this
+       worker to be skipped. */
+
+       start= getPC(mb,pci);
+       for (j = start + 1; j<mb->stop ; j++){
+               p= getInstrPtr(mb,j);
+               if ( p->barrier == EXITsymbol )
+                       break;
+               v = getArg(p,1);
+               wname = &getVarConstant(mb, v);
+
+               found = 0;
+               for ( k = 0; k < nrworkers; k++ )
+                       if ( strcmp(wname->val.sval, workers[k].name) == 0 ){
+                               found = 1;
+                               break;
+                       }
+               if ( found ) {
+                       garbageElement(cntxt, &stk->stk[v]);
+                       pr = workers[k].pnum;
+                       stk->stk[v].val.sval = GDKstrdup(peers[pr].uri);
+                       stk->stk[v].len = (int) strlen(stk->stk[v].val.sval);
+               } else {        /* disable instruction */
+                       garbageElement(cntxt, &stk->stk[v]);
+                       stk->stk[v].val.sval = GDKstrdup("NOTworker");
+                       stk->stk[v].len = (int) strlen(stk->stk[v].val.sval);
+                       p->token = NOOPsymbol;
+               }
+       }
+
+       /* Register tentacle functions at peers */
+       stop = j;
+       if ( !octopusLocal ){   /*skip registration for local execution*/
+               stk->admit = &admitSerialConn;
+               stk->wrapup = &wrapupSerialConn;
+               msg = runMALdataflow(cntxt,mb,start,stop,stk,0,pci);
+               stk->admit = NULL;
+               stk->wrapup = NULL;
+       }
+       *res = 0;
+       return msg;
+}
+
+str CENTIPEDEregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       int i, k;
+       str conn, fname, dburi, msg = MAL_SUCCEED;
+
+       (void) cntxt;
+       (void) mb;
+       dburi = *(str*)getArgReference(stk,pci,1);
+
+#ifdef DEBUG_RUN_CENTIPEDE
+               mnstr_printf(GDKout,"connect to  uri %s\n", dburi);
+#endif
+
+       msg = CENTIPEDEconnect(&conn, &dburi);
+       if ( msg )
+               return msg;
+
+       i =     CENTIPEDEgetPeer(dburi);
+
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to