Changeset: 940bd2cd4328 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=940bd2cd4328
Modified Files:
        monetdb5/modules/mal/tablet.c
Branch: resultset
Log Message:

Fixes to COPY INTO code.
- Wait for SQLproducer to exit (potential crashes otherwise);
- Avoid deadlock in debug code;
- Simplify waiting for other buffer.


diffs (155 lines):

diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -894,16 +894,12 @@ SQLload_parse_line(READERtask *task, int
                        if (*line == task->quote) {
                                skip = 1;
 #ifdef _DEBUG_TABLET_
-                               //MT_lock_set(&errorlock, "insert_val");
                                mnstr_printf(GDKout, "before #1 %s\n", s = 
line);
-                               //MT_lock_unset(&errorlock, "insert_val");
 #endif
                                task->fields[i][idx] = line + 1;
                                line = tablet_skip_string(line + 1, 
task->quote);
 #ifdef _DEBUG_TABLET_
-                               //MT_lock_set(&errorlock, "insert_val");
                                mnstr_printf(GDKout, "after #1 %s\n", s);
-                               //MT_lock_unset(&errorlock, "insert_val");
 #endif
                                if (!line) {
                                        str errline = SQLload_error(task, 
task->top[task->cur]);
@@ -957,9 +953,7 @@ SQLload_parse_line(READERtask *task, int
        for (i = 0; i < as->nr_attrs; i++) {
                task->fields[i][idx] = line;
 #ifdef _DEBUG_TABLET_
-               MT_lock_set(&errorlock, "insert_val");
                mnstr_printf(GDKout, "before #2 %s\n", line);
-               //MT_lock_unset(&errorlock, "insert_val");
 #endif
                /* eat away the column separator */
                for (; *line; line++)
@@ -972,9 +966,7 @@ SQLload_parse_line(READERtask *task, int
                                goto endoffield2;
                        }
 #ifdef _DEBUG_TABLET_
-               //MT_lock_set(&errorlock, "insert_val");
                mnstr_printf(GDKout, "#after #23 %s\n", line);
-               MT_lock_unset(&errorlock, "insert_val");
 #endif
                /* not enough fields */
                if (i < as->nr_attrs - 1) {
@@ -1160,7 +1152,7 @@ SQLproducer(void *p)
                consoleinput = 1;
                goto parseSTDIN;
        }
-       while (cnt <= task->maxrow) {
+       for (;;) {
                ateof[cur] = tablet_read_more(task->b, task->out, 
task->b->size) == EOF;
 #ifdef _DEBUG_TABLET_CNTRL
                if (ateof[cur] == 0)
@@ -1360,7 +1352,7 @@ SQLproducer(void *p)
                mnstr_printf(GDKout, "#SQL producer got buffer %d filled with 
%d records \n",
                                         cur, task->top[cur]);
 #endif
-               if (consoleinput && cnt <= task->maxrow) {
+               if (consoleinput) {
                        task->cur = cur;
                        task->ateof = ateof[cur];
                        task->cnt = bufcnt[cur];
@@ -1368,57 +1360,53 @@ SQLproducer(void *p)
                        MT_sema_up(&task->consumer, "SQLconsumer");
                        /* then wait until it is done */
                        MT_sema_down(&task->producer, "SQLproducer");
-               } else if (cnt <= task->maxrow) {
-                       if (blocked[cur] == 0 && blocked[(cur + 1) % 
MAXBUFFERS] != 1) {
-                               blocked[cur] = 1;
-                               task->cur = cur;
-                               task->ateof = ateof[cur];
-                               task->cnt = bufcnt[cur];
-#ifdef _DEBUG_TABLET_CNTRL
-                               mnstr_printf(GDKout, "#Let consumer start on 
buffer %d ateof %d\n",
-                                                        cur, ateof[cur]);
-#endif
-                               MT_sema_up(&task->consumer, "SQLconsumer");
+                       if (cnt == task->maxrow) {
+                               THRdel(thr);
+                               return;
                        }
-
+               } else {
+                       assert(!blocked[cur]);
                        if (blocked[(cur + 1) % MAXBUFFERS]) {
+                               /* first wait until other buffer is done */
 #ifdef _DEBUG_TABLET_CNTRL
                                mnstr_printf(GDKout, "#wait for consumers to 
finish buffer %d\n",
                                                         (cur + 1) % 
MAXBUFFERS);
 #endif
                                MT_sema_down(&task->producer, "SQLproducer");
-                               if (task->state == ENDOFCOPY || task->ateof) {
+                               blocked[(cur + 1) % MAXBUFFERS] = 0;
+                               if (task->state == ENDOFCOPY) {
                                        THRdel(thr);
                                        return;
                                }
-                               blocked[(cur + 1) % MAXBUFFERS] = 0;
-                               blocked[cur] = 1;
-                               task->cur = cur;
-                               task->ateof = ateof[cur];
-                               task->cnt = bufcnt[cur];
+                       }
+                       /* other buffer is done, proceed with current buffer */
+                       assert(!blocked[(cur + 1) % MAXBUFFERS]);
+                       blocked[cur] = 1;
+                       task->cur = cur;
+                       task->ateof = ateof[cur];
+                       task->cnt = bufcnt[cur];
 #ifdef _DEBUG_TABLET_CNTRL
-                               mnstr_printf(GDKout, "#SQL producer got buffer 
%d filled with %d records \n",
-                                                        cur, task->top[cur]);
+                       mnstr_printf(GDKout, "#SQL producer got buffer %d 
filled with %d records \n",
+                                                cur, task->top[cur]);
 #endif
-                               MT_sema_up(&task->consumer, "SQLconsumer");
-                               cur = (cur + 1) % MAXBUFFERS;
+                       MT_sema_up(&task->consumer, "SQLconsumer");
+
+                       cur = (cur + 1) % MAXBUFFERS;
 #ifdef _DEBUG_TABLET_CNTRL
-                               mnstr_printf(GDKout, "#May continue with buffer 
%d\n", cur);
+                       mnstr_printf(GDKout, "#May continue with buffer %d\n", 
cur);
 #endif
-                       } else
-                               cur = (cur + 1) % MAXBUFFERS;
+                       if (cnt == task->maxrow) {
+                               MT_sema_down(&task->producer, "SQLproducer");
+#ifdef _DEBUG_TABLET_CNTRL
+                               mnstr_printf(GDKout, "#Producer delivered 
all\n");
+#endif
+                               THRdel(thr);
+                               return;
+                       }
                }
 #ifdef _DEBUG_TABLET_CNTRL
                mnstr_printf(GDKout, "#Continue producer buffer %d\n", cur);
 #endif
-               /* we have seen and sent all tuples requested? */
-               if (cnt == task->maxrow && blocked[(cur + 1) % MAXBUFFERS] == 0 
&& blocked[cur] == 0) {
-#ifdef _DEBUG_TABLET_CNTRL
-                       mnstr_printf(GDKout, "#Producer delivered all\n");
-#endif
-                       THRdel(thr);
-                       return;
-               }
                /* we ran out of input? */
                if (task->ateof) {
 #ifdef _DEBUG_TABLET_CNTRL
@@ -1792,8 +1780,8 @@ SQLload_file(Client cntxt, Tablet *as, b
                mnstr_printf(GDKout, "#Shut down reader\n");
 #endif
                MT_sema_up(&task->producer, "SQLload_file");
-               MT_join_thread(task->tid);
        }
+       MT_join_thread(task->tid);
        // await completion of the BAT syncs
        for (j = 0; j < threads; j++)
                MT_sema_down(&ptask[j].reply, "SQLload_file");
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to