Changeset: 9d81c7b5a874 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9d81c7b5a874
Modified Files:
        MonetDB5/src/mal/mal_instruction.mx
        MonetDB5/src/mal/mal_interpreter.mx
        MonetDB5/src/mal/mal_stack.mx
Branch: default
Log Message:

Hooks for admission control
The dataflow block execution has been extended with hooks for
admission and wrapup. They take effect when an instruction is
about to be added to or released from the queue.

The hooks' context is a single stack frame.

These hooks are intended to fine-tune the scheduling using
more knowledge about the context, or to implement a dynamic
dataflow crawler where the instructions to be executed are
determined by e.g. sampling first.

The stack structure is changed, so a clean rebuild may be required.


diffs (91 lines):

diff -r 3ce2ea819aea -r 9d81c7b5a874 MonetDB5/src/mal/mal_instruction.mx
--- a/MonetDB5/src/mal/mal_instruction.mx       Wed Jun 30 21:27:45 2010 +0200
+++ b/MonetDB5/src/mal/mal_instruction.mx       Thu Jul 01 09:09:21 2010 +0200
@@ -350,7 +350,6 @@
 #define VARRETS 2
 
 /* all functions return a string */
-typedef str (*MALfcn) ();
 
 typedef struct {
        bit token;              /* instruction type */
diff -r 3ce2ea819aea -r 9d81c7b5a874 MonetDB5/src/mal/mal_interpreter.mx
--- a/MonetDB5/src/mal/mal_interpreter.mx       Wed Jun 30 21:27:45 2010 +0200
+++ b/MonetDB5/src/mal/mal_interpreter.mx       Thu Jul 01 09:09:21 2010 +0200
@@ -1262,7 +1262,15 @@
                printInstruction(GDKstdout, flow->mb, 0, p, LIST_MAL_STMT);
        }
 }
-
+...@-
+Parallel processing is mostly driven by dataflow, but within this context
+there may be different schemes to take instructions into execution.
+The admission scheme (and wrapup) are the necessary scheduler hooks.
+A scheduler registers the functions needed and should release them
+at the end of the parallel block.
+They take effect after we have ensured that the basic properties for
+execution hold.
+...@c
 static str
 DFLOWscheduler( DataFlow flow )
 {
@@ -1287,6 +1295,8 @@
        while(queued){
                PARDEBUG stream_printf(GDKstdout,"#waiting for results, queued 
%d\n", queued);
                f = q_dequeue(flow->done);
+               if ( f->flow->stk->wrapup ) /* clean up whatever is called for 
*/
+                       (*f->flow->stk->wrapup)(f->flow->cntxt, f->flow->mb, 
f->flow->stk, getInstrPtr(flow->mb, f->pc));
                f->status = DFLOWwrapup;
                queued--;
                if (f->pc < 0) {
@@ -1295,8 +1305,11 @@
                        }
                        /* we have to wait for all threads to report back */
                        /* dequeue the remainders in case of an error */
-                       while(queued-- > 0) 
-                               (void)q_dequeue(flow->done);
+                       while(queued-- > 0)  {
+                                       if ( f->flow->stk->wrapup ) /* clean up 
whatever is called for */
+                                               
(*f->flow->stk->wrapup)(f->flow->cntxt, f->flow->mb, f->flow->stk, 
getInstrPtr(flow->mb, f->pc));
+                                       (void)q_dequeue(flow->done);
+                       }
                        return f->error;
                } else {
                        p = getInstrPtr(flow->mb, f->pc);
@@ -1337,7 +1350,9 @@
                if (fs[i].status == DFLOWpending ) {
                        p = getInstrPtr(flow->mb, fs[i].pc);
                        for ( j= p->retc; j < p->argc; j++)
-                       if ( getArg(p,j)== oa && DFLOWeligible(flow,fs,i,p,pc)){
+                       if ( getArg(p,j)== oa && DFLOWeligible(flow,fs,i,p,pc) 
&&
+                               ( f->flow->stk->admit == 0 || 
(*f->flow->stk->admit)(f->flow->cntxt, f->flow->mb, f->flow->stk, 
getInstrPtr(flow->mb, f->pc)) )
+                       ){
                                queued++;
                                candidates ++;
                                DFLOWactivate(flow,fs,i,p);
diff -r 3ce2ea819aea -r 9d81c7b5a874 MonetDB5/src/mal/mal_stack.mx
--- a/MonetDB5/src/mal/mal_stack.mx     Wed Jun 30 21:27:45 2010 +0200
+++ b/MonetDB5/src/mal/mal_stack.mx     Thu Jul 01 09:09:21 2010 +0200
@@ -68,6 +68,8 @@
 #define MAXGLOBALS  4 * STACKINCR
 #define MAXSHARES   8
 
+typedef str (*MALfcn) ();
+
 typedef struct MALSTK {
        int stksize;
        int stktop;
@@ -76,6 +78,13 @@
        int calldepth;          /* to protect against runtime stack overflow */
        short keepAlive;        /* do not garbage collect when set */
        short garbageCollect; /* stack needs garbage collection */
+...@-
+Parallel processing is mostly driven by dataflow, but within this context
+there may be different schemes to take instructions into execution.
+The admission scheme (and wrapup) are the necessary scheduler hooks.
+...@h
+       MALfcn admit;
+       MALfcn wrapup;
        MT_Lock stklock;        /* used for parallel processing */
 @-
 It is handy to administer the timing in the stack frame
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to