Changeset: 74246441f0f0 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=74246441f0f0
Modified Files:
        common/stream/gz_stream.c
        common/stream/stream.h
Branch: makelibstreamgreatagain
Log Message:

Also use pump_stream in gz_stream

The tests pass now.


diffs (truncated from 418 to 300 lines):

diff --git a/common/stream/gz_stream.c b/common/stream/gz_stream.c
--- a/common/stream/gz_stream.c
+++ b/common/stream/gz_stream.c
@@ -6,238 +6,241 @@
  * Copyright 1997 - July 2008 CWI, August 2008 - 2020 MonetDB B.V.
  */
 
-/* streams working on a gzip-compressed disk file */
+/* streams working on a lzma/xz-compressed disk file */
 
 #include "monetdb_config.h"
 #include "stream.h"
 #include "stream_internal.h"
+#include "pump.h"
 
 
 #ifdef HAVE_LIBZ
-#if ZLIB_VERNUM < 0x1290
-typedef size_t z_size_t;
 
-/* simplistic version for ancient systems (CentOS 6, Ubuntu Trusty) */
-static z_size_t
-gzfread(void *buf, z_size_t size, z_size_t nitems, gzFile file)
+struct inner_state {
+       z_stream strm;
+       int (*indeflate)(z_streamp strm, int flush);
+       int (*indeflateEnd)(z_streamp strm);
+       Bytef buf[64*1024];
+};
+
+static pump_buffer
+get_src_win(inner_state_t *inner_state)
 {
-       unsigned sz = nitems * size > (size_t) 1 << 30 ? 1 << 30 : (unsigned) (nitems * size);
-       int len;
-
-       len = gzread(file, buf, sz);
-       if (len == -1)
-               return 0;
-       return (z_size_t) len / size;
+       return (pump_buffer) {
+               .start = (void*) inner_state->strm.next_in,
+               .count = inner_state->strm.avail_in,
+       };
 }
 
