Changeset: 74246441f0f0 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=74246441f0f0
Modified Files:
	common/stream/gz_stream.c
	common/stream/stream.h
Branch: makelibstreamgreatagain
Log Message:
Also use pump_stream in gz_stream The tests pass now. diffs (truncated from 418 to 300 lines): diff --git a/common/stream/gz_stream.c b/common/stream/gz_stream.c --- a/common/stream/gz_stream.c +++ b/common/stream/gz_stream.c @@ -6,238 +6,241 @@ * Copyright 1997 - July 2008 CWI, August 2008 - 2020 MonetDB B.V. */ -/* streams working on a gzip-compressed disk file */ +/* streams working on a lzma/xz-compressed disk file */ #include "monetdb_config.h" #include "stream.h" #include "stream_internal.h" +#include "pump.h" #ifdef HAVE_LIBZ -#if ZLIB_VERNUM < 0x1290 -typedef size_t z_size_t; -/* simplistic version for ancient systems (CentOS 6, Ubuntu Trusty) */ -static z_size_t -gzfread(void *buf, z_size_t size, z_size_t nitems, gzFile file) +struct inner_state { + z_stream strm; + int (*indeflate)(z_streamp strm, int flush); + int (*indeflateEnd)(z_streamp strm); + Bytef buf[64*1024]; +}; + +static pump_buffer +get_src_win(inner_state_t *inner_state) { - unsigned sz = nitems * size > (size_t) 1 << 30 ? 1 << 30 : (unsigned) (nitems * size); - int len; - - len = gzread(file, buf, sz); - if (len == -1) - return 0; - return (z_size_t) len / size; + return (pump_buffer) { + .start = (void*) inner_state->strm.next_in, + .count = inner_state->strm.avail_in, + }; } -static z_size_t -gzfwrite(const void *buf, z_size_t size, z_size_t nitems, gzFile file) -{ - z_size_t sz = nitems * size; - - while (sz > 0) { - unsigned len = sz > ((z_size_t) 1 << 30) ? 
1 << 30 : (unsigned) sz; - int wlen; - - wlen = gzwrite(file, buf, len); - if (wlen <= 0) - return 0; - buf = (const void *) ((const char *) buf + wlen); - sz -= (z_size_t) wlen; - } - return nitems; -} -#endif - -static ssize_t -stream_gzread(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt) +static void +set_src_win(inner_state_t *inner_state, pump_buffer buf) { - gzFile fp = (gzFile) s->stream_data.p; - z_size_t size; - - if (fp == NULL) { - s->errnr = MNSTR_READ_ERROR; - return -1; - } - - if (elmsize == 0 || cnt == 0) - return 0; - - size = gzfread(buf, elmsize, cnt, fp); - /* when in text mode, convert \r\n line endings to \n */ - if (!s->binary) { - char *p1, *p2, *pe; - - p1 = buf; - pe = p1 + size; - while (p1 < pe && *p1 != '\r') - p1++; - p2 = p1; - while (p1 < pe) { - if (*p1 == '\r' && p1[1] == '\n') - size--; - else - *p2++ = *p1; - p1++; - } - } - - return size == 0 ? -1 : (ssize_t) size; + inner_state->strm.next_in = buf.start; + inner_state->strm.avail_in = buf.count; } -static ssize_t -stream_gzwrite(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt) +static pump_buffer +get_dst_win(inner_state_t *inner_state) { - gzFile fp = (gzFile) s->stream_data.p; - z_size_t size; - - if (fp == NULL) { - s->errnr = MNSTR_WRITE_ERROR; - return -1; - } - - if (elmsize == 0 || cnt == 0) - return 0; - - size = gzfwrite(buf, elmsize, cnt, fp); - return size == 0 ? 
-1 : (ssize_t) size; -} - -static int -stream_gzflush(stream *s) -{ - if (s->stream_data.p == NULL) - return -1; - if (!s->readonly && - gzflush((gzFile) s->stream_data.p, Z_SYNC_FLUSH) != Z_OK) - return -1; - return 0; + return (pump_buffer) { + .start = inner_state->strm.next_out, + .count = inner_state->strm.avail_out, + }; } static void -stream_gzclose(stream *s) +set_dst_win(inner_state_t *inner_state, pump_buffer buf) +{ + inner_state->strm.next_out = buf.start; + inner_state->strm.avail_out = buf.count; +} + +static pump_buffer +get_buffer(inner_state_t *inner_state) +{ + return (pump_buffer) { + .start = inner_state->buf, + .count = sizeof(inner_state->buf), + }; +} + +static pump_result +work(inner_state_t *inner_state, pump_action action) +{ + int a; + switch (action) { + case PUMP_NO_FLUSH: + a = Z_NO_FLUSH; + break; + case PUMP_FLUSH_DATA: + a = Z_SYNC_FLUSH; + break; + case PUMP_FLUSH_ALL: + a = Z_FULL_FLUSH; + break; + case PUMP_FINISH: + a = Z_FINISH; + break; + default: + assert(0 /* unknown action */); + } + + int ret = inner_state->indeflate(&inner_state->strm, a); + + switch (ret) { + case Z_OK: + return PUMP_OK; + case Z_STREAM_END: + return PUMP_END; + default: + return PUMP_ERROR; + } +} + +static void +finalizer(inner_state_t *inner_state) { - stream_gzflush(s); - if (s->stream_data.p) - gzclose((gzFile) s->stream_data.p); - s->stream_data.p = NULL; + inner_state->indeflateEnd(&inner_state->strm); + free(inner_state); +} + +stream * +gz_stream(stream *inner, int level) +{ + inner_state_t *gz = calloc(1, sizeof(inner_state_t)); + pump_state *state = calloc(1, sizeof(pump_state)); + if (gz == NULL || state == NULL) { + free(gz); + free(state); + return NULL; + } + + state->inner_state = gz; + state->get_src_win = get_src_win; + state->set_src_win = set_src_win; + state->get_dst_win = get_dst_win; + state->set_dst_win = set_dst_win; + state->get_buffer = get_buffer; + state->worker = work; + state->finalizer = finalizer; + + int ret; + if 
(inner->readonly) { + gz->indeflate = inflate; + gz->indeflateEnd = inflateEnd; + gz->strm.next_in = gz->buf; + gz->strm.avail_in = 0; + gz->strm.next_in = NULL; + gz->strm.avail_in = 0; + ret = inflateInit2(&gz->strm, 15 | 32); // 15 = allow all window sizes, 32 = accept gzip and zlib headers + } else { + gz->indeflate = deflate; + gz->indeflateEnd = deflateEnd; + gz->strm.next_out = gz->buf; + gz->strm.avail_out = sizeof(gz->buf); + ret = deflateInit2(&gz->strm, level, Z_DEFLATED, 15 | 16, 8, Z_DEFAULT_STRATEGY); + } + + stream *s = pump_stream(inner, state); + + if (ret != LZMA_OK || s == NULL) { + gz->indeflateEnd(&gz->strm); + free(gz); + free(state); + return NULL; + } + + s->stream_data.p = (void*) state; + + return s; } static stream * open_gzstream(const char *restrict filename, const char *restrict flags) { - stream *s; - gzFile fp; + stream *inner; + int preset = 6; - if ((s = create_stream(filename)) == NULL) + inner = open_stream(filename, flags); + if (inner == NULL) return NULL; -#ifdef HAVE__WFOPEN - { - wchar_t *wfname = utf8towchar(filename); - if (wfname != NULL) { - fp = gzopen_w(wfname, flags); - free(wfname); - } else - fp = NULL; - } -#else - { - char *fname = cvfilename(filename); - if (fname) { - fp = gzopen(fname, flags); - free(fname); - } else - fp = NULL; - } -#endif - if (fp == NULL) { - destroy_stream(s); - return NULL; - } - s->read = stream_gzread; - s->write = stream_gzwrite; - s->close = stream_gzclose; - s->flush = stream_gzflush; - s->stream_data.p = (void *) fp; - if (flags[0] == 'r' && flags[1] != 'b') { - char buf[UTF8BOMLENGTH]; - if (gzread(fp, buf, UTF8BOMLENGTH) == UTF8BOMLENGTH && - strncmp(buf, UTF8BOM, UTF8BOMLENGTH) == 0) { - s->isutf8 = true; - } else { - gzrewind(fp); - } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list