Changeset: 4029ade15099 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4029ade15099 Modified Files: monetdb5/mal/mal_interpreter.mx Branch: default Log Message:
PACMAN scheduler: The scheduler has been distributed over the main scheduler and the workers. Whenever a worker finishes an instruction, it chases the flow graph and executes all instructions that the finished instruction exclusively feeds into. The main scheduler is notified, so that it properly waits for all results. The effect of this algorithm is that each result produced is immediately fed into another operator, in an attempt to avoid unnecessary IO on temporaries (tmps). Error handling in a worker blocks the main scheduler from feeding new instructions, but the worker has to continue until it reaches a blocking instruction. diffs (truncated from 513 to 300 lines): diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx --- a/monetdb5/mal/mal_interpreter.mx +++ b/monetdb5/mal/mal_interpreter.mx @@ -87,11 +87,6 @@ static lng memorypool; /* memory claimed by concurrent threads */ static lng memoryused; /* memory used for intermediates */ static int memoryclaims = 0; /* number of threads active with expensive operations */ -static struct{ - lng claim; /* actual claim on memory*/ - int bid; -} hotpotatoes[MAXHOT]; -static int hottop = 0; #define heapinfo(X) if((X) && (X)->base) vol = (X)->free; else vol = 0; #define hashinfo(X) if((X) && (X)->mask) vol = ((X)->mask+(X)->lim+1)*sizeof(int) + sizeof(*(X)); else vol = 0; @@ -699,8 +694,12 @@ BBPunfix( h = b->batCacheid); } @c +/* + * The memory claim is the estimate for the amount of memory hold. 
+ * Views are consider cheap and ignored +*/ static lng -getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i){ +getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag){ lng total=0, vol = 0; BAT *b; BUN cnt = 0; @@ -710,6 +709,10 @@ b = BATdescriptor(stk->stk[getArg(pci,i)].val.bval); if (b==NULL) return 0; + if ( flag && isVIEW(b)){ + BBPunfix( b->batCacheid); + return 0; + } @:calcFootprint@ /* prepare for hashes */ if ( b->H->hash == NULL && b->hsorted == 0 && @@ -725,143 +728,19 @@ } BBPunfix( b->batCacheid); } - PARDEBUG - mnstr_printf(GDKstdout,"#memory claim " LLFMT "\n",total); return total; } -static lng -getHotClaim() -{ - lng total = 0; - int i; - for ( i = 0; i < hottop; i++) - total += hotpotatoes[i].claim; - return total; -} - -@- -Calculate the memory need of a single instruction and also determine how much of the -hot potatoes it will eat. -@c -void -getMemoryHotClaim(FlowStatus fs) -{ - int i, h, t; - InstrPtr pci = getInstrPtr(fs->mb, fs->pc); - - fs->hotclaim = 0; - for(i= pci->retc; i< pci->argc; i++) { - if (fs->stk->stk[getArg(pci,i)].vtype == TYPE_bat) - h = fs->stk->stk[getArg(pci,i)].val.bval; - for (t =0; t< hottop; t++) - if (hotpotatoes[t].bid == h) - fs->hotclaim += hotpotatoes[t].claim; - } -} -@- -After we have executed the instruction, we should release any hotpotatoe claim (it is hot only once) -and make more precise claims for the return arguments. Moreover, we should reduce the hot potatoe set, -due to other arguments being loaded. 
-@c -void -updMemoryUsedPart(MalStkPtr stk, InstrPtr pci, int start, int stop, lng argclaim) -{ - /* remove the result arguments from the hot set */ - int i,j,h,bid; - - if ( hottop >= MAXHOT || memoryused > (lng) (MEMORY_THRESHOLD * monet_memory) ){ - /* forget everything returning memory to pool */ - mal_set_lock(mal_contextLock, "DFLOWdelay"); - for ( i =0; i< MAXHOT; i++){ - hotpotatoes[i].claim=0; - hotpotatoes[i].bid=0; - } - memoryused = 0; - hottop = 0; - mal_unset_lock(mal_contextLock, "DFLOWdelay"); -#ifdef DEBUG_MEMORY_CLAIM - mnstr_printf(GDKout,"#DFLOWhotpotatoes reset\n"); -#endif - } - - if (memorypool == 0 ){ - /* not initialized */ - return; - } - mal_set_lock(mal_contextLock, "DFLOWdelay"); - for ( i = start; i< stop; i++) - if (stk->stk[getArg(pci,i)].vtype == TYPE_bat && (bid = stk->stk[getArg(pci,i)].val.bval) && bid) - { - for ( h = j= 0; j< hottop; j++) - if ( hotpotatoes[j].bid != ABS(bid)) - hotpotatoes[h++]= hotpotatoes[j]; - else{ -#ifdef DEBUG_MEMORY_CLAIM - if ( hotpotatoes[j].claim){ - str cv = NULL; - ATOMformat(TYPE_bat, &hotpotatoes[hottop].bid, &cv); - mnstr_printf(GDKout,"#DFLOWhotpotatoes[%d] drops [%s]" LLFMT "\n", j, cv, hotpotatoes[j].claim); - if (cv) GDKfree(cv); - } -#endif - } - hottop = h; - } - /* input also invalidates part of the hot set */ - argclaim = argclaim * (memorypool / (memorypool+memoryused)); - for ( h = j= 0; j< hottop; j++) - if ( argclaim > 0) - argclaim -= hotpotatoes[j].claim; - else hotpotatoes[h++]= hotpotatoes[j]; - hottop = h; - mal_unset_lock(mal_contextLock, "DFLOWdelay"); -} - -void -updMemoryUsed(MalStkPtr stk, InstrPtr pci, lng argclaim) -{ - lng action=0,total,t,vol; - int i,h,bid; - BUN cnt = 0; - BAT *b; - - - for ( i= 0; i< pci->retc && hottop < MAXHOT; i++) - if (stk->stk[getArg(pci,i)].vtype == TYPE_bat && (bid = stk->stk[getArg(pci,i)].val.bval) && bid) - { - total = 0; - @:calcclaim@ - (void) t; - (void) h; - if ( total ) { - mal_set_lock(mal_contextLock, "DFLOWdelay"); - 
hotpotatoes[hottop].bid = ABS(stk->stk[getArg(pci,i)].val.bval); - hotpotatoes[hottop].claim = total; -#ifdef DEBUG_MEMORY_CLAIM - if ( total ) { - str cv = NULL; - ATOMformat(TYPE_bat, &hotpotatoes[hottop].bid, &cv); - mnstr_printf(GDKout,"#DFLOWhotpotatoes[%d] claims [%s]" LLFMT "\n", hottop, cv, total); - GDKfree(cv); - } -#endif - hottop++; - action++; - mal_unset_lock(mal_contextLock, "DFLOWdelay"); - } - } - updMemoryUsedPart(stk,pci, pci->retc,pci->argc,argclaim); -#ifdef DEBUG_MEMORY_CLAIM - if ( total && action ) - mnstr_printf(GDKout,"#DFLOWhotpotatoes pool " LLFMT " used " LLFMT "\n", memorypool, memoryused); -#endif -} - +/* + * The hotclaim indicates the amount of data recentely written. + * as a result of an operation. The argclaim is the sum over the hotclaims + * for all arguments. + * The argclaim provides a hint on how much we actually may need to execute + * The hotclaim is a hint how large the result would be. +*/ int DFLOWadmission(lng argclaim, lng hotclaim) { - lng hot; /* optimistically set memory */ if ( argclaim == 0) return 0; @@ -870,32 +749,25 @@ mal_set_lock(mal_contextLock, "DFLOWdelay"); if (memorypool <= 0 && memoryclaims == 0) { memorypool = (lng) (MEMORY_THRESHOLD * monet_memory); - hottop = 0; } if ( argclaim > 0 ) { - if (memoryclaims == 0 || memorypool - memoryused > argclaim -hotclaim){ - hot = getHotClaim(); - if ( memoryclaims && hotclaim == 0 && argclaim > memorypool - memoryused - hot) { - /* don't start unless you can eat the hot potatoes first */ - PARDEBUG - mnstr_printf(GDKstdout,"#Delayed due to hot potatoes pool " LLFMT " used " LLFMT " hot " LLFMT "\n", memorypool, memoryused,hot); - mal_unset_lock(mal_contextLock, "DFLOWdelay"); - return -1; - } - memorypool -= argclaim; + if (memoryclaims == 0 || memorypool - memoryused > argclaim + hotclaim){ + memorypool -= (argclaim + hotclaim); memoryclaims ++; PARDEBUG mnstr_printf(GDKstdout,"#DFLOWadmit %3d thread %d pool " LLFMT","LLFMT " claims " LLFMT "," LLFMT"\n", - 
memoryclaims, THRgettid(), memorypool, memoryused, argclaim, hotclaim); + memoryclaims, THRgettid(), memorypool, memoryused, argclaim, hotclaim); mal_unset_lock(mal_contextLock, "DFLOWdelay"); return 0; } + PARDEBUG + mnstr_printf(GDKstdout,"#Delayed due to lack of memory " LLFMT " used " LLFMT " requestd " LLFMT "\n", memorypool, memoryused, argclaim+hotclaim); mal_unset_lock(mal_contextLock, "DFLOWdelay"); return -1; } /* release memory claimed before */ - memorypool += -argclaim ; + memorypool += -argclaim - hotclaim ; memoryclaims --; PARDEBUG mnstr_printf(GDKstdout,"#DFLOWadmit %3d thread %d pool " LLFMT","LLFMT " claims " LLFMT "," LLFMT"\n", @@ -961,11 +833,16 @@ MT_sema_up(&q->s, "q_enqueue"); } +/* + * A priority queue over the hot claims of memory may + * be more effective. It priorizes those instructions + * that want to use a big recent result +*/ + static void -q_requeue(queue *q, void *d) +q_requeue_(queue *q, void *d) { int i; - MT_lock_set(&q->l, "q_requeue"); if (q->last == q->size) { /* enlarge buffer */ q->size <<= 1; @@ -975,6 +852,12 @@ q->data[i]= q->data[i-1]; q->data[0] = (void*) d; q->last++; +} +static void +q_requeue(queue *q, void *d) +{ + MT_lock_set(&q->l, "q_requeue"); + q_requeue_(q,d); MT_lock_unset(&q->l, "q_requeue"); MT_sema_up(&q->s, "q_requeue"); } @@ -1140,26 +1023,70 @@ static void runDFLOWworker(void *t) { - FlowStatus fs; + FlowStatus fs, nxtfs =0; FlowTask *task = (FlowTask*) t; + InstrPtr p; Thread thr; + int i, local=0, last = 0; thr = THRnew(MT_getpid(), "DFLOWworker"); while(task) { - fs = (FlowStatus) q_dequeue(task->todo); + local = nxtfs != 0; + if ( nxtfs == 0) + fs = (FlowStatus) q_dequeue(task->todo); + else fs = nxtfs; if ( DFLOWadmission(fs->argclaim, fs->hotclaim) ){ PARDEBUG mnstr_printf(GDKout,"#delay pc=%d thr= %d pool " LLFMT " claim "LLFMT"\n", fs->pc,THRgettid(), memorypool, fs->argclaim); MT_sleep_ms(1); fs->hotclaim = 0; /* don't assume priority anymore */ q_requeue(task->todo,fs); + nxtfs = 0; 
continue; } assert(fs->pc > 0); + PARDEBUG + mnstr_printf(GDKstdout,"#execute pc= %d thr= %d %s\n",fs->pc, task->id, fs->error?fs->error:""); fs->error = DFLOWstep(task, fs); + PARDEBUG - mnstr_printf(GDKstdout,"#execute pc=%d thr= %d finished %s\n",fs->pc, task->id, fs->error?fs->error:""); - q_enqueue(task->flow->done, fs); + mnstr_printf(GDKstdout,"#execute pc= %d thr= %d finished %s\n",fs->pc, task->id, fs->error?fs->error:""); + + /* release the memory claim */ _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list