Changeset: 3a6f13db184e for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3a6f13db184e Modified Files: monetdb5/mal/mal.c monetdb5/mal/mal.h monetdb5/mal/mal_dataflow.c Branch: default Log Message:
Give delayed threads their own lock and refine the punishment to include all instructions when we reached our memory quota. diffs (83 lines): diff --git a/monetdb5/mal/mal.c b/monetdb5/mal/mal.c --- a/monetdb5/mal/mal.c +++ b/monetdb5/mal/mal.c @@ -197,6 +197,7 @@ MT_Lock mal_contextLock; MT_Lock mal_remoteLock; MT_Lock mal_profileLock ; MT_Lock mal_copyLock; +MT_Lock mal_delayLock; /* * @- * Initialization of the MAL context @@ -234,6 +235,7 @@ int mal_init(void){ MT_lock_init( &mal_remoteLock, "mal_remoteLock"); MT_lock_init( &mal_profileLock, "mal_profileLock"); MT_lock_init( &mal_copyLock, "mal_copyLock"); + MT_lock_init( &mal_delayLock, "mal_delayLock"); GDKprotect(); tstAligned(); diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h --- a/monetdb5/mal/mal.h +++ b/monetdb5/mal/mal.h @@ -126,6 +126,7 @@ mal_export MT_Lock mal_contextLock; mal_export MT_Lock mal_remoteLock; mal_export MT_Lock mal_profileLock ; mal_export MT_Lock mal_copyLock ; +mal_export MT_Lock mal_delayLock ; mal_export int mal_init(void); diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -873,21 +873,22 @@ runDFLOWworker(void *t) else q_enqueue(task->flow->done, fs); /* Thread chain context switch decision. */ - /* Delay the threads if too much competition arises */ + /* Delay the threads if too much competition arises and memory becomes scarce */ /* If in the mean time memory becomes free, or too many sleep, re-enable worker */ /* It may happen that all threads enter the wait state. So, keep one running at all time */ - if ( nxtfs == 0){ + if ( nxtfs == 0 || MT_getrss() > MEMORY_THRESHOLD * monet_memory) { long delay, clk = (GDKusec()- usec)/1000; + int rss=0; double factor = 1.0; if ( clk > DELAYUNIT ) { - mal_set_lock(mal_contextLock, "runMALdataflow"); + mal_set_lock(mal_delayLock, "runMALdataflow"); asleep++; /* speedup as we see more threads asleep */ clk = (long) (clk * (1.0- asleep/GDKnr_threads)); /* always keep one running to avoid all waiting for a chain context switch */ if ( asleep >= GDKnr_threads) clk = -2 * DELAYUNIT; - mal_unset_lock(mal_contextLock, "runMALdataflow"); + mal_unset_lock(mal_delayLock, "runMALdataflow"); /* if there are no other instructions in the queue, then simply wait for them */ if ( task->todo->last == 0) clk = -3 * DELAYUNIT; @@ -895,16 +896,18 @@ runDFLOWworker(void *t) PARDEBUG mnstr_printf(GDKstdout,"#delay %d initial %ld\n", task->id, clk); while (clk > 0 ){ /* speed up wake up when we have memory or too many sleepers */ - factor = MT_getrss()/(MEMORY_THRESHOLD * monet_memory); + /* don't call getrss too often */ + if ( rss++ % 10 == 0) + factor = MT_getrss()/(MEMORY_THRESHOLD * monet_memory); delay = (long)( DELAYUNIT * (factor > 1.0 ? 1.0:factor)); - //delay = (long) (delay * (1.0 - asleep/GDKnr_threads)); + delay = (long) (delay * (1.0 - (asleep-1)/GDKnr_threads)); if ( delay) MT_sleep_ms( delay ); clk -= DELAYUNIT; } - mal_set_lock(mal_contextLock, "runMALdataflow"); + mal_set_lock(mal_delayLock, "runMALdataflow"); asleep--; - mal_unset_lock(mal_contextLock, "runMALdataflow"); + mal_unset_lock(mal_delayLock, "runMALdataflow"); PARDEBUG mnstr_printf(GDKstdout,"#delayed finished thread %d asleep %d\n", task->id, asleep); } } _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list