Changeset: cd474532e378 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=cd474532e378 Modified Files: common/stream/stream.c common/stream/stream.h Branch: fixed-width-format Log Message:
new fwf stream wrapper

diffs (212 lines):

diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4671,3 +4671,198 @@ stream * stream_blackhole_create (void)
 	s->access = ST_WRITE;
 	return s;
 }
+
+
+/* fixed-width format streams */
+
+#define STREAM_FWF_NAME "fwf"
+#define STREAM_FWF_FIELD_SEP '|'
+#define STREAM_FWF_ESCAPE '\\'
+#define STREAM_FWF_RECORD_SEP '\n'
+
+/* state of one fixed-width format ("fwf") read wrapper */
+typedef struct {
+	stream *s; // wrapped stream that supplies the raw records
+	// config
+	size_t num_fields; // number of entries in widths
+	size_t *widths; // width of every field in bytes, freed on close
+	char filler; // padding byte stripped from both ends of a field
+	// state
+	size_t line_len; // bytes per input record: sum of widths + newline
+	char *in_buf; // one raw input record (line_len bytes)
+	char *out_buf; // converted record that is being delivered
+	size_t out_buf_start; // offset of first undelivered byte in out_buf
+	size_t out_buf_remaining; // undelivered bytes left in out_buf
+} stream_fwf_data;
+
+
+/*
+ * Read cnt bytes of converted data from a fixed-width format stream.
+ *
+ * Input is fetched from the wrapped stream one record (line_len bytes)
+ * at a time.  Every field gets its leading and trailing filler bytes
+ * removed, each STREAM_FWF_FIELD_SEP inside a value is prefixed with
+ * STREAM_FWF_ESCAPE, and the field is followed by STREAM_FWF_FIELD_SEP.
+ * The converted record ends with STREAM_FWF_RECORD_SEP; the last input
+ * byte of the record is not inspected.
+ *
+ * Returns cnt when the request was filled completely, the number of
+ * bytes delivered so far when the wrapped stream returned fewer than
+ * line_len bytes (that partial record is dropped), the wrapped stream's
+ * negative result when it failed, and -1 when s is not an fwf stream or
+ * elmsize is not 1.
+ */
+static ssize_t
+stream_fwf_read(stream *s, void *buf, size_t elmsize, size_t cnt)
+{
+	stream_fwf_data *fsd;
+	size_t to_write = cnt;
+	size_t buf_written = 0;
+
+	if (strcmp(s->name, STREAM_FWF_NAME) != 0 || elmsize != 1) {
+		return -1;
+	}
+	fsd = (stream_fwf_data *) s->stream_data.p;
+
+	while (to_write > 0) {
+		// input conversion
+		if (fsd->out_buf_remaining == 0) { // need to convert next line
+			size_t field_idx, in_buf_pos = 0, out_buf_pos = 0;
+			ssize_t actually_read = fsd->s->read(fsd->s, fsd->in_buf, 1, fsd->line_len);
+
+			if (actually_read < (ssize_t) fsd->line_len) { // incomplete last line
+				if (actually_read < 0) {
+					return actually_read; // this is an error
+				}
+				return (ssize_t) buf_written; // skip last line
+			}
+			for (field_idx = 0; field_idx < fsd->num_fields; field_idx++) {
+				const char *val_start = fsd->in_buf + in_buf_pos;
+				const char *val_end; // one past the last byte of the value
+
+				in_buf_pos += fsd->widths[field_idx];
+				val_end = fsd->in_buf + in_buf_pos;
+				// Strip the filler, but never step outside this field:
+				// an all-filler or zero-width field yields an empty value.
+				while (val_start < val_end && *val_start == fsd->filler) {
+					val_start++;
+				}
+				while (val_end > val_start && val_end[-1] == fsd->filler) {
+					val_end--;
+				}
+				while (val_start < val_end) {
+					if (*val_start == STREAM_FWF_FIELD_SEP) {
+						fsd->out_buf[out_buf_pos++] = STREAM_FWF_ESCAPE;
+					}
+					fsd->out_buf[out_buf_pos++] = *val_start++;
+				}
+				fsd->out_buf[out_buf_pos++] = STREAM_FWF_FIELD_SEP;
+			}
+			fsd->out_buf[out_buf_pos++] = STREAM_FWF_RECORD_SEP;
+			fsd->out_buf_remaining = out_buf_pos;
+			fsd->out_buf_start = 0;
+		}
+
+		// now we know something is in output_buf so deliver it
+		if (fsd->out_buf_remaining <= to_write) {
+			memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, fsd->out_buf_remaining);
+			to_write -= fsd->out_buf_remaining;
+			buf_written += fsd->out_buf_remaining;
+			fsd->out_buf_remaining = 0;
+		} else {
+			memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, to_write);
+			fsd->out_buf_start += to_write;
+			fsd->out_buf_remaining -= to_write;
+			to_write = 0;
+		}
+	}
+	return (ssize_t) cnt;
+}
+
+
+/*
+ * Close an fwf stream: close the wrapped stream, release the conversion
+ * buffers and the widths array, then destroy the wrapper itself.
+ */
+static void
+stream_fwf_close(stream *s)
+{
+	if (strcmp(s->name, STREAM_FWF_NAME) == 0) {
+		stream_fwf_data *fsd = (stream_fwf_data *) s->stream_data.p;
+
+		fsd->s->close(fsd->s);
+		free(fsd->widths);
+		free(fsd->in_buf);
+		free(fsd->out_buf);
+		free(fsd);
+	}
+	destroy(s);
+}
+
+/*
+ * Wrap s in a read-only stream that converts fixed-width records into
+ * STREAM_FWF_FIELD_SEP separated lines (see stream_fwf_read).
+ *
+ * widths holds num_fields field widths in bytes; filler is the padding
+ * byte stripped from both ends of every field.  Each input record is
+ * expected to be followed by one extra byte (the newline).
+ *
+ * On success the returned stream owns both s and widths: closing it
+ * closes s and frees widths.  On failure NULL is returned and both stay
+ * with the caller.
+ */
+stream *
+stream_fwf_create(stream *s, size_t num_fields, size_t *widths, char filler)
+{
+	stream *ns;
+	stream_fwf_data *fsd;
+	size_t i, out_buf_len;
+
+	if (s == NULL || (widths == NULL && num_fields > 0)) {
+		return NULL;
+	}
+	fsd = malloc(sizeof(*fsd));
+	if (!fsd) {
+		return NULL;
+	}
+	fsd->s = s;
+	fsd->num_fields = num_fields;
+	fsd->widths = widths;
+	fsd->filler = filler;
+	fsd->line_len = 1; // newline
+	for (i = 0; i < num_fields; i++) {
+		fsd->line_len += widths[i];
+	}
+	fsd->in_buf = malloc(fsd->line_len);
+	if (!fsd->in_buf) {
+		free(fsd);
+		return NULL;
+	}
+	// Worst case every data byte is a field separator that needs an
+	// escape (2 bytes each), plus one separator per field and the
+	// record separator.
+	out_buf_len = 2 * (fsd->line_len - 1) + num_fields + 1;
+	fsd->out_buf = malloc(out_buf_len);
+	if (!fsd->out_buf) {
+		free(fsd->in_buf);
+		free(fsd);
+		return NULL;
+	}
+	fsd->out_buf_start = 0;
+	fsd->out_buf_remaining = 0;
+
+	if ((ns = create_stream(STREAM_FWF_NAME)) == NULL) {
+		free(fsd->in_buf);
+		free(fsd->out_buf);
+		free(fsd);
+		return NULL;
+	}
+	ns->read = stream_fwf_read;
+	ns->close = stream_fwf_close;
+	ns->write = NULL;
+	ns->flush = NULL;
+	ns->access = ST_READ;
+	ns->stream_data.p = fsd;
+	return ns;
+}
+
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -254,4 +254,6 @@ stream_export stream *callback_stream(
 
 stream_export stream* stream_blackhole_create(void);
 
+stream_export stream* stream_fwf_create(stream *s, size_t num_fields, size_t *widths, char filler);
+
 #endif /*_STREAM_H_*/
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list