Changeset: 80a6edac804f for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/80a6edac804f
Modified Files:
        monetdb5/mal/mal_dataflow.c
        monetdb5/optimizer/opt_for.c
Branch: default
Log Message:

Merge with Sep2022 branch.


diffs (227 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -51,13 +51,12 @@ typedef struct FLOWEVENT {
        lng hotclaim;   /* memory foot print of result variables */
        lng argclaim;   /* memory foot print of arguments */
        lng maxclaim;   /* memory foot print of largest argument, could be used 
to indicate result size */
+       struct FLOWEVENT *next;         /* linked list for queues */
 } *FlowEvent, FlowEventRec;
 
 typedef struct queue {
-       int size;       /* size of queue */
-       int last;       /* last element in the queue */
        int exitcount;  /* how many threads should exit */
-       FlowEvent *data;
+       FlowEvent first, last;          /* first and last element of the queue 
*/
        MT_Lock l;      /* it's a shared resource, ie we need locks */
        MT_Sema s;      /* threads wait on empty queues */
 } Queue;
@@ -112,7 +111,6 @@ mal_dataflow_reset(void)
        idle_workers = -1;
        exited_workers = -1;
        if( todo) {
-               GDKfree(todo->data);
                MT_lock_destroy(&todo->l);
                MT_sema_destroy(&todo->s);
                GDKfree(todo);
@@ -142,21 +140,12 @@ DFLOWgraphSize(MalBlkPtr mb, int start, 
  */
 
 static Queue*
-q_create(int sz, const char *name)
+q_create(const char *name)
 {
-       Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
+       Queue *q = GDKzalloc(sizeof(Queue));
 
        if (q == NULL)
                return NULL;
-       *q = (Queue) {
-               .size = (sz + 1) & ~1,  /* we want a multiple of 2 */
-       };
-       q->data = (FlowEvent*) GDKmalloc(sizeof(FlowEvent) * q->size);
-       if (q->data == NULL) {
-               GDKfree(q);
-               return NULL;
-       }
-
        MT_lock_init(&q->l, name);
        MT_sema_init(&q->s, 0, name);
        return q;
@@ -168,7 +157,6 @@ q_destroy(Queue *q)
        assert(q);
        MT_lock_destroy(&q->l);
        MT_sema_destroy(&q->s);
-       GDKfree(q->data);
        GDKfree(q);
 }
 
@@ -180,12 +168,15 @@ q_enqueue(Queue *q, FlowEvent d)
        assert(q);
        assert(d);
        MT_lock_set(&q->l);
-       if (q->last == q->size) {
-               q->size <<= 1;
-               q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * 
q->size);
-               assert(q->data);
+       if (q->first == NULL) {
+               assert(q->last == NULL);
+               q->first = q->last = d;
+       } else {
+               assert(q->last != NULL);
+               q->last->next = d;
+               q->last = d;
        }
-       q->data[q->last++] = d;
+       d->next = NULL;
        MT_lock_unset(&q->l);
        MT_sema_up(&q->s);
 }
@@ -202,16 +193,15 @@ q_requeue(Queue *q, FlowEvent d)
        assert(q);
        assert(d);
        MT_lock_set(&q->l);
-       if (q->last == q->size) {
-               /* enlarge buffer */
-               q->size <<= 1;
-               q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * 
q->size);
-               assert(q->data);
+       if (q->first == NULL) {
+               assert(q->last == NULL);
+               q->first = q->last = d;
+               d->next = NULL;
+       } else {
+               assert(q->last != NULL);
+               d->next = q->first;
+               q->first = d;
        }
-       for (int i = q->last; i > 0; i--)
-               q->data[i] = q->data[i - 1];
-       q->data[0] = d;
-       q->last++;
        MT_lock_unset(&q->l);
        MT_sema_up(&q->s);
 }
@@ -219,46 +209,37 @@ q_requeue(Queue *q, FlowEvent d)
 static FlowEvent
 q_dequeue(Queue *q, Client cntxt)
 {
-       FlowEvent r = NULL, s = NULL;
-
        assert(q);
        MT_sema_down(&q->s);
        if (ATOMIC_GET(&exiting))
                return NULL;
        MT_lock_set(&q->l);
-       if( cntxt == NULL && q->exitcount > 0){
+       if (cntxt == NULL && q->exitcount > 0) {
                q->exitcount--;
                MT_lock_unset(&q->l);
                return NULL;
        }
-       {
-               int minpc;
 
-               minpc = q->last -1;
-               s = q->data[minpc];
-               /* for long "queues", just grab the first eligible entry we 
encounter */
-               if (s && q->last < 1024) {
-                       for (int i = q->last - 1; i >= 0; i--) {
-                               if (cntxt ==  NULL || q->data[i]->flow->cntxt 
== cntxt) {
-                                       /* for shorter "queues", find the 
oldest eligible entry */
-                                       r = q->data[i];
-                                       if (r && s->pc > r->pc) {
-                                               minpc = i;
-                                               s = r;
-                                       }
-                               }
-                       }
-               }
-               if (minpc >= 0) {
-                       r = q->data[minpc];
-                       q->last--;
-                       if (minpc < q->last)
-                               memmove(q->data + minpc, q->data + minpc + 1,
-                                               (q->last - minpc) * 
sizeof(q->data[0]));
+       FlowEvent *dp = &q->first;
+       FlowEvent pd = NULL;
+       /* if cntxt == NULL, return the first event, if cntxt != NULL, find
+        * the first event in the queue with matching cntxt value and return
+        * that */
+       if (cntxt != NULL) {
+               while (*dp && (*dp)->flow->cntxt != cntxt) {
+                       pd = *dp;
+                       dp = &pd->next;
                }
        }
+       FlowEvent d = *dp;
+       if (d) {
+               *dp = d->next;
+               d->next = NULL;
+               if (*dp == NULL)
+                       q->last = pd;
+       }
        MT_lock_unset(&q->l);
-       return r;
+       return d;
 }
 
 /*
@@ -358,9 +339,9 @@ DFLOWworker(void *T)
                                fe->hotclaim = 0;   /* don't assume priority 
anymore */
                                fe->maxclaim = 0;
                                MT_lock_set(&todo->l);
-                               int last = todo->last;
+                               FlowEvent last = todo->last;
                                MT_lock_unset(&todo->l);
-                               if (last == 0)
+                               if (last == NULL)
                                        MT_sleep_ms(DELAYUNIT);
                                q_requeue(todo, fe);
                                continue;
@@ -495,7 +476,7 @@ DFLOWinitialize(void)
                return 0;
        }
        free_max = GDKgetenv_int("dataflow_max_free", GDKnr_threads < 4 ? 4 : 
GDKnr_threads);
-       todo = q_create(2048, "todo");
+       todo = q_create("todo");
        if (todo == NULL) {
                MT_lock_unset(&dataflowLock);
                MT_lock_unset(&mal_contextLock);
@@ -911,7 +892,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        flow->start = startpc + 1;
        flow->stop = stoppc;
 
-       flow->done = q_create((unsigned) (stoppc - startpc) + 1, "flow->done");
+       flow->done = q_create("flow->done");
        if (flow->done == NULL) {
                GDKfree(flow);
                throw(MAL, "dataflow", "runMALdataflow(): Failed to create 
flow->done queue");
diff --git a/monetdb5/optimizer/opt_for.c b/monetdb5/optimizer/opt_for.c
--- a/monetdb5/optimizer/opt_for.c
+++ b/monetdb5/optimizer/opt_for.c
@@ -47,12 +47,22 @@ OPTforImplementation(Client cntxt, MalBl
        if (mb->inlineProp)
                goto wrapup;
 
+       limit = mb->stop;
+
+       for (i = 0; i < limit; i++) {
+               p = mb->stmt[i];
+               if (p && p->retc == 1 && getModuleId(p) == forRef && 
getFunctionId(p) == decompressRef) {
+                       break;
+               }
+       }
+       if (i == limit)
+               goto wrapup;                    /* nothing to do */
+
        varisfor = GDKzalloc(2 * mb->vtop * sizeof(int));
        varforvalue = GDKzalloc(2 * mb->vtop * sizeof(int));
        if (varisfor == NULL || varforvalue == NULL)
                goto wrapup;
 
-       limit = mb->stop;
        slimit = mb->ssize;
        old = mb->stmt;
        if (newMalBlkStmt(mb, mb->ssize) < 0) {
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to