Changeset: 28c4ec1c2ecb for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=28c4ec1c2ecb
Modified Files:
        monetdb5/modules/mal/tablet.c
Branch: resultset
Log Message:

Avoid buffer overflow, simpler stdin
- If the data is retrieved from the client channel,
we do not apply double buffering, in order to avoid
messy error messages due to read-ahead.
- If a record is not finished, we add blocks until
the buffer is filled. The buffer cannot easily be
enlarged at that point, so the load is aborted.


diffs (288 lines):

diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -53,10 +53,13 @@
 #include <string.h>
 #include <ctype.h>
 
-/* #define _DEBUG_TABLET_*/
+/*#define _DEBUG_TABLET_ */
+/*#define _DEBUG_TABLET_CNTRL */
 
 #define MAXWORKERS     64
 #define MAXBUFFERS 2
+/* We restrict the row length to be 32MB for the time being */
+#define MAXROWSIZE(X) (X > 32*1024*1024 ? X : 32*1024*1024)
 
 static MT_Lock errorlock MT_LOCK_INITIALIZER("errorlock");
 
@@ -687,6 +690,7 @@ typedef struct {
        char quote;
 
        char *base[MAXBUFFERS], *input[MAXBUFFERS];     /* buffers for line 
splitter and tokenizer */
+       size_t rowlimit[MAXBUFFERS];                            /* determines 
maximal record length buffer */
        char **lines[MAXBUFFERS];
        int top[MAXBUFFERS];                            /* number of lines in 
this buffer */
        int cur;                                        /* current buffer used 
by splitter and update threads */
@@ -774,7 +778,7 @@ SQLinsert_val(READERtask *task, int col,
        int ret  =0;
 
        /* include testing on the terminating null byte !! */
-       if (fmt->nullstr && strncasecmp(s, fmt->nullstr, fmt->null_length + 1) 
== 0) {
+       if ( fmt->nullstr && strncasecmp(s, fmt->nullstr, fmt->null_length + 1) 
== 0) {
                adt = fmt->nildata;
                fmt->c->T->nonil = 0;
        } else if (quote && *s == quote) {
@@ -787,8 +791,8 @@ SQLinsert_val(READERtask *task, int col,
                adt = fmt->frstr(fmt, fmt->adt, s, e, 0);
                /* The user might have specified a null string escape
                 * e.g. NULL as '', which should be tested */
-               if (adt == NULL && s == e && fmt->nullstr &&
-                       strncasecmp(s, fmt->nullstr, fmt->null_length + 1) == 
0) {
+               if (adt == NULL && s == e && 
+                       fmt->nullstr && strncasecmp(s, fmt->nullstr, 
fmt->null_length + 1) == 0) {
                        adt = fmt->nildata;
                        fmt->c->T->nonil = 0;
                }
@@ -898,7 +902,7 @@ SQLload_file_line(READERtask *task, int 
                        /* recognize fields starting with a quote, keep them */
                        if (*line == task->quote) {
 #ifdef _DEBUG_TABLET_
-       MT_lock_set(&errorlock, "insert_val");
+       //MT_lock_set(&errorlock, "insert_val");
        mnstr_printf(GDKout,"before #1 %s\n", s=line);
        //MT_lock_unset(&errorlock, "insert_val");
 #endif
@@ -906,7 +910,7 @@ SQLload_file_line(READERtask *task, int 
 #ifdef _DEBUG_TABLET_
        //MT_lock_set(&errorlock, "insert_val");
        mnstr_printf(GDKout,"after #1 %s\n",s);
-       MT_lock_unset(&errorlock, "insert_val");
+       //MT_lock_unset(&errorlock, "insert_val");
 #endif
                                if (!line) {
                                        str errline = SQLload_error(task, 
task->top[task->cur]);
@@ -1115,14 +1119,21 @@ SQLworkdivider(READERtask *task, READERt
 /*
  * Reading is handled by a separate task as a preparation for more parallelism.
  * A buffer is filled with proper lines.
+ * If we are reading from a file then a double buffering scheme ia activated.
+ * Reading from the console (stdin) remains single buffered only.
+ * If we end up with unfinished records, then the rowlimit will terminate the 
process.
  */
+
 static void
 SQLproducer(void *p)
 {
        READERtask *task = (READERtask *) p;
-       BUN cnt = 0;
        str msg = 0;
+       int consoleinput = 0;
        int cur = 0; // buffer being filled
+       int blocked[MAXBUFFERS]= {0};
+       int ateof[MAXBUFFERS]= {0};
+       BUN cnt=0, bufcnt[MAXBUFFERS] = {0};
        char *end, *e,*s, *base;
        const char *rsep= task->rsep;
        size_t rseplen = strlen(rsep), partial =0;
@@ -1131,56 +1142,73 @@ SQLproducer(void *p)
 
        thr = THRnew("SQLproducer");
 
-#ifdef _DEBUG_TABLET_
+#ifdef _DEBUG_TABLET_CNTRL
        mnstr_printf(GDKout, "#SQLproducer started size %d len %d\n", 
(int)task->b->size,  (int)task->b->len);
 #endif
        base = end = s = task->input[cur];
        *s = 0;
-       if (task->b == task->cntxt->fdin)
+       task->cur = cur;
+       if (task->b == task->cntxt->fdin){
+               consoleinput =1;
                goto parseSTDIN;
-       while (cnt < task->maxrow ) {
-               task->ateof = tablet_read_more(task->b, task->out, 
task->b->size) == EOF;
-#ifdef _DEBUG_TABLET_
-               mnstr_printf(GDKout, "#read %d bytes pos = %d eof=%d 
offset="LLFMT" \n", (int) task->b->len, (int) task->b->pos, task->b->eof, 
(lng)( s - task->input[cur]));
+       }
+       while (cnt <= task->maxrow ) {
+               ateof[cur] = tablet_read_more(task->b, task->out, 
task->b->size) == EOF;
+#ifdef _DEBUG_TABLET_CNTRL
+               if( ateof[cur] == 0)
+                       mnstr_printf(GDKout, "#read %d bytes pos = %d eof=%d 
offset="LLFMT" \n", (int) task->b->len, (int) task->b->pos, task->b->eof, 
(lng)( s - task->input[cur]));
 #endif
                // we may be reading from standard input and may be out of input
                // warn the consumers
-               if (task->ateof && partial){
-                       tablet_error(task, lng_nil, int_nil, "incomplete record 
at end of file", s);
-                       task->as->error = GDKstrdup("Incomplete record at end 
of file.\n");
-                       task->b->pos += partial;
+               if (ateof[cur] && partial){
+                       if( partial){
+                               tablet_error(task, lng_nil, int_nil, 
"incomplete record at end of file", s);
+                               task->as->error = GDKstrdup("Incomplete record 
at end of file.\n");
+                               task->b->pos += partial;
+                       }
                        goto reportlackofinput;
                } 
-               if (task->ateof )
-                       goto reportlackofinput;
 
                if (task->errbuf && task->errbuf[0]) {
                        msg = catchKernelException(task->cntxt, msg);
                        if (msg) {
                                tablet_error(task, lng_nil, int_nil, msg, 
"SQLload_file");
-#ifdef _DEBUG_TABLET_
+#ifdef _DEBUG_TABLET_CNTRL
                                mnstr_printf(GDKout,"#bailout on SQLload 
%s\n",msg);
 #endif
-                               task->ateof = 1;
+                               ateof[cur] = 1;
                                break;
                        }
                }
 
 parseSTDIN:
 #ifdef _DEBUG_TABLET_
-               {char msg[64]={0};
+               if( ateof[cur] ==0) {
+                char msg[64]={0};
                 snprintf(msg,63,"%s", task->b->buf+ task->b->pos);
                 mnstr_printf(GDKout, "#parse input:%s\n", msg);
                }
 #endif
-               /* copy the stream buffer into the double sized input buffer */
+               /* copy the stream buffer into the input buffer, which is 
guaranteed larger, but still limited */
                partial = 0;
+               task->top[cur] = 0;
+               s= task->input[cur];
                base = end;
+               /* avoid too long records */
+               if ( end - s + task->b->len - task->b->pos >= 
task->rowlimit[cur]){
+                       /* the input buffer should be extended, but 'base' is 
not shared
+                               between the threads, which we can not now 
update.
+                               Mimick an ateof instead; */
+                       tablet_error(task, lng_nil, int_nil, msg, 
"SQLload_file, record too long");
+                       ateof[cur] =1;
+#ifdef _DEBUG_TABLET_CNTRL
+                       mnstr_printf(GDKout,"#bailout on SQLload confronted 
with too large record\n");
+#endif
+                       break;
+               }
                memcpy(end , task->b->buf + task->b->pos, task->b->len - 
task->b->pos);
                end = end + task->b->len - task->b->pos;
                *end = '\0';                    /* this is safe, as the stream 
ensures an extra byte */
-               task->top[cur] = 0;
-               s= task->input[cur];
                /* Note that we rescan from the start of a record (the last
                 * partial buffer from the previous iteration), even if in the
                 * previous iteration we have already established that there
@@ -1315,30 +1343,64 @@ parseSTDIN:
                        }
                }
 
-               if( cnt <= task->maxrow){
 reportlackofinput:
-#ifdef _DEBUG_TABLET_
-                       mnstr_printf(GDKout, "#SQL producer got buffer filled 
with %d records \n", task->top[cur]);
+#ifdef _DEBUG_TABLET_CNTRL
+                       mnstr_printf(GDKout, "#SQL producer got buffer %d 
filled with %d records \n", cur, task->top[cur]);
 #endif
+               if(consoleinput && cnt <= task->maxrow){
                        task->cur = cur;
+                       task->ateof = ateof[cur];
+                       task->cnt = bufcnt[cur];
                        MT_sema_up(&task->consumer, "SQLconsumer");
                        MT_sema_down(&task->producer, "SQLproducer");
-                       cur = (cur+1) % MAXBUFFERS;
-#ifdef _DEBUG_TABLET_
-                       mnstr_printf(GDKout,"#Contiue producer state %d ateof 
%d buffer %d\n", task->state, task->ateof,cur);
+               } else
+               if( cnt <= task->maxrow){
+                       if( blocked[cur] == 0 && blocked[(cur+1) % MAXBUFFERS ] 
!= 1){
+                               blocked [cur] = 1;
+                               task->cur = cur;
+                               task->ateof = ateof[cur];
+                               task->cnt = bufcnt[cur];
+#ifdef _DEBUG_TABLET_CNTRL
+                               mnstr_printf(GDKout,"#Let consumer start on 
buffer %d ateof %d\n", cur, ateof[cur]);
 #endif
+                               MT_sema_up(&task->consumer, "SQLconsumer");
+                       }
+
+                       if( blocked[ (cur+1) % MAXBUFFERS]) {
+#ifdef _DEBUG_TABLET_CNTRL
+                               mnstr_printf(GDKout,"#wait for consumers to 
finish buffer %d\n", (cur+1) % MAXBUFFERS);
+#endif
+                               MT_sema_down(&task->producer, "SQLproducer");
+                               blocked[(cur+1)% MAXBUFFERS ] = 0;
+                               blocked [cur] = 1;
+                               task->cur = cur;
+                               task->ateof = ateof[cur];
+                               task->cnt = bufcnt[cur];
+#ifdef _DEBUG_TABLET_CNTRL
+                               mnstr_printf(GDKout, "#SQL producer got buffer 
%d filled with %d records \n", cur, task->top[cur]);
+#endif
+                               MT_sema_up(&task->consumer, "SQLconsumer");
+                               cur = (cur+1) % MAXBUFFERS;
+#ifdef _DEBUG_TABLET_CNTRL
+                               mnstr_printf(GDKout,"#May continue with buffer 
%d\n", cur);
+#endif
+                       } else
+                               cur = (cur+1) % MAXBUFFERS;
                }
-               /* we have seen all tuples requested? */
-               if (cnt == task->maxrow){
-#ifdef _DEBUG_TABLET_
-                       mnstr_printf(GDKout,"#Producer delivered\n");
+#ifdef _DEBUG_TABLET_CNTRL
+               mnstr_printf(GDKout,"#Continue producer buffer %d\n", cur);
+#endif
+               /* we have seen and sent all tuples requested? */
+               if (cnt == task->maxrow && blocked[(cur +1) % MAXBUFFERS] == 0 
&& blocked[cur] == 0){
+#ifdef _DEBUG_TABLET_CNTRL
+                       mnstr_printf(GDKout,"#Producer delivered all\n");
 #endif
                        THRdel(thr);
                        return;
                }
                /* we ran out of input? */
-               if ( task->ateof){
-#ifdef _DEBUG_TABLET_
+               if ( task->ateof ){
+#ifdef _DEBUG_TABLET_CNTRL
                        mnstr_printf(GDKout,"#Producer encountered eof\n");
 #endif
                        THRdel(thr);
@@ -1346,7 +1408,8 @@ reportlackofinput:
                }
                /* consumers ask us to stop? */
                if (task->state == ENDOFCOPY ){
-#ifdef _DEBUG_TABLET_
+#ifdef _DEBUG_TABLET_CNTRL
+                       if( ateof[cur] == 0)
                        {char msg[64]={0};
                         snprintf(msg,63,"%s", task->b->buf+ task->b->pos);
                         mnstr_printf(GDKout, "#SQL producer early exit %s\n", 
msg);
@@ -1355,8 +1418,9 @@ reportlackofinput:
                        THRdel(thr);
                        return;
                }
-               task->cnt = cnt;
-#ifdef _DEBUG_TABLET_
+               bufcnt[cur]= cnt;
+#ifdef _DEBUG_TABLET_CNTRL
+               if(ateof[cur] ==0)
                {char msg[64]={0};
                 snprintf(msg,63,"%s", s);
                 mnstr_printf(GDKout, "#shuffle %d: %s\n", (int) strlen(s),  
msg);
@@ -1445,7 +1509,8 @@ SQLload_file(Client cntxt, Tablet *as, b
        task->time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
        task->cur = 0;
        for(i=0; i< MAXBUFFERS; i++){
-               task->base[i] = GDKzalloc(2 * b->size + 2);
+               task->base[i] = GDKzalloc(MAXROWSIZE(2 * b->size) + 2);
+               task->rowlimit[i] = MAXROWSIZE(2 * b->size);
                if ( task->base[i] == 0){
                        tablet_error(task, lng_nil, int_nil, NULL, 
"SQLload_file");
                        goto bailout;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to