Changeset: cda5bc233217 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=cda5bc233217 Modified Files: monetdb5/mal/mal.h monetdb5/mal/mal_interpreter.mx Branch: default Log Message:
Enhanced instruction scheduling. The dataflow scheduler has been extended to avoid resource contention, especially with large databases. The policy is to suspend a thread when it reaches the end of an instruction chain, i.e. when it cannot immediately continue due to a lack of eligible instructions. In such a situation, the thread is suspended for a period equal to the time it spent producing output. The waiting time is reduced faster when memory resources become available. Likewise, when many threads are asleep, their wakeup is sped up. Initial experiments with SF100 show a good improvement, e.g. Q1 became about 25% faster. diffs (97 lines): diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h --- a/monetdb5/mal/mal.h +++ b/monetdb5/mal/mal.h @@ -46,6 +46,7 @@ */ #define MAXSCRIPT 64 #define MEMORY_THRESHOLD 0.8 +#define DELAYUNIT 100 /* ms delay in parallel processing decissions */ mal_export char monet_cwd[PATHLENGTH]; mal_export int monet_welcome; diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx --- a/monetdb5/mal/mal_interpreter.mx +++ b/monetdb5/mal/mal_interpreter.mx @@ -1050,6 +1050,8 @@ DFLOWstep(FlowTask *t, FlowStatus fs) * It could also give preference to an instruction that eats away the object * just produced. THis way it need not be saved on disk for a long time. 
*/ +static int asleep = 0; + static void runDFLOWworker(void *t) { @@ -1058,6 +1060,7 @@ runDFLOWworker(void *t) InstrPtr p; Thread thr; int i, local = 0, last = 0; + long usec=0; thr = THRnew(MT_getpid(), "DFLOWworker"); GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */ @@ -1066,16 +1069,18 @@ runDFLOWworker(void *t) local = nxtfs != 0; if (nxtfs == 0) { fs = (FlowStatus)q_dequeue(task->todo); + #ifdef USE_DFLOW_ADMISSION if (DFLOWadmission(fs->argclaim, fs->hotclaim)) { fs->hotclaim = 0; /* don't assume priority anymore */ if ( task->todo->last == 0) - MT_sleep_ms(1); + MT_sleep_ms(DELAYUNIT); q_requeue(task->todo, fs); nxtfs = 0; continue; } #endif + usec = GDKusec(); } else /* always execute, it does not affect memory claims */ fs = nxtfs; @@ -1124,6 +1129,45 @@ runDFLOWworker(void *t) q_requeue(task->flow->done, fs); else q_enqueue(task->flow->done, fs); + /* Thread chain context switch decision. */ + /* Delay the threads if too much competition arises */ + /* If in the mean time memory becomes free, or too many sleep, re-enable worker */ + /* It may happen that all threads enter the wait state. 
So, keep one running at all time */ + if ( nxtfs == 0){ + long delay, clk = (long)(GDKusec()-usec)/1000; + double factor = 1.0; + if ( clk > DELAYUNIT ) { + long clkstart = clk; + mal_set_lock(mal_contextLock, "runMALdataflow"); + asleep++; + /* speedup as we see more threads asleep */ + clk = (long) (clk * (1.0- asleep/GDKnr_threads)); + /* always keep one running to avoid all waiting for a chain context switch */ + if ( asleep >= GDKnr_threads) + clk = -2 * DELAYUNIT; + mal_unset_lock(mal_contextLock, "runMALdataflow"); + /* if there are no other instructions in the queue, then simply wait for them */ + if ( task->todo->last == 0) + clk = -3 * DELAYUNIT; + + PARDEBUG if ( clk > 0) + mnstr_printf(GDKstdout,"#delay %d initial %ld\n", task->id, clk); + while (clk > 0 ){ + /* speed up wake up when we have memory or too many sleepers */ + factor = memorypool/(MEMORY_THRESHOLD * monet_memory); + delay = (long)( DELAYUNIT * (factor > 1.0 ? 1.0:factor)) + 1; + MT_sleep_ms( delay ); + clk -= DELAYUNIT; + } + if ( clkstart > DELAYUNIT || clkstart < - DELAYUNIT ) { + mal_set_lock(mal_contextLock, "runMALdataflow"); + asleep--; + mal_unset_lock(mal_contextLock, "runMALdataflow"); + PARDEBUG if ( clk > -2 * DELAYUNIT ) + mnstr_printf(GDKstdout,"#delayed finished thread %d asleep %d\n", task->id, asleep); + } + } + } } GDKfree(GDKerrbuf); GDKsetbuf(0); _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list