Changeset: 80a6edac804f for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/80a6edac804f Modified Files: monetdb5/mal/mal_dataflow.c monetdb5/optimizer/opt_for.c Branch: default Log Message:
Merge with Sep2022 branch. diffs (227 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -51,13 +51,12 @@ typedef struct FLOWEVENT { lng hotclaim; /* memory foot print of result variables */ lng argclaim; /* memory foot print of arguments */ lng maxclaim; /* memory foot print of largest argument, could be used to indicate result size */ + struct FLOWEVENT *next; /* linked list for queues */ } *FlowEvent, FlowEventRec; typedef struct queue { - int size; /* size of queue */ - int last; /* last element in the queue */ int exitcount; /* how many threads should exit */ - FlowEvent *data; + FlowEvent first, last; /* first and last element of the queue */ MT_Lock l; /* it's a shared resource, ie we need locks */ MT_Sema s; /* threads wait on empty queues */ } Queue; @@ -112,7 +111,6 @@ mal_dataflow_reset(void) idle_workers = -1; exited_workers = -1; if( todo) { - GDKfree(todo->data); MT_lock_destroy(&todo->l); MT_sema_destroy(&todo->s); GDKfree(todo); @@ -142,21 +140,12 @@ DFLOWgraphSize(MalBlkPtr mb, int start, */ static Queue* -q_create(int sz, const char *name) +q_create(const char *name) { - Queue *q = (Queue*)GDKmalloc(sizeof(Queue)); + Queue *q = GDKzalloc(sizeof(Queue)); if (q == NULL) return NULL; - *q = (Queue) { - .size = (sz + 1) & ~1, /* we want a multiple of 2 */ - }; - q->data = (FlowEvent*) GDKmalloc(sizeof(FlowEvent) * q->size); - if (q->data == NULL) { - GDKfree(q); - return NULL; - } - MT_lock_init(&q->l, name); MT_sema_init(&q->s, 0, name); return q; @@ -168,7 +157,6 @@ q_destroy(Queue *q) assert(q); MT_lock_destroy(&q->l); MT_sema_destroy(&q->s); - GDKfree(q->data); GDKfree(q); } @@ -180,12 +168,15 @@ q_enqueue(Queue *q, FlowEvent d) assert(q); assert(d); MT_lock_set(&q->l); - if (q->last == q->size) { - q->size <<= 1; - q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * q->size); - assert(q->data); + if (q->first == NULL) { + assert(q->last == NULL); + q->first = q->last = d; + } else { + assert(q->last != NULL); + q->last->next = d; + q->last = d; } - q->data[q->last++] = d; + d->next = NULL; MT_lock_unset(&q->l); MT_sema_up(&q->s); } @@ -202,16 +193,15 @@ q_requeue(Queue *q, FlowEvent d) assert(q); assert(d); MT_lock_set(&q->l); - if (q->last == q->size) { - /* enlarge buffer */ - q->size <<= 1; - q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * q->size); - assert(q->data); + if (q->first == NULL) { + assert(q->last == NULL); + q->first = q->last = d; + d->next = NULL; + } else { + assert(q->last != NULL); + d->next = q->first; + q->first = d; } - for (int i = q->last; i > 0; i--) - q->data[i] = q->data[i - 1]; - q->data[0] = d; - q->last++; MT_lock_unset(&q->l); MT_sema_up(&q->s); } @@ -219,46 +209,37 @@ q_requeue(Queue *q, FlowEvent d) static FlowEvent q_dequeue(Queue *q, Client cntxt) { - FlowEvent r = NULL, s = NULL; - assert(q); MT_sema_down(&q->s); if (ATOMIC_GET(&exiting)) return NULL; MT_lock_set(&q->l); - if( cntxt == NULL && q->exitcount > 0){ + if (cntxt == NULL && q->exitcount > 0) { q->exitcount--; MT_lock_unset(&q->l); return NULL; } - { - int minpc; - minpc = q->last -1; - s = q->data[minpc]; - /* for long "queues", just grab the first eligible entry we encounter */ - if (s && q->last < 1024) { - for (int i = q->last - 1; i >= 0; i--) { - if (cntxt == NULL || q->data[i]->flow->cntxt == cntxt) { - /* for shorter "queues", find the oldest eligible entry */ - r = q->data[i]; - if (r && s->pc > r->pc) { - minpc = i; - s = r; - } - } - } - } - if (minpc >= 0) { - r = q->data[minpc]; - q->last--; - if (minpc < q->last) - memmove(q->data + minpc, q->data + minpc + 1, - (q->last - minpc) * sizeof(q->data[0])); + FlowEvent *dp = &q->first; + FlowEvent pd = NULL; + /* if cntxt == NULL, return the first event, if cntxt != NULL, find + * the first event in the queue with matching cntxt value and return + * that */ + if (cntxt != NULL) { + while (*dp && (*dp)->flow->cntxt != cntxt) { + pd = *dp; + dp = &pd->next; } } + FlowEvent d = *dp; + if (d) { + *dp = d->next; + d->next = NULL; + if (*dp == NULL) + q->last = pd; + } MT_lock_unset(&q->l); - return r; + return d; } /* @@ -358,9 +339,9 @@ DFLOWworker(void *T) fe->hotclaim = 0; /* don't assume priority anymore */ fe->maxclaim = 0; MT_lock_set(&todo->l); - int last = todo->last; + FlowEvent last = todo->last; MT_lock_unset(&todo->l); - if (last == 0) + if (last == NULL) MT_sleep_ms(DELAYUNIT); q_requeue(todo, fe); continue; @@ -495,7 +476,7 @@ DFLOWinitialize(void) return 0; } free_max = GDKgetenv_int("dataflow_max_free", GDKnr_threads < 4 ? 4 : GDKnr_threads); - todo = q_create(2048, "todo"); + todo = q_create("todo"); if (todo == NULL) { MT_lock_unset(&dataflowLock); MT_lock_unset(&mal_contextLock); @@ -911,7 +892,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m flow->start = startpc + 1; flow->stop = stoppc; - flow->done = q_create((unsigned) (stoppc - startpc) + 1, "flow->done"); + flow->done = q_create("flow->done"); if (flow->done == NULL) { GDKfree(flow); throw(MAL, "dataflow", "runMALdataflow(): Failed to create flow->done queue"); diff --git a/monetdb5/optimizer/opt_for.c b/monetdb5/optimizer/opt_for.c --- a/monetdb5/optimizer/opt_for.c +++ b/monetdb5/optimizer/opt_for.c @@ -47,12 +47,22 @@ OPTforImplementation(Client cntxt, MalBl if (mb->inlineProp) goto wrapup; + limit = mb->stop; + + for (i = 0; i < limit; i++) { + p = mb->stmt[i]; + if (p && p->retc == 1 && getModuleId(p) == forRef && getFunctionId(p) == decompressRef) { + break; + } + } + if (i == limit) + goto wrapup; /* nothing to do */ + varisfor = GDKzalloc(2 * mb->vtop * sizeof(int)); varforvalue = GDKzalloc(2 * mb->vtop * sizeof(int)); if (varisfor == NULL || varforvalue == NULL) goto wrapup; - limit = mb->stop; slimit = mb->ssize; old = mb->stmt; if (newMalBlkStmt(mb, mb->ssize) < 0) { _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org