Changeset: 0263be48f587 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0263be48f587
Modified Files:
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/mal/mal_resource.c
        monetdb5/mal/mal_resource.h
Branch: default
Log Message:

Add dataflow deblocking instruction
It should alway be allowed to enter the queue,
even if we have resource constraints.
Alse a minimal number of workers should be
active before resource control kicks in.


diffs (101 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -380,13 +380,17 @@ DFLOWworker(void *T)
                MT_lock_unset(&flow->flowlock);
 
 #ifdef USE_MAL_ADMISSION
-               if (MALadmission(fe->argclaim, fe->hotclaim)) {
-                       fe->hotclaim = 0;   /* don't assume priority anymore */
-                       fe->maxclaim = 0;
-                       if (todo->last == 0)
-                               MT_sleep_ms(DELAYUNIT);
-                       q_requeue(todo, fe);
-                       continue;
+               if (MALrunningThreads() > 2 && MALadmission(fe->argclaim, 
fe->hotclaim)) {
+                       // never block on deblockdataflow()
+                       p= getInstrPtr(flow->mb,fe->pc);
+                       if( p->fcn != (MALfcn) deblockdataflow){
+                               fe->hotclaim = 0;   /* don't assume priority 
anymore */
+                               fe->maxclaim = 0;
+                               if (todo->last == 0)
+                                       MT_sleep_ms(DELAYUNIT);
+                               q_requeue(todo, fe);
+                               continue;
+                       }
                }
 #endif
                error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 
1, flow->stk, 0, 0);
@@ -931,6 +935,17 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        return msg;
 }
 
+str
+deblockdataflow( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+    int *ret = getArgReference_int(stk,pci,0);
+    int *val = getArgReference_int(stk,pci,1);
+    (void) cntxt;
+    (void) mb;
+    *ret = *val;
+    return MAL_SUCCEED;
+}
+
 void
 stopMALdataflow(void)
 {
diff --git a/monetdb5/mal/mal_dataflow.h b/monetdb5/mal/mal_dataflow.h
--- a/monetdb5/mal/mal_dataflow.h
+++ b/monetdb5/mal/mal_dataflow.h
@@ -13,5 +13,6 @@
 #include "mal_client.h"
 
 mal_export str runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int 
stoppc, MalStkPtr stk);
+mal_export str deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 
 #endif /*  _MAL_DATAFLOW_H*/
diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c
--- a/monetdb5/mal/mal_resource.c
+++ b/monetdb5/mal/mal_resource.c
@@ -206,7 +206,7 @@ MALresourceFairness(lng usec)
                        if (rss < MEMORY_THRESHOLD )
                                break;
                        threads = GDKnr_threads > 0 ? GDKnr_threads : 1;
-                       delay = (unsigned int) ( ((double)DELAYUNIT * running) 
/ threads);
+                       delay = (unsigned int) ( ((double)DELAYUNIT * running) 
/ threads) + 1;
                        if (delay) {
                                if ( delayed++ == 0){
                                                PARDEBUG 
mnstr_printf(GDKstdout, "#delay initial %u["LLFMT"] memory  "SZFMT"[%f]\n", 
delay, clk, rss, MEMORY_THRESHOLD );
@@ -221,6 +221,13 @@ MALresourceFairness(lng usec)
        }
 }
 
+// Get a hint on the parallel behavior
+int
+MALrunningThreads(void)
+{
+       return running;
+}
+
 void
 initResource(void)
 {
diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h
--- a/monetdb5/mal/mal_resource.h
+++ b/monetdb5/mal/mal_resource.h
@@ -12,7 +12,7 @@
 #include "mal_interpreter.h"
 
 #define TIMESLICE  2000000 /* usec */
-#define DELAYUNIT 5 /* ms delay in parallel processing decisions */
+#define DELAYUNIT 2 /* ms delay in parallel processing decisions */
 #define MAX_DELAYS 1000 /* never wait forever */
 
 #define USE_MAL_ADMISSION
@@ -22,5 +22,6 @@ mal_export int MALadmission(lng argclaim
 
 mal_export lng getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int 
i, int flag);
 mal_export void MALresourceFairness(lng usec);
+mal_export int MALrunningThreads(void);
 
 #endif /*  _MAL_RESOURCE_H*/
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to