Changeset: 28c4ec1c2ecb for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=28c4ec1c2ecb Modified Files: monetdb5/modules/mal/tablet.c Branch: resultset Log Message:
Avoid buffer overflow; simplify stdin handling. - If the data is retrieved from the client channel, we don't apply double buffering, to avoid messy error messages caused by read-ahead. - If a record is not finished, we add blocks until the buffer is filled. We cannot easily reshape the buffer, therefore we abort. diffs (288 lines): diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c --- a/monetdb5/modules/mal/tablet.c +++ b/monetdb5/modules/mal/tablet.c @@ -53,10 +53,13 @@ #include <string.h> #include <ctype.h> -/* #define _DEBUG_TABLET_*/ +/*#define _DEBUG_TABLET_ */ +/*#define _DEBUG_TABLET_CNTRL */ #define MAXWORKERS 64 #define MAXBUFFERS 2 +/* We restrict the row length to be 32MB for the time being */ +#define MAXROWSIZE(X) (X > 32*1024*1024 ? X : 32*1024*1024) static MT_Lock errorlock MT_LOCK_INITIALIZER("errorlock"); @@ -687,6 +690,7 @@ typedef struct { char quote; char *base[MAXBUFFERS], *input[MAXBUFFERS]; /* buffers for line splitter and tokenizer */ + size_t rowlimit[MAXBUFFERS]; /* determines maximal record length buffer */ char **lines[MAXBUFFERS]; int top[MAXBUFFERS]; /* number of lines in this buffer */ int cur; /* current buffer used by splitter and update threads */ @@ -774,7 +778,7 @@ SQLinsert_val(READERtask *task, int col, int ret =0; /* include testing on the terminating null byte !! */ - if (fmt->nullstr && strncasecmp(s, fmt->nullstr, fmt->null_length + 1) == 0) { + if ( fmt->nullstr && strncasecmp(s, fmt->nullstr, fmt->null_length + 1) == 0) { adt = fmt->nildata; fmt->c->T->nonil = 0; } else if (quote && *s == quote) { @@ -787,8 +791,8 @@ SQLinsert_val(READERtask *task, int col, adt = fmt->frstr(fmt, fmt->adt, s, e, 0); /* The user might have specified a null string escape * e.g.
NULL as '', which should be tested */ - if (adt == NULL && s == e && fmt->nullstr && - strncasecmp(s, fmt->nullstr, fmt->null_length + 1) == 0) { + if (adt == NULL && s == e && + fmt->nullstr && strncasecmp(s, fmt->nullstr, fmt->null_length + 1) == 0) { adt = fmt->nildata; fmt->c->T->nonil = 0; } @@ -898,7 +902,7 @@ SQLload_file_line(READERtask *task, int /* recognize fields starting with a quote, keep them */ if (*line == task->quote) { #ifdef _DEBUG_TABLET_ - MT_lock_set(&errorlock, "insert_val"); + //MT_lock_set(&errorlock, "insert_val"); mnstr_printf(GDKout,"before #1 %s\n", s=line); //MT_lock_unset(&errorlock, "insert_val"); #endif @@ -906,7 +910,7 @@ SQLload_file_line(READERtask *task, int #ifdef _DEBUG_TABLET_ //MT_lock_set(&errorlock, "insert_val"); mnstr_printf(GDKout,"after #1 %s\n",s); - MT_lock_unset(&errorlock, "insert_val"); + //MT_lock_unset(&errorlock, "insert_val"); #endif if (!line) { str errline = SQLload_error(task, task->top[task->cur]); @@ -1115,14 +1119,21 @@ SQLworkdivider(READERtask *task, READERt /* * Reading is handled by a separate task as a preparation for more parallelism. * A buffer is filled with proper lines. + * If we are reading from a file then a double buffering scheme ia activated. + * Reading from the console (stdin) remains single buffered only. + * If we end up with unfinished records, then the rowlimit will terminate the process. 
*/ + static void SQLproducer(void *p) { READERtask *task = (READERtask *) p; - BUN cnt = 0; str msg = 0; + int consoleinput = 0; int cur = 0; // buffer being filled + int blocked[MAXBUFFERS]= {0}; + int ateof[MAXBUFFERS]= {0}; + BUN cnt=0, bufcnt[MAXBUFFERS] = {0}; char *end, *e,*s, *base; const char *rsep= task->rsep; size_t rseplen = strlen(rsep), partial =0; @@ -1131,56 +1142,73 @@ SQLproducer(void *p) thr = THRnew("SQLproducer"); -#ifdef _DEBUG_TABLET_ +#ifdef _DEBUG_TABLET_CNTRL mnstr_printf(GDKout, "#SQLproducer started size %d len %d\n", (int)task->b->size, (int)task->b->len); #endif base = end = s = task->input[cur]; *s = 0; - if (task->b == task->cntxt->fdin) + task->cur = cur; + if (task->b == task->cntxt->fdin){ + consoleinput =1; goto parseSTDIN; - while (cnt < task->maxrow ) { - task->ateof = tablet_read_more(task->b, task->out, task->b->size) == EOF; -#ifdef _DEBUG_TABLET_ - mnstr_printf(GDKout, "#read %d bytes pos = %d eof=%d offset="LLFMT" \n", (int) task->b->len, (int) task->b->pos, task->b->eof, (lng)( s - task->input[cur])); + } + while (cnt <= task->maxrow ) { + ateof[cur] = tablet_read_more(task->b, task->out, task->b->size) == EOF; +#ifdef _DEBUG_TABLET_CNTRL + if( ateof[cur] == 0) + mnstr_printf(GDKout, "#read %d bytes pos = %d eof=%d offset="LLFMT" \n", (int) task->b->len, (int) task->b->pos, task->b->eof, (lng)( s - task->input[cur])); #endif // we may be reading from standard input and may be out of input // warn the consumers - if (task->ateof && partial){ - tablet_error(task, lng_nil, int_nil, "incomplete record at end of file", s); - task->as->error = GDKstrdup("Incomplete record at end of file.\n"); - task->b->pos += partial; + if (ateof[cur] && partial){ + if( partial){ + tablet_error(task, lng_nil, int_nil, "incomplete record at end of file", s); + task->as->error = GDKstrdup("Incomplete record at end of file.\n"); + task->b->pos += partial; + } goto reportlackofinput; } - if (task->ateof ) - goto reportlackofinput; if (task->errbuf 
&& task->errbuf[0]) { msg = catchKernelException(task->cntxt, msg); if (msg) { tablet_error(task, lng_nil, int_nil, msg, "SQLload_file"); -#ifdef _DEBUG_TABLET_ +#ifdef _DEBUG_TABLET_CNTRL mnstr_printf(GDKout,"#bailout on SQLload %s\n",msg); #endif - task->ateof = 1; + ateof[cur] = 1; break; } } parseSTDIN: #ifdef _DEBUG_TABLET_ - {char msg[64]={0}; + if( ateof[cur] ==0) { + char msg[64]={0}; snprintf(msg,63,"%s", task->b->buf+ task->b->pos); mnstr_printf(GDKout, "#parse input:%s\n", msg); } #endif - /* copy the stream buffer into the double sized input buffer */ + /* copy the stream buffer into the input buffer, which is guaranteed larger, but still limited */ partial = 0; + task->top[cur] = 0; + s= task->input[cur]; base = end; + /* avoid too long records */ + if ( end - s + task->b->len - task->b->pos >= task->rowlimit[cur]){ + /* the input buffer should be extended, but 'base' is not shared + between the threads, which we can not now update. + Mimick an ateof instead; */ + tablet_error(task, lng_nil, int_nil, msg, "SQLload_file, record too long"); + ateof[cur] =1; +#ifdef _DEBUG_TABLET_CNTRL + mnstr_printf(GDKout,"#bailout on SQLload confronted with too large record\n"); +#endif + break; + } memcpy(end , task->b->buf + task->b->pos, task->b->len - task->b->pos); end = end + task->b->len - task->b->pos; *end = '\0'; /* this is safe, as the stream ensures an extra byte */ - task->top[cur] = 0; - s= task->input[cur]; /* Note that we rescan from the start of a record (the last * partial buffer from the previous iteration), even if in the * previous iteration we have already established that there @@ -1315,30 +1343,64 @@ parseSTDIN: } } - if( cnt <= task->maxrow){ reportlackofinput: -#ifdef _DEBUG_TABLET_ - mnstr_printf(GDKout, "#SQL producer got buffer filled with %d records \n", task->top[cur]); +#ifdef _DEBUG_TABLET_CNTRL + mnstr_printf(GDKout, "#SQL producer got buffer %d filled with %d records \n", cur, task->top[cur]); #endif + if(consoleinput && cnt <= 
task->maxrow){ task->cur = cur; + task->ateof = ateof[cur]; + task->cnt = bufcnt[cur]; MT_sema_up(&task->consumer, "SQLconsumer"); MT_sema_down(&task->producer, "SQLproducer"); - cur = (cur+1) % MAXBUFFERS; -#ifdef _DEBUG_TABLET_ - mnstr_printf(GDKout,"#Contiue producer state %d ateof %d buffer %d\n", task->state, task->ateof,cur); + } else + if( cnt <= task->maxrow){ + if( blocked[cur] == 0 && blocked[(cur+1) % MAXBUFFERS ] != 1){ + blocked [cur] = 1; + task->cur = cur; + task->ateof = ateof[cur]; + task->cnt = bufcnt[cur]; +#ifdef _DEBUG_TABLET_CNTRL + mnstr_printf(GDKout,"#Let consumer start on buffer %d ateof %d\n", cur, ateof[cur]); #endif + MT_sema_up(&task->consumer, "SQLconsumer"); + } + + if( blocked[ (cur+1) % MAXBUFFERS]) { +#ifdef _DEBUG_TABLET_CNTRL + mnstr_printf(GDKout,"#wait for consumers to finish buffer %d\n", (cur+1) % MAXBUFFERS); +#endif + MT_sema_down(&task->producer, "SQLproducer"); + blocked[(cur+1)% MAXBUFFERS ] = 0; + blocked [cur] = 1; + task->cur = cur; + task->ateof = ateof[cur]; + task->cnt = bufcnt[cur]; +#ifdef _DEBUG_TABLET_CNTRL + mnstr_printf(GDKout, "#SQL producer got buffer %d filled with %d records \n", cur, task->top[cur]); +#endif + MT_sema_up(&task->consumer, "SQLconsumer"); + cur = (cur+1) % MAXBUFFERS; +#ifdef _DEBUG_TABLET_CNTRL + mnstr_printf(GDKout,"#May continue with buffer %d\n", cur); +#endif + } else + cur = (cur+1) % MAXBUFFERS; } - /* we have seen all tuples requested? */ - if (cnt == task->maxrow){ -#ifdef _DEBUG_TABLET_ - mnstr_printf(GDKout,"#Producer delivered\n"); +#ifdef _DEBUG_TABLET_CNTRL + mnstr_printf(GDKout,"#Continue producer buffer %d\n", cur); +#endif + /* we have seen and sent all tuples requested? */ + if (cnt == task->maxrow && blocked[(cur +1) % MAXBUFFERS] == 0 && blocked[cur] == 0){ +#ifdef _DEBUG_TABLET_CNTRL + mnstr_printf(GDKout,"#Producer delivered all\n"); #endif THRdel(thr); return; } /* we ran out of input? 
*/ - if ( task->ateof){ -#ifdef _DEBUG_TABLET_ + if ( task->ateof ){ +#ifdef _DEBUG_TABLET_CNTRL mnstr_printf(GDKout,"#Producer encountered eof\n"); #endif THRdel(thr); @@ -1346,7 +1408,8 @@ reportlackofinput: } /* consumers ask us to stop? */ if (task->state == ENDOFCOPY ){ -#ifdef _DEBUG_TABLET_ +#ifdef _DEBUG_TABLET_CNTRL + if( ateof[cur] == 0) {char msg[64]={0}; snprintf(msg,63,"%s", task->b->buf+ task->b->pos); mnstr_printf(GDKout, "#SQL producer early exit %s\n", msg); @@ -1355,8 +1418,9 @@ reportlackofinput: THRdel(thr); return; } - task->cnt = cnt; -#ifdef _DEBUG_TABLET_ + bufcnt[cur]= cnt; +#ifdef _DEBUG_TABLET_CNTRL + if(ateof[cur] ==0) {char msg[64]={0}; snprintf(msg,63,"%s", s); mnstr_printf(GDKout, "#shuffle %d: %s\n", (int) strlen(s), msg); @@ -1445,7 +1509,8 @@ SQLload_file(Client cntxt, Tablet *as, b task->time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng)); task->cur = 0; for(i=0; i< MAXBUFFERS; i++){ - task->base[i] = GDKzalloc(2 * b->size + 2); + task->base[i] = GDKzalloc(MAXROWSIZE(2 * b->size) + 2); + task->rowlimit[i] = MAXROWSIZE(2 * b->size); if ( task->base[i] == 0){ tablet_error(task, lng_nil, int_nil, NULL, "SQLload_file"); goto bailout; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list