Changeset: 3a6f13db184e for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3a6f13db184e
Modified Files:
        monetdb5/mal/mal.c
        monetdb5/mal/mal.h
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Give delayed threads their own lock (mal_delayLock) instead of sharing
mal_contextLock, and refine the punishment so that it applies to all
instructions once we have reached our memory quota.


diffs (83 lines):

diff --git a/monetdb5/mal/mal.c b/monetdb5/mal/mal.c
--- a/monetdb5/mal/mal.c
+++ b/monetdb5/mal/mal.c
@@ -197,6 +197,7 @@ MT_Lock     mal_contextLock;
 MT_Lock     mal_remoteLock;
 MT_Lock        mal_profileLock ;
 MT_Lock     mal_copyLock;
+MT_Lock     mal_delayLock;
 /*
  * @-
  * Initialization of the MAL context
@@ -234,6 +235,7 @@ int mal_init(void){
        MT_lock_init( &mal_remoteLock, "mal_remoteLock");
        MT_lock_init( &mal_profileLock, "mal_profileLock");
        MT_lock_init( &mal_copyLock, "mal_copyLock");
+       MT_lock_init( &mal_delayLock, "mal_delayLock");
 
        GDKprotect();
        tstAligned();
diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -126,6 +126,7 @@ mal_export MT_Lock  mal_contextLock;
 mal_export MT_Lock  mal_remoteLock;
 mal_export MT_Lock  mal_profileLock ;
 mal_export MT_Lock  mal_copyLock ;
+mal_export MT_Lock  mal_delayLock ;
 
 
 mal_export int mal_init(void);
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -873,21 +873,22 @@ runDFLOWworker(void *t)
                else
                        q_enqueue(task->flow->done, fs);
                /* Thread chain context switch decision. */
-               /* Delay the threads if too much competition arises */
+               /* Delay the threads if too much competition arises and memory 
becomes scarce */
                /* If in the mean time memory becomes free, or too many sleep, 
re-enable worker */
                /* It may happen that all threads enter the wait state. So, 
keep one running at all time */
-               if ( nxtfs == 0){
+               if ( nxtfs == 0 || MT_getrss() > MEMORY_THRESHOLD * 
monet_memory) {
                        long delay, clk = (GDKusec()- usec)/1000;
+                       int rss=0;
                        double factor = 1.0;
                        if ( clk > DELAYUNIT ) {
-                               mal_set_lock(mal_contextLock, "runMALdataflow");
+                               mal_set_lock(mal_delayLock, "runMALdataflow");
                                asleep++;
                                /* speedup as we see more threads asleep */
                                clk = (long) (clk * (1.0- 
asleep/GDKnr_threads));
                                /* always keep one running to avoid all waiting 
for a chain context switch */
                                if ( asleep >= GDKnr_threads)
                                        clk = -2 * DELAYUNIT;
-                               mal_unset_lock(mal_contextLock, 
"runMALdataflow");
+                               mal_unset_lock(mal_delayLock, "runMALdataflow");
                                /* if there are no other instructions in the 
queue, then simply wait for them */
                                if ( task->todo->last ==  0) 
                                        clk = -3 * DELAYUNIT;
@@ -895,16 +896,18 @@ runDFLOWworker(void *t)
                                PARDEBUG mnstr_printf(GDKstdout,"#delay %d 
initial %ld\n", task->id, clk);
                                while (clk > 0 ){
                                        /* speed up wake up when we have memory 
or too many sleepers */
-                                       factor = MT_getrss()/(MEMORY_THRESHOLD 
* monet_memory);
+                                       /* don't call getrss too often */
+                                       if ( rss++ % 10 == 0)
+                                               factor = 
MT_getrss()/(MEMORY_THRESHOLD * monet_memory);
                                        delay = (long)( DELAYUNIT * (factor > 
1.0 ? 1.0:factor));
-                                       //delay = (long) (delay * (1.0 - 
asleep/GDKnr_threads));
+                                       delay = (long) (delay * (1.0 - 
(asleep-1)/GDKnr_threads));
                                        if ( delay)
                                                MT_sleep_ms( delay );
                                        clk -= DELAYUNIT;
                                }
-                               mal_set_lock(mal_contextLock, "runMALdataflow");
+                               mal_set_lock(mal_delayLock, "runMALdataflow");
                                asleep--;
-                               mal_unset_lock(mal_contextLock, 
"runMALdataflow");
+                               mal_unset_lock(mal_delayLock, "runMALdataflow");
                                PARDEBUG mnstr_printf(GDKstdout,"#delayed 
finished thread %d asleep %d\n", task->id, asleep);
                        }
                }
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to