Changeset: 9aed1c90a684 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9aed1c90a684
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: Feb2013
Log Message:

mal_dataflow.c: minor changes (mainly adding assertions and code layout)


mainly to minimize code differences between Feb2013 & default


diffs (154 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -110,26 +110,29 @@ DFLOWgraphSize(MalBlkPtr mb, int start, 
 static Queue*
 q_create(int sz)
 {
+       const char* name = "q_create";
        Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
 
        if (q == NULL)
                return NULL;
        q->size = ((sz << 1) >> 1); /* we want a multiple of 2 */
        q->last = 0;
-       q->data = (void*)GDKmalloc(sizeof(FlowEvent) * q->size);
+       q->data = (FlowEvent*) GDKmalloc(sizeof(FlowEvent) * q->size);
        if (q->data == NULL) {
                GDKfree(q);
                return NULL;
        }
 
-       MT_lock_init(&q->l, "q_create");
-       MT_sema_init(&q->s, 0, "q_create");
+       (void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */
+       MT_lock_init(&q->l, name);
+       MT_sema_init(&q->s, 0, name);
        return q;
 }
 
 static void
 q_destroy(Queue *q)
 {
+       assert(q);
        MT_lock_destroy(&q->l);
        MT_sema_destroy(&q->s);
        GDKfree(q->data);
@@ -141,16 +144,20 @@ q_destroy(Queue *q)
 static void
 q_enqueue_(Queue *q, FlowEvent d)
 {
+       assert(q);
        assert(d);
        if (q->last == q->size) {
                q->size <<= 1;
-               q->data = GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
+               q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
+               assert(q->data);
        }
        q->data[q->last++] = d;
 }
 static void
 q_enqueue(Queue *q, FlowEvent d)
 {
+       assert(q);
+       assert(d);
        MT_lock_set(&q->l, "q_enqueue");
        q_enqueue_(q, d);
        MT_lock_unset(&q->l, "q_enqueue");
@@ -169,20 +176,23 @@ q_requeue_(Queue *q, FlowEvent d)
 {
        int i;
 
+       assert(q);
        assert(d);
        if (q->last == q->size) {
                /* enlarge buffer */
                q->size <<= 1;
-               q->data = GDKrealloc(q->data, sizeof(void*) * q->size);
+               q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
+               assert(q->data);
        }
        for (i = q->last; i > 0; i--)
                q->data[i] = q->data[i - 1];
-       q->data[0] = (void*)d;
+       q->data[0] = d;
        q->last++;
 }
 static void
 q_requeue(Queue *q, FlowEvent d)
 {
+       assert(q);
        assert(d);
        MT_lock_set(&q->l, "q_requeue");
        q_requeue_(q, d);
@@ -196,12 +206,13 @@ q_dequeue(Queue *q)
 {
        void *r = NULL;
 
+       assert(q);
        MT_sema_down(&q->s, "q_dequeue");
        MT_lock_set(&q->l, "q_dequeue");
-       assert(q->last);
+       assert(q->last > 0);
        if (q->last > 0) {
                /* LIFO favors garbage collection */
-               r = q->data[--q->last];
+               r = (void*) q->data[--q->last];
                q->data[q->last] = 0;
        }
        /* else: terminating */
@@ -260,10 +271,12 @@ DFLOWworker(void *t)
                wq = workerqueue[id];
                if (fnxt == 0)
                        fe = q_dequeue(todo[wq]);
-               else fe = fnxt;
+               else
+                       fe = fnxt;
                fnxt = 0;
                assert(fe);
                flow = fe->flow;
+               assert(flow);
 
                /* whenever we have a (concurrent) error, skip it */
                if (flow->error) {
@@ -277,6 +290,7 @@ DFLOWworker(void *t)
 #ifdef USE_MAL_ADMISSION
                        if (MALadmission(fe->argclaim, fe->hotclaim)) {
                                fe->hotclaim = 0;   /* don't assume priority anymore */
+                               assert(todo[wq]);
                                if (todo[wq]->last == 0)
                                        MT_sleep_ms(DELAYUNIT);
                                q_requeue(todo[wq], fe);
@@ -314,6 +328,7 @@ DFLOWworker(void *t)
 #ifdef USE_MAL_ADMISSION
                {
                InstrPtr p = getInstrPtr(flow->mb, fe->pc);
+               assert(p);
                fe->hotclaim = 0;
                for (i = 0; i < p->retc; i++)
                        fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE);
@@ -335,6 +350,7 @@ DFLOWworker(void *t)
 
                q_enqueue(flow->done, fe);
                if ( fnxt == 0) {
+                       assert(todo[wq]);
                        if (todo[wq]->last == 0)
                                profilerHeartbeatEvent("wait");
                        else
@@ -513,11 +529,14 @@ DFLOWscheduler(DataFlow flow)
        int j;
        InstrPtr p;
 #endif
-       int tasks=0, actions = flow->stop - flow->start;
+       int tasks=0, actions;
        str ret = MAL_SUCCEED;
        FlowEvent fe, f = 0;
        int wq;
 
+       if (flow == NULL)
+               throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
+       actions = flow->stop - flow->start;
        if (actions == 0)
                throw(MAL, "dataflow", "Empty dataflow block");
        /* initialize the eligible statements */
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to