Changeset: cd474532e378 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=cd474532e378
Modified Files:
        common/stream/stream.c
        common/stream/stream.h
Branch: fixed-width-format
Log Message:

New fixed-width-format (fwf) stream wrapper.


diffs (160 lines):

diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4671,3 +4671,146 @@ stream * stream_blackhole_create (void)
        s->access = ST_WRITE;
        return s;
 }
+
+
+/* fixed-width format streams */
+
+#define STREAM_FWF_NAME "fwf"
+#define STREAM_FWF_FIELD_SEP '|'
+#define STREAM_FWF_ESCAPE '\\'
+#define STREAM_FWF_RECORD_SEP '\n'
+
+typedef struct {
+       stream *s;
+       // config
+       size_t num_fields;
+       size_t *widths;
+       char filler;
+       // state
+       size_t line_len;
+       char* in_buf;
+       char* out_buf;
+       size_t out_buf_start;
+       size_t out_buf_remaining;
+} stream_fwf_data;
+
+
+static ssize_t
+stream_fwf_read(stream *s, void *buf, size_t elmsize, size_t cnt)
+{
+       stream_fwf_data *fsd;
+       size_t to_write = cnt;
+       size_t buf_written = 0;
+       if (strcmp(s->name, STREAM_FWF_NAME) != 0 || elmsize != 1) {
+               return -1;
+       }
+       fsd = (stream_fwf_data*) s->stream_data.p;
+
+       while (to_write > 0) {
+               // input conversion
+               if (fsd->out_buf_remaining == 0) { // need to convert next line
+                       size_t field_idx, in_buf_pos = 0, out_buf_pos = 0;
+                       ssize_t actually_read = fsd->s->read(fsd->s, 
fsd->in_buf, 1, fsd->line_len);
+                       if (actually_read < (ssize_t) fsd->line_len) { // 
incomplete last line
+                               if (actually_read < 0) {
+                                       return actually_read; // this is an 
error
+                               }
+                               return buf_written; // skip last line
+                       }
+                       for (field_idx = 0; field_idx < fsd->num_fields; 
field_idx++) {
+                               char *val_start, *val_end;
+                               val_start = fsd->in_buf + in_buf_pos;
+                               in_buf_pos += fsd->widths[field_idx];
+                               val_end = fsd->in_buf + in_buf_pos - 1;
+                               while (*val_start == fsd->filler) val_start++;
+                               while (*val_end == fsd->filler) val_end--;
+                               while (val_start <= val_end) {
+                                       if (*val_start == STREAM_FWF_FIELD_SEP) 
{
+                                               fsd->out_buf[out_buf_pos++] = 
STREAM_FWF_ESCAPE;
+                                       }
+                                       fsd->out_buf[out_buf_pos++] = 
*val_start++;
+                               }
+                               fsd->out_buf[out_buf_pos++] = 
STREAM_FWF_FIELD_SEP;
+                       }
+                       fsd->out_buf[out_buf_pos++] = STREAM_FWF_RECORD_SEP;
+                       fsd->out_buf_remaining = out_buf_pos;
+                       fsd->out_buf_start = 0;
+               }
+
+               // now we know something is in output_buf so deliver it
+               if (fsd->out_buf_remaining <= to_write) {
+                       memcpy((char*)buf + buf_written, fsd->out_buf + 
fsd->out_buf_start, fsd->out_buf_remaining);
+                       to_write -= fsd->out_buf_remaining;
+                       buf_written += fsd->out_buf_remaining;
+                       fsd->out_buf_remaining = 0;
+               } else {
+                       memcpy((char*) buf + buf_written, fsd->out_buf + 
fsd->out_buf_start, to_write);
+                       fsd->out_buf_start += to_write;
+                       fsd->out_buf_remaining -= to_write;
+                       to_write = 0;
+               }
+       }
+       return cnt;
+}
+
+
+static void
+stream_fwf_close(stream *s)
+{
+       if (strcmp(s->name, STREAM_FWF_NAME) == 0) {
+               stream_fwf_data *fsd = (stream_fwf_data*) s->stream_data.p;
+               fsd->s->close(fsd->s);
+               free(fsd->widths);
+               free(fsd->in_buf);
+               free(fsd->out_buf);
+               free(fsd);
+       }
+       destroy(s);
+}
+
+stream*
+stream_fwf_create (stream *s, size_t num_fields, size_t *widths, char filler)
+{
+       stream *ns;
+       stream_fwf_data *fsd = malloc(sizeof(stream_fwf_data));
+       size_t i, out_buf_len;
+       if (!fsd) {
+               return NULL;
+       }
+       fsd->s = s;
+       fsd->num_fields = num_fields;
+       fsd->widths = widths;
+       fsd->filler = filler;
+       fsd->line_len = 1; // newline
+       for (i = 0; i < num_fields; i++) {
+               fsd->line_len += widths[i];
+       }
+       fsd->in_buf = malloc(fsd->line_len);
+       if (!fsd->in_buf) {
+               free(fsd);
+               return NULL;
+       }
+       out_buf_len = fsd->line_len * 2; // TODO: what if this is not enough?
+       fsd->out_buf = malloc(out_buf_len);
+       if (!fsd->out_buf) {
+               free(fsd->in_buf);
+               free(fsd);
+               return NULL;
+       }
+       fsd->out_buf_remaining = 0;
+
+       if ((ns = create_stream(STREAM_FWF_NAME)) == NULL) {
+               free(fsd->in_buf);
+               free(fsd->out_buf);
+               free(fsd);
+               return NULL;
+       }
+       ns->read = stream_fwf_read;
+       ns->close = stream_fwf_close;
+       ns->write = NULL;
+       ns->flush = NULL;
+       ns->access = ST_READ;
+       ns->stream_data.p = fsd;
+       return ns;
+}
+
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -254,4 +254,6 @@ stream_export stream *callback_stream(
 
 stream_export stream* stream_blackhole_create(void);
 
+stream_export stream* stream_fwf_create(stream *s, size_t num_fields, size_t 
*widths, char filler);
+
 #endif /*_STREAM_H_*/
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to