Changeset: 940bd2cd4328 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=940bd2cd4328
Modified Files: monetdb5/modules/mal/tablet.c
Branch: resultset
Log Message:
Fixes to COPY INTO code. - Wait for SQLproducer to exit (potential crashes otherwise); - Avoid deadlock in debug code; - Simplify waiting for other buffer. diffs (155 lines): diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c --- a/monetdb5/modules/mal/tablet.c +++ b/monetdb5/modules/mal/tablet.c @@ -894,16 +894,12 @@ SQLload_parse_line(READERtask *task, int if (*line == task->quote) { skip = 1; #ifdef _DEBUG_TABLET_ - //MT_lock_set(&errorlock, "insert_val"); mnstr_printf(GDKout, "before #1 %s\n", s = line); - //MT_lock_unset(&errorlock, "insert_val"); #endif task->fields[i][idx] = line + 1; line = tablet_skip_string(line + 1, task->quote); #ifdef _DEBUG_TABLET_ - //MT_lock_set(&errorlock, "insert_val"); mnstr_printf(GDKout, "after #1 %s\n", s); - //MT_lock_unset(&errorlock, "insert_val"); #endif if (!line) { str errline = SQLload_error(task, task->top[task->cur]); @@ -957,9 +953,7 @@ SQLload_parse_line(READERtask *task, int for (i = 0; i < as->nr_attrs; i++) { task->fields[i][idx] = line; #ifdef _DEBUG_TABLET_ - MT_lock_set(&errorlock, "insert_val"); mnstr_printf(GDKout, "before #2 %s\n", line); - //MT_lock_unset(&errorlock, "insert_val"); #endif /* eat away the column separator */ for (; *line; line++) @@ -972,9 +966,7 @@ SQLload_parse_line(READERtask *task, int goto endoffield2; } #ifdef _DEBUG_TABLET_ - //MT_lock_set(&errorlock, "insert_val"); mnstr_printf(GDKout, "#after #23 %s\n", line); - MT_lock_unset(&errorlock, "insert_val"); #endif /* not enough fields */ if (i < as->nr_attrs - 1) { @@ -1160,7 +1152,7 @@ SQLproducer(void *p) consoleinput = 1; goto parseSTDIN; } - while (cnt <= task->maxrow) { + for (;;) { ateof[cur] = tablet_read_more(task->b, task->out, task->b->size) == EOF; #ifdef _DEBUG_TABLET_CNTRL if (ateof[cur] == 0) @@ -1360,7 +1352,7 @@ SQLproducer(void *p) mnstr_printf(GDKout, "#SQL producer got buffer %d filled with %d records \n", cur, task->top[cur]); #endif - if (consoleinput && cnt <= task->maxrow) { + if 
(consoleinput) { task->cur = cur; task->ateof = ateof[cur]; task->cnt = bufcnt[cur]; @@ -1368,57 +1360,53 @@ SQLproducer(void *p) MT_sema_up(&task->consumer, "SQLconsumer"); /* then wait until it is done */ MT_sema_down(&task->producer, "SQLproducer"); - } else if (cnt <= task->maxrow) { - if (blocked[cur] == 0 && blocked[(cur + 1) % MAXBUFFERS] != 1) { - blocked[cur] = 1; - task->cur = cur; - task->ateof = ateof[cur]; - task->cnt = bufcnt[cur]; -#ifdef _DEBUG_TABLET_CNTRL - mnstr_printf(GDKout, "#Let consumer start on buffer %d ateof %d\n", - cur, ateof[cur]); -#endif - MT_sema_up(&task->consumer, "SQLconsumer"); + if (cnt == task->maxrow) { + THRdel(thr); + return; } - + } else { + assert(!blocked[cur]); if (blocked[(cur + 1) % MAXBUFFERS]) { + /* first wait until other buffer is done */ #ifdef _DEBUG_TABLET_CNTRL mnstr_printf(GDKout, "#wait for consumers to finish buffer %d\n", (cur + 1) % MAXBUFFERS); #endif MT_sema_down(&task->producer, "SQLproducer"); - if (task->state == ENDOFCOPY || task->ateof) { + blocked[(cur + 1) % MAXBUFFERS] = 0; + if (task->state == ENDOFCOPY) { THRdel(thr); return; } - blocked[(cur + 1) % MAXBUFFERS] = 0; - blocked[cur] = 1; - task->cur = cur; - task->ateof = ateof[cur]; - task->cnt = bufcnt[cur]; + } + /* other buffer is done, proceed with current buffer */ + assert(!blocked[(cur + 1) % MAXBUFFERS]); + blocked[cur] = 1; + task->cur = cur; + task->ateof = ateof[cur]; + task->cnt = bufcnt[cur]; #ifdef _DEBUG_TABLET_CNTRL - mnstr_printf(GDKout, "#SQL producer got buffer %d filled with %d records \n", - cur, task->top[cur]); + mnstr_printf(GDKout, "#SQL producer got buffer %d filled with %d records \n", + cur, task->top[cur]); #endif - MT_sema_up(&task->consumer, "SQLconsumer"); - cur = (cur + 1) % MAXBUFFERS; + MT_sema_up(&task->consumer, "SQLconsumer"); + + cur = (cur + 1) % MAXBUFFERS; #ifdef _DEBUG_TABLET_CNTRL - mnstr_printf(GDKout, "#May continue with buffer %d\n", cur); + mnstr_printf(GDKout, "#May continue with buffer %d\n", 
cur); #endif - } else - cur = (cur + 1) % MAXBUFFERS; + if (cnt == task->maxrow) { + MT_sema_down(&task->producer, "SQLproducer"); +#ifdef _DEBUG_TABLET_CNTRL + mnstr_printf(GDKout, "#Producer delivered all\n"); +#endif + THRdel(thr); + return; + } } #ifdef _DEBUG_TABLET_CNTRL mnstr_printf(GDKout, "#Continue producer buffer %d\n", cur); #endif - /* we have seen and sent all tuples requested? */ - if (cnt == task->maxrow && blocked[(cur + 1) % MAXBUFFERS] == 0 && blocked[cur] == 0) { -#ifdef _DEBUG_TABLET_CNTRL - mnstr_printf(GDKout, "#Producer delivered all\n"); -#endif - THRdel(thr); - return; - } /* we ran out of input? */ if (task->ateof) { #ifdef _DEBUG_TABLET_CNTRL @@ -1792,8 +1780,8 @@ SQLload_file(Client cntxt, Tablet *as, b mnstr_printf(GDKout, "#Shut down reader\n"); #endif MT_sema_up(&task->producer, "SQLload_file"); - MT_join_thread(task->tid); } + MT_join_thread(task->tid); // await completion of the BAT syncs for (j = 0; j < threads; j++) MT_sema_down(&ptask[j].reply, "SQLload_file"); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list