-static z_size_t
-gzfwrite(const void *buf, z_size_t size, z_size_t nitems, gzFile file)
-{
-       z_size_t sz = nitems * size;
-
-       while (sz > 0) {
-               unsigned len = sz > ((z_size_t) 1 << 30) ? 1 << 30 : (unsigned) sz;
-               int wlen;
-
-               wlen = gzwrite(file, buf, len);
-               if (wlen <= 0)
-                       return 0;
-               buf = (const void *) ((const char *) buf + wlen);
-               sz -= (z_size_t) wlen;
-       }
-       return nitems;
-}
-#endif
-
-static ssize_t
-stream_gzread(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
+static void
+set_src_win(inner_state_t *inner_state, pump_buffer buf)
 {
-       gzFile fp = (gzFile) s->stream_data.p;
-       z_size_t size;
-
-       if (fp == NULL) {
-               s->errnr = MNSTR_READ_ERROR;
-               return -1;
-       }
-
-       if (elmsize == 0 || cnt == 0)
-               return 0;
-
-       size = gzfread(buf, elmsize, cnt, fp);
-       /* when in text mode, convert \r\n line endings to \n */
-       if (!s->binary) {
-               char *p1, *p2, *pe;
-
-               p1 = buf;
-               pe = p1 + size;
-               while (p1 < pe && *p1 != '\r')
-                       p1++;
-               p2 = p1;
-               while (p1 < pe) {
-                       if (*p1 == '\r' && p1[1] == '\n')
-                               size--;
-                       else
-                               *p2++ = *p1;
-                       p1++;
-               }
-       }
-
-       return size == 0 ? -1 : (ssize_t) size;
+       inner_state->strm.next_in = buf.start;
+       inner_state->strm.avail_in = buf.count;
 }
 
-static ssize_t
-stream_gzwrite(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
+static pump_buffer
+get_dst_win(inner_state_t *inner_state)
 {
-       gzFile fp = (gzFile) s->stream_data.p;
-       z_size_t size;
-
-       if (fp == NULL) {
-               s->errnr = MNSTR_WRITE_ERROR;
-               return -1;
-       }
-
-       if (elmsize == 0 || cnt == 0)
-               return 0;
-
-       size = gzfwrite(buf, elmsize, cnt, fp);
-       return size == 0 ? -1 : (ssize_t) size;
-}
-
-static int
-stream_gzflush(stream *s)
-{
-       if (s->stream_data.p == NULL)
-               return -1;
-       if (!s->readonly &&
-           gzflush((gzFile) s->stream_data.p, Z_SYNC_FLUSH) != Z_OK)
-               return -1;
-       return 0;
+       return (pump_buffer) {
+               .start = inner_state->strm.next_out,
+               .count = inner_state->strm.avail_out,
+       };
 }
 
 static void
-stream_gzclose(stream *s)
+set_dst_win(inner_state_t *inner_state, pump_buffer buf)
+{
+       inner_state->strm.next_out = buf.start;
+       inner_state->strm.avail_out = buf.count;
+}
+
+static pump_buffer
+get_buffer(inner_state_t *inner_state)
+{
+       return (pump_buffer) {
+               .start = inner_state->buf,
+               .count = sizeof(inner_state->buf),
+       };
+}
+
+static pump_result
+work(inner_state_t *inner_state, pump_action action)
+{
+       int a;
+       switch (action) {
+       case PUMP_NO_FLUSH:
+               a = Z_NO_FLUSH;
+               break;
+       case PUMP_FLUSH_DATA:
+               a = Z_SYNC_FLUSH;
+               break;
+       case PUMP_FLUSH_ALL:
+               a = Z_FULL_FLUSH;
+               break;
+       case PUMP_FINISH:
+               a = Z_FINISH;
+               break;
+       default:
+               assert(0 /* unknown action */);
+       }
+
+       int ret = inner_state->indeflate(&inner_state->strm, a);
+
+       switch (ret) {
+               case Z_OK:
+                       return PUMP_OK;
+               case Z_STREAM_END:
+                       return PUMP_END;
+               default:
+                       return PUMP_ERROR;
+       }
+}
+
+static void
+finalizer(inner_state_t *inner_state)
 {
-       stream_gzflush(s);
-       if (s->stream_data.p)
-               gzclose((gzFile) s->stream_data.p);
-       s->stream_data.p = NULL;
+       inner_state->indeflateEnd(&inner_state->strm);
+       free(inner_state);
+}
+
+stream *
+gz_stream(stream *inner, int level)
+{
+       inner_state_t *gz = calloc(1, sizeof(inner_state_t));
+       pump_state *state = calloc(1, sizeof(pump_state));
+       if (gz == NULL || state == NULL) {
+               free(gz);
+               free(state);
+               return NULL;
+       }
+
+       state->inner_state = gz;
+       state->get_src_win = get_src_win;
+       state->set_src_win = set_src_win;
+       state->get_dst_win = get_dst_win;
+       state->set_dst_win = set_dst_win;
+       state->get_buffer = get_buffer;
+       state->worker = work;
+       state->finalizer = finalizer;
+
+       int ret;
+       if (inner->readonly) {
+               gz->indeflate = inflate;
+               gz->indeflateEnd = inflateEnd;
+               gz->strm.next_in = gz->buf;
+               gz->strm.avail_in = 0;
+               gz->strm.next_in = NULL;
+               gz->strm.avail_in = 0;
+               ret = inflateInit2(&gz->strm, 15 | 32); // 15 = allow all window sizes, 32 = accept gzip and zlib headers
+       } else {
+               gz->indeflate = deflate;
+               gz->indeflateEnd = deflateEnd;
+               gz->strm.next_out = gz->buf;
+               gz->strm.avail_out = sizeof(gz->buf);
+               ret = deflateInit2(&gz->strm, level, Z_DEFLATED, 15 | 16, 8, Z_DEFAULT_STRATEGY);
+       }
+
+       stream *s = pump_stream(inner, state);
+
+       if (ret != LZMA_OK || s == NULL) {
+               gz->indeflateEnd(&gz->strm);
+               free(gz);
+               free(state);
+               return NULL;
+       }
+
+       s->stream_data.p = (void*) state;
+
+       return s;
 }
 
 static stream *
 open_gzstream(const char *restrict filename, const char *restrict flags)
 {
-       stream *s;
-       gzFile fp;
+       stream *inner;
+       int preset = 6;
 
-       if ((s = create_stream(filename)) == NULL)
+       inner = open_stream(filename, flags);
+       if (inner == NULL)
                return NULL;
-#ifdef HAVE__WFOPEN
-       {
-               wchar_t *wfname = utf8towchar(filename);
-               if (wfname != NULL) {
-                       fp = gzopen_w(wfname, flags);
-                       free(wfname);
-               } else
-                       fp = NULL;
-       }
-#else
-       {
-               char *fname = cvfilename(filename);
-               if (fname) {
-                       fp = gzopen(fname, flags);
-                       free(fname);
-               } else
-                       fp = NULL;
-       }
-#endif
-       if (fp == NULL) {
-               destroy_stream(s);
-               return NULL;
-       }
-       s->read = stream_gzread;
-       s->write = stream_gzwrite;
-       s->close = stream_gzclose;
-       s->flush = stream_gzflush;
-       s->stream_data.p = (void *) fp;
-       if (flags[0] == 'r' && flags[1] != 'b') {
-               char buf[UTF8BOMLENGTH];
-               if (gzread(fp, buf, UTF8BOMLENGTH) == UTF8BOMLENGTH &&
-                   strncmp(buf, UTF8BOM, UTF8BOMLENGTH) == 0) {
-                       s->isutf8 = true;
-               } else {
-                       gzrewind(fp);
-               }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to