Changeset: 0263be48f587 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0263be48f587 Modified Files: monetdb5/mal/mal_dataflow.c monetdb5/mal/mal_dataflow.h monetdb5/mal/mal_resource.c monetdb5/mal/mal_resource.h Branch: default Log Message:
Add dataflow deblocking instruction. It should always be allowed to enter the queue, even if we have resource constraints. Also a minimal number of workers should be active before resource control kicks in. diffs (101 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -380,13 +380,17 @@ DFLOWworker(void *T) MT_lock_unset(&flow->flowlock); #ifdef USE_MAL_ADMISSION - if (MALadmission(fe->argclaim, fe->hotclaim)) { - fe->hotclaim = 0; /* don't assume priority anymore */ - fe->maxclaim = 0; - if (todo->last == 0) - MT_sleep_ms(DELAYUNIT); - q_requeue(todo, fe); - continue; + if (MALrunningThreads() > 2 && MALadmission(fe->argclaim, fe->hotclaim)) { + // never block on deblockdataflow() + p= getInstrPtr(flow->mb,fe->pc); + if( p->fcn != (MALfcn) deblockdataflow){ + fe->hotclaim = 0; /* don't assume priority anymore */ + fe->maxclaim = 0; + if (todo->last == 0) + MT_sleep_ms(DELAYUNIT); + q_requeue(todo, fe); + continue; + } } #endif error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0); @@ -931,6 +935,17 @@ runMALdataflow(Client cntxt, MalBlkPtr m return msg; } +str +deblockdataflow( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + int *ret = getArgReference_int(stk,pci,0); + int *val = getArgReference_int(stk,pci,1); + (void) cntxt; + (void) mb; + *ret = *val; + return MAL_SUCCEED; +} + void stopMALdataflow(void) { diff --git a/monetdb5/mal/mal_dataflow.h b/monetdb5/mal/mal_dataflow.h --- a/monetdb5/mal/mal_dataflow.h +++ b/monetdb5/mal/mal_dataflow.h @@ -13,5 +13,6 @@ #include "mal_client.h" mal_export str runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr stk); +mal_export str deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); #endif /* _MAL_DATAFLOW_H*/ diff --git a/monetdb5/mal/mal_resource.c b/monetdb5/mal/mal_resource.c --- a/monetdb5/mal/mal_resource.c +++ 
b/monetdb5/mal/mal_resource.c @@ -206,7 +206,7 @@ MALresourceFairness(lng usec) if (rss < MEMORY_THRESHOLD ) break; threads = GDKnr_threads > 0 ? GDKnr_threads : 1; - delay = (unsigned int) ( ((double)DELAYUNIT * running) / threads); + delay = (unsigned int) ( ((double)DELAYUNIT * running) / threads) + 1; if (delay) { if ( delayed++ == 0){ PARDEBUG mnstr_printf(GDKstdout, "#delay initial %u["LLFMT"] memory "SZFMT"[%f]\n", delay, clk, rss, MEMORY_THRESHOLD ); @@ -221,6 +221,13 @@ MALresourceFairness(lng usec) } } +// Get a hint on the parallel behavior +int +MALrunningThreads(void) +{ + return running; +} + void initResource(void) { diff --git a/monetdb5/mal/mal_resource.h b/monetdb5/mal/mal_resource.h --- a/monetdb5/mal/mal_resource.h +++ b/monetdb5/mal/mal_resource.h @@ -12,7 +12,7 @@ #include "mal_interpreter.h" #define TIMESLICE 2000000 /* usec */ -#define DELAYUNIT 5 /* ms delay in parallel processing decisions */ +#define DELAYUNIT 2 /* ms delay in parallel processing decisions */ #define MAX_DELAYS 1000 /* never wait forever */ #define USE_MAL_ADMISSION @@ -22,5 +22,6 @@ mal_export int MALadmission(lng argclaim mal_export lng getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag); mal_export void MALresourceFairness(lng usec); +mal_export int MALrunningThreads(void); #endif /* _MAL_RESOURCE_H*/ _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list