Changeset: 60bfed39bef9 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=60bfed39bef9 Modified Files: gdk/gdk_mapreduce.c monetdb5/mal/mal_client.c monetdb5/mal/mal_interpreter.c monetdb5/modules/mal/tablet.c Branch: default Log Message:
sema_init string is name of semaphore, sema_{up,down} string is function. diffs (266 lines): diff --git a/gdk/gdk_mapreduce.c b/gdk/gdk_mapreduce.c --- a/gdk/gdk_mapreduce.c +++ b/gdk/gdk_mapreduce.c @@ -54,19 +54,19 @@ MRqueueCreate(int sz) MT_Id tid; #ifdef NEED_MT_LOCK_INIT - MT_lock_init(&mrqlock, "q_create"); + MT_lock_init(&mrqlock, "mrqlock"); #endif - MT_lock_set(&mrqlock, "q_create"); - MT_sema_init(&mrqsema, 0, "q_create"); + MT_lock_set(&mrqlock, "MRqueueCreate"); + MT_sema_init(&mrqsema, 0, "mrqsema"); if ( mrqueue ) { - MT_lock_unset(&mrqlock, "q_create"); + MT_lock_unset(&mrqlock, "MRqueueCreate"); GDKerror("One map-reduce queue allowed"); return; } sz *= 2; mrqueue = (MRqueue *) GDKzalloc(sizeof(MRqueue) * sz); if ( mrqueue == 0) { - MT_lock_unset(&mrqlock, "q_create"); + MT_lock_unset(&mrqlock, "MRqueueCreate"); GDKerror("Could not create the map-reduce queue"); return; } @@ -75,19 +75,19 @@ MRqueueCreate(int sz) /* create a worker thread for each core as specified as system parameter */ for (i = 0; i < GDKnr_threads; i++) MT_create_thread(&tid, MRworker, (void *) 0, MT_THR_DETACHED); - MT_lock_unset(&mrqlock, "q_create"); + MT_lock_unset(&mrqlock, "MRqueueCreate"); } static void MRenqueue(int taskcnt, MRtask ** tasks) { assert(taskcnt > 0); - MT_lock_set(&mrqlock, "mrqlock"); + MT_lock_set(&mrqlock, "MRenqueue"); if (mrqlast == mrqsize) { mrqsize <<= 1; mrqueue = (MRqueue *) GDKrealloc(mrqueue, sizeof(MRqueue) * mrqsize); if ( mrqueue == 0) { - MT_lock_unset(&mrqlock, "mrqlock"); + MT_lock_unset(&mrqlock, "MRenqueue"); GDKerror("Could not enlarge the map-reduce queue"); return; } @@ -96,10 +96,10 @@ MRenqueue(int taskcnt, MRtask ** tasks) mrqueue[mrqlast].tasks = tasks; mrqueue[mrqlast].size = taskcnt; mrqlast++; - MT_lock_unset(&mrqlock, "mrqlock"); + MT_lock_unset(&mrqlock, "MRenqueue"); /* a task list is added for consumption */ while (taskcnt-- > 0) - MT_sema_up(&mrqsema, "mrqsema"); + MT_sema_up(&mrqsema, "MRenqueue"); } static MRtask * @@ -108,9 +108,9 @@ MRdequeue(void) MRtask *r = NULL; int idx; - MT_sema_down(&mrqsema, "mrqsema"); + MT_sema_down(&mrqsema, "MRdequeue"); assert(mrqlast); - MT_lock_set(&mrqlock, "mrqlock"); + MT_lock_set(&mrqlock, "MRdequeue"); if (mrqlast > 0) { idx = mrqueue[mrqlast - 1].index; r = mrqueue[mrqlast - 1].tasks[idx++]; @@ -119,7 +119,7 @@ MRdequeue(void) else mrqueue[mrqlast - 1].index = idx; } - MT_lock_unset(&mrqlock, "mrqlock"); + MT_lock_unset(&mrqlock, "MRdequeue"); assert(r); return r; } @@ -132,7 +132,7 @@ MRworker(void *arg) do { task = MRdequeue(); (task->cmd) (task); - MT_sema_up(task->sema, "mrqsema"); + MT_sema_up(task->sema, "MRworker"); } while (1); } @@ -147,7 +147,7 @@ MRschedule(int taskcnt, void **arg, void if (mrqueue == 0) MRqueueCreate(1024); - MT_sema_init(&sema, 0, "q_create"); + MT_sema_init(&sema, 0, "sema"); for (i = 0; i < taskcnt; i++) { task[i]->sema = &sema; task[i]->cmd = cmd; @@ -155,5 +155,5 @@ MRschedule(int taskcnt, void **arg, void MRenqueue(taskcnt, task); /* waiting for all report result */ for (i = 0; i < taskcnt; i++) - MT_sema_down(&sema, "mrqsema"); + MT_sema_down(&sema, "MRschedule"); } diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c --- a/monetdb5/mal/mal_client.c +++ b/monetdb5/mal/mal_client.c @@ -246,7 +246,7 @@ MCinitClientRecord(Client c, oid user, b c->rcc = (RecPtr) GDKzalloc(sizeof(RecStat)); c->rcc->curQ = -1; c->exception_buf_initialized = 0; - MT_sema_init(&c->s, 0, "MCinitClient"); + MT_sema_init(&c->s, 0, "Client->s"); return c; } diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c --- a/monetdb5/mal/mal_interpreter.c +++ b/monetdb5/mal/mal_interpreter.c @@ -438,7 +438,7 @@ callMAL(Client cntxt, MalBlkPtr mb, MalS * is determined by an environment variable. It is initially set equal to the * number of cores, which may be too coarse. */ - MT_sema_down(&mal_parallelism,"mal_parallelism"); + MT_sema_down(&mal_parallelism,"callMAL"); runtimeProfileInit(mb, &runtimeProfile, cntxt->flags & memoryFlag); #ifdef DEBUG_CALLMAL mnstr_printf(cntxt->fdout, "callMAL\n"); @@ -477,10 +477,10 @@ callMAL(Client cntxt, MalBlkPtr mb, MalS case PATcall: case CMDcall: default: - MT_sema_up(&mal_parallelism,"mal_parallelism"); + MT_sema_up(&mal_parallelism,"callMAL"); throw(MAL, "mal.interpreter", RUNTIME_UNKNOWN_INSTRUCTION); } - MT_sema_up(&mal_parallelism,"mal_parallelism"); + MT_sema_up(&mal_parallelism,"callMAL"); if (cntxt->qtimeout && time(NULL) - stk->clock.tv_usec > cntxt->qtimeout) throw(MAL, "mal.interpreter", RUNTIME_QRY_TIMEOUT); return ret; diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c --- a/monetdb5/modules/mal/tablet.c +++ b/monetdb5/modules/mal/tablet.c @@ -1330,14 +1330,14 @@ SQLloader(void *p) mnstr_printf(GDKout, "SQLloader started\n"); #endif while (task->ateof == 0) { - MT_sema_down(&task->producer, "tablet loader"); + MT_sema_down(&task->producer, "SQLloader"); #ifdef _DEBUG_TABLET_ mnstr_printf(GDKout, "SQL loader got buffer \n"); #endif if (task->ateof) /* forced exit received */ break; task->ateof = tablet_read_more(task->b, task->out, task->b->size - (task->b->len - task->b->pos)) == EOF; - MT_sema_up(&task->consumer, "tablet loader"); + MT_sema_up(&task->consumer, "SQLloader"); } } @@ -1395,8 +1395,8 @@ SQLload_file(Client cntxt, Tablet *as, b task->input = task->base + 1; /* wrap the buffer with null bytes */ task->base[b->size + 1] = 0; - MT_sema_init(&task->consumer, 0, "tablet load"); - MT_sema_init(&task->producer, 0, "tablet load"); + MT_sema_init(&task->consumer, 0, "task->consumer"); + MT_sema_init(&task->producer, 0, "task->producer"); task->ateof = 0; task->b = b; task->out = out; @@ -1443,8 +1443,8 @@ SQLload_file(Client cntxt, Tablet *as, b #ifdef MLOCK_TST mlock(ptask[j].cols, sizeof(char *) * task->limit); #endif - MT_sema_init(&ptask[j].sema, 0, "sqlworker"); - MT_sema_init(&ptask[j].reply, 0, "sqlworker"); + MT_sema_init(&ptask[j].sema, 0, "ptask[j].sema"); + MT_sema_init(&ptask[j].reply, 0, "ptask[j].reply"); MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j], MT_THR_JOINABLE); } @@ -1454,8 +1454,8 @@ SQLload_file(Client cntxt, Tablet *as, b #endif tio = GDKusec(); - MT_sema_up(&task->producer, "tablet load"); - MT_sema_down(&task->consumer, "tablet loader"); + MT_sema_up(&task->producer, "SQLload_file"); + MT_sema_down(&task->consumer, "SQLload_file"); tio = GDKusec() - tio; t1 = GDKusec(); #ifdef MLOCK_TST @@ -1590,7 +1590,7 @@ SQLload_file(Client cntxt, Tablet *as, b } } /* start feeding new data */ - MT_sema_up(&task->producer, "tablet load"); + MT_sema_up(&task->producer, "SQLload_file"); t1 = GDKusec() - t1; total += t1; iototal += tio; @@ -1607,13 +1607,13 @@ SQLload_file(Client cntxt, Tablet *as, b ptask[j].next = task->next; ptask[j].fields = task->fields; ptask[j].limit = task->limit; - MT_sema_up(&ptask[j].sema, "SQLworker"); + MT_sema_up(&ptask[j].sema, "SQLload_file"); } } if (task->next) { /* await completion of line break phase */ for (j = 0; j < threads; j++) { - MT_sema_down(&ptask[j].reply, "sqlreader"); + MT_sema_down(&ptask[j].reply, "SQLload_file"); if (ptask[j].error) res = -1; } @@ -1627,7 +1627,7 @@ SQLload_file(Client cntxt, Tablet *as, b for (j = 0; j < threads; j++) { /* stage two, update the BATs */ ptask[j].state = UPDATEBAT; - MT_sema_up(&ptask[j].sema, "SQLworker"); + MT_sema_up(&ptask[j].sema, "SQLload_file"); } } } @@ -1642,11 +1642,11 @@ SQLload_file(Client cntxt, Tablet *as, b if (res == 0 && task->next) { /* await completion of the BAT updates */ for (j = 0; j < threads; j++) - MT_sema_down(&ptask[j].reply, "sqlreader"); + MT_sema_down(&ptask[j].reply, "SQLload_file"); } if (task->ateof) break; - MT_sema_down(&task->consumer, "tablet loader"); + MT_sema_down(&task->consumer, "SQLload_file"); } if (task->b->pos < task->b->len && cnt < (BUN) maxrow && task->ateof) { @@ -1673,14 +1673,14 @@ SQLload_file(Client cntxt, Tablet *as, b } task->ateof = 1; - MT_sema_up(&task->producer, "tablet load"); + MT_sema_up(&task->producer, "SQLload_file"); for (j = 0; j < threads; j++) { ptask[j].next = -1; - MT_sema_up(&ptask[j].sema, "SQLworker"); + MT_sema_up(&ptask[j].sema, "SQLload_file"); } /* wait for their death */ for (j = 0; j < threads; j++) - MT_sema_down(&ptask[j].reply, "sqlreader"); + MT_sema_down(&ptask[j].reply, "SQLload_file"); #ifdef _DEBUG_TABLET_ mnstr_printf(GDKout, "Kill the workers\n"); #endif _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list