Changeset: 1163cb74e8c2 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/1163cb74e8c2
Modified Files:
        monetdb5/mal/mal_dataflow.c
        monetdb5/optimizer/opt_reorder.c
Branch: default
Log Message:

Issues resolved in the dataflow interpreter.
1) the instructions are preferably executed in the order of the MAL program
2) the resource claim is now computed correctly
3) try to eat away the hot potato first, and otherwise pick another eligible instruction.


diffs (181 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -7,23 +7,21 @@
  */
 
 /*
- * (author) M Kersten
- * Out of order execution
- * The alternative is to execute the instructions out of order
- * using dataflow dependencies and as an independent process.
+ * (author) M Kersten, S Mullender
  * Dataflow processing only works on a code
  * sequence that does not include additional (implicit) flow of control
  * statements and, ideally, consist of expensive BAT operations.
- * The dataflow interpreter selects cheap instructions
- * using a simple costfunction based on the size of the BATs involved.
- *
  * The dataflow portion is identified as a guarded block,
  * whose entry is controlled by the function language.dataflow();
- * This way the function can inform the caller to skip the block
- * when dataflow execution was performed.
  *
- * The flow graphs should be organized such that parallel threads can
- * access it mostly without expensive locking.
+ * The dataflow worker tries to follow the sequence of actions
+ * as laid out in the plan, but abandons this track when it hits
+ * a blocking operator or an instruction for which not all arguments
+ * are available, or when resources become scarce.
+ *
+ * The flow graph is organized such that parallel threads can
+ * access it mostly without expensive locking and dependent
+ * variables are easy to find.
  */
 #include "monetdb_config.h"
 #include "mal_dataflow.h"
@@ -231,26 +229,21 @@ q_dequeue(Queue *q, Client cntxt)
        if (ATOMIC_GET(&exiting))
                return NULL;
        MT_lock_set(&q->l);
-       if (cntxt) {
-               int i, minpc = -1;
+       {
+               int i, minpc;
 
-               for (i = q->last - 1; i >= 0; i--) {
-                       if (q->data[i]->flow->cntxt == cntxt) {
-                               if (q->last > 1024) {
-                                       /* for long "queues", just grab the 
first eligible
-                                        * entry we encounter */
-                                       minpc = i;
-                                       break;
-                               }
-                               /* for shorter "queues", find the oldest 
eligible entry */
-                               if (minpc < 0) {
-                                       minpc = i;
-                                       s = q->data[i];
-                               }
-                               r = q->data[i];
-                               if (s && r && s->pc > r->pc) {
-                                       minpc = i;
-                                       s = r;
+               minpc = q->last -1;
+               s = q->data[minpc];
+               /* for long "queues", just grab the first eligible entry we 
encounter */
+               if (q->last < 1024) {
+                       for (i = q->last - 1; i >= 0; i--) {
+                               if ( cntxt ==  NULL || q->data[i]->flow->cntxt 
== cntxt) {
+                                       /* for shorter "queues", find the 
oldest eligible entry */
+                                       r = q->data[i];
+                                       if (s && r && s->pc > r->pc) {
+                                               minpc = i;
+                                               s = r;
+                                       }
                                }
                        }
                }
@@ -433,18 +426,27 @@ DFLOWworker(void *T)
                        if( footprint > fe->maxclaim) fe->maxclaim = footprint;
                }
        }
-/* the Hot potatoe also triggers that all binds are executed at the start
- * while they are not immediately needed. Each subquery should be handled 
completely
- * first. This means that in practive nthread subqueries will start processin.
+/* Try to get rid of the hot potatoe or locate an alternative to proceed.
  */
-// #define HOTPOTATOE
+#define HOTPOTATOE
 #ifdef HOTPOTATOE
        /* HOT potatoe choice */
+       int last = 0, alt = -1,  j;
+       int hotpotatoe = getArg(p,0);
+
        MT_lock_set(&flow->flowlock);
-       int last = 0;
-       for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) 
> 0; last = flow->edges[last])
-               if (flow->status[i].state == DFLOWpending &&
-                       flow->status[i].blocks == 1) {
+       for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) 
> 0; last = flow->edges[last]){
+               if (flow->status[i].state == DFLOWpending && 
flow->status[i].blocks == 1) {
+                       /* check if the hot potatoe is actually used */
+                       p= getInstrPtr(flow->mb,fe->pc);
+                       for(j= p->retc; j< p->argc; j++)
+                               if (getArg(p,0) == hotpotatoe)
+                                       break;
+                       if (j == p->argc){
+                               if( alt == -1)
+                                       alt = i;
+                               continue;
+                       }
                        flow->status[i].state = DFLOWrunning;
                        flow->status[i].blocks = 0;
                        flow->status[i].hotclaim = fe->hotclaim;
@@ -454,17 +456,24 @@ DFLOWworker(void *T)
                        fnxt = flow->status + i;
                        break;
                }
+       }
+       /* hot potatoe can not be removed, use alternative to proceed */
+       if( alt >= 0){
+               i = alt;
+               flow->status[i].state = DFLOWrunning;
+               flow->status[i].blocks = 0;
+               flow->status[i].hotclaim = fe->hotclaim;
+               flow->status[i].argclaim += fe->hotclaim;
+               if( flow->status[i].maxclaim < fe->maxclaim)
+                       flow->status[i].maxclaim = fe->maxclaim;
+               fnxt = flow->status + i;
+       }
        MT_lock_unset(&flow->flowlock);
 #endif
 
                q_enqueue(flow->done, fe);
         if ( fnxt == 0 && malProfileMode) {
-            int last;
-            MT_lock_set(&todo->l);
-            last = todo->last;
-            MT_lock_unset(&todo->l);
-            if (last == 0)
-                profilerHeartbeatEvent("wait");
+                       profilerHeartbeatEvent("wait");
         }
        }
        GDKfree(GDKerrbuf);
@@ -706,8 +715,9 @@ DFLOWscheduler(DataFlow flow, struct wor
                                MT_lock_unset(&flow->flowlock);
                                throw(MAL, "dataflow", "DFLOWscheduler(): 
getInstrPtr(flow->mb,fe[i].pc) returned NULL");
                        }
+                       fe[i].argclaim = 0;
                        for (j = p->retc; j < p->argc; j++)
-                               fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, 
fe[0].flow->stk, p, j, FALSE);
+                               fe[i].argclaim += 
getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE);
                        q_enqueue(todo, flow->status + i);
                        flow->status[i].state = DFLOWrunning;
                }
diff --git a/monetdb5/optimizer/opt_reorder.c b/monetdb5/optimizer/opt_reorder.c
--- a/monetdb5/optimizer/opt_reorder.c
+++ b/monetdb5/optimizer/opt_reorder.c
@@ -40,7 +40,7 @@
 str
 OPTreorderImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p)
 {
-    int i,j,k, blkcnt = 1;
+    int i,j,k, blkcnt = 1, pc = 0;
     InstrPtr *old = NULL;
     int limit, slimit, *depth = NULL;
     char buf[256];
@@ -125,8 +125,11 @@ OPTreorderImplementation(Client cntxt, M
     }
 
        for(k =0; k <= blkcnt; k++)
-               for(j=0; j < top[k]; j++)
-                       pushInstruction(mb, blocks[k][j]);
+               for(j=0; j < top[k]; j++){
+                       p =  blocks[k][j];
+                       p->pc = pc++;
+                       pushInstruction(mb, p);
+               }
 
     for(; i<limit; i++)
         if (old[i])
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to