Changeset: a007728e60e7 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a007728e60e7
Modified Files:
        common/stream/bz2_stream.c
        common/stream/gz_stream.c
        common/stream/lz4_stream.c
        common/stream/pump.c
        common/stream/pump.h
        common/stream/text_stream.c
        common/stream/xz_stream.c
Branch: default
Log Message:

Try to retrieve error messages from pump streams


diffs (237 lines):

diff --git a/common/stream/bz2_stream.c b/common/stream/bz2_stream.c
--- a/common/stream/bz2_stream.c
+++ b/common/stream/bz2_stream.c
@@ -122,6 +122,13 @@ finalizer(inner_state_t *inner_state)
        free(inner_state);
 }
 
+static const char*
+bz2_get_error(inner_state_t *inner_state)
+{
+       int dummy;
+       return BZ2_bzerror(&inner_state->strm, &dummy);
+}
+
 static int
 BZ2_bzDecompress_wrapper(bz_stream *strm, int a)
 {
@@ -148,6 +155,7 @@ bz2_stream(stream *inner, int level)
        state->set_dst_win = set_dst_win;
        state->get_buffer = get_buffer;
        state->worker = work;
+       state->get_error = bz2_get_error;
        state->finalizer = finalizer;
 
        int ret;
diff --git a/common/stream/gz_stream.c b/common/stream/gz_stream.c
--- a/common/stream/gz_stream.c
+++ b/common/stream/gz_stream.c
@@ -107,6 +107,13 @@ finalizer(inner_state_t *inner_state)
        free(inner_state);
 }
 
+
+static const char*
+get_error(inner_state_t *inner_state)
+{
+       return inner_state->strm.msg;
+}
+
 stream *
 gz_stream(stream *inner, int level)
 {
@@ -126,6 +133,7 @@ gz_stream(stream *inner, int level)
        state->set_dst_win = set_dst_win;
        state->get_buffer = get_buffer;
        state->worker = work;
+       state->get_error = get_error;
        state->finalizer = finalizer;
 
        int ret;
diff --git a/common/stream/lz4_stream.c b/common/stream/lz4_stream.c
--- a/common/stream/lz4_stream.c
+++ b/common/stream/lz4_stream.c
@@ -28,6 +28,7 @@ struct inner_state {
                LZ4F_dctx *d;
        } ctx;
        LZ4F_preferences_t compression_prefs;
+       LZ4F_errorCode_t error_code;
        bool finished;
 };
 
@@ -85,10 +86,11 @@ decomp(inner_state_t *inner_state, pump_
        inner_state->dst_win.start += ndst;
        inner_state->dst_win.count -= ndst;
 
-       if (LZ4F_isError(ret))
+       if (LZ4F_isError(ret)) {
+               inner_state->error_code = ret;
                return PUMP_ERROR;
-       else
-               return PUMP_OK;
+       }
+       return PUMP_OK;
 }
 
 static void
@@ -157,8 +159,10 @@ compr(inner_state_t *inner_state, pump_a
                        return PUMP_ERROR;
        }
 
-       if (LZ4F_isError(produced))
+       if (LZ4F_isError(produced)) {
+               inner_state->error_code = produced;
                return PUMP_ERROR;
+       }
 
        inner_state->src_win.start += consumed;
        inner_state->src_win.count -= consumed;
@@ -176,6 +180,12 @@ compr_end(inner_state_t *inner_state)
        free(inner_state);
 }
 
+static const char*
+get_error(inner_state_t *inner_state)
+{
+       return LZ4F_getErrorName(inner_state->error_code);
+}
+
 static stream *
 setup_decompression(stream *inner, pump_state *state)
 {
@@ -287,6 +297,7 @@ lz4_stream(stream *inner, int level)
        state->get_dst_win = get_dst_win;
        state->set_dst_win = set_dst_win;
        state->get_buffer = get_buffer;
+       state->get_error = get_error;
 
        stream *s;
        if (inner->readonly)
diff --git a/common/stream/pump.c b/common/stream/pump.c
--- a/common/stream/pump.c
+++ b/common/stream/pump.c
@@ -35,6 +35,7 @@ pump_stream(stream *inner, pump_state *s
        assert(state->set_dst_win != NULL);
        assert(state->get_buffer != NULL);
        assert(state->worker != NULL);
+       assert(state->get_error != NULL);
        assert(state->finalizer != NULL);
 
        inner_state_t *inner_state = state->inner_state;
@@ -78,7 +79,10 @@ pump_read(stream *restrict s, void *rest
        state->set_dst_win(inner_state, (pump_buffer){ .start = buf, .count = size});
        pump_result ret = pump_in(s);
        if (ret == PUMP_ERROR) {
-               assert(s->errkind != MNSTR_NO__ERROR);
+               const char *msg = state->get_error(inner_state);
+               if (msg != NULL)
+                       msg = "processing failed without further error indication";
+               mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
                return -1;
        }
 
@@ -102,7 +106,10 @@ pump_write(stream *restrict s, const voi
        state->set_src_win(inner_state, (pump_buffer){ .start = (void*)buf, .count = size });
        pump_result ret = pump_out(s, PUMP_NO_FLUSH);
        if (ret == PUMP_ERROR) {
-               assert(s->errkind != MNSTR_NO__ERROR);
+               const char *msg = state->get_error(inner_state);
+               if (msg != NULL)
+                       msg = "processing failed without further error indication";
+               mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
                return -1;
        }
        ssize_t nwritten = state->get_src_win(inner_state).start - (char*)buf;
diff --git a/common/stream/pump.h b/common/stream/pump.h
--- a/common/stream/pump.h
+++ b/common/stream/pump.h
@@ -66,6 +66,7 @@ typedef struct pump_state {
        buf_getter get_buffer;
        pump_worker worker;
        void (*finalizer)(inner_state_t *inner_state);
+       const char *(*get_error)(inner_state_t *inner_state);
        size_t elbow_room;
 } pump_state;
 
diff --git a/common/stream/text_stream.c b/common/stream/text_stream.c
--- a/common/stream/text_stream.c
+++ b/common/stream/text_stream.c
@@ -230,6 +230,13 @@ text_end(inner_state_t *s)
 }
 
 
+static const char*
+get_error(inner_state_t *s)
+{
+       (void)s;
+       return "line ending conversion failure";
+}
+
 static ssize_t
 skip_bom(stream *s)
 {
@@ -281,6 +288,7 @@ create_text_stream(stream *inner)
        state->set_dst_win = set_dst_win;
        state->get_buffer = get_buffer;
        state->finalizer = text_end;
+       state->get_error = get_error;
 
        inner_state->putback_win.start = inner_state->putback_buf;
        inner_state->putback_win.count = 0;
diff --git a/common/stream/xz_stream.c b/common/stream/xz_stream.c
--- a/common/stream/xz_stream.c
+++ b/common/stream/xz_stream.c
@@ -19,6 +19,7 @@
 struct inner_state {
        lzma_stream strm;
        uint8_t buf[64*1024];
+       lzma_ret error_code;
 };
 
 static pump_buffer
@@ -85,6 +86,7 @@ xz_work(inner_state_t *xz, pump_action a
        }
 
        lzma_ret ret = lzma_code(&xz->strm, a);
+       xz->error_code = ret;
 
        switch (ret) {
                case LZMA_OK:
@@ -103,6 +105,29 @@ xz_finalizer(inner_state_t *xz)
        free(xz);
 }
 
+static const char *
+xz_get_error(inner_state_t *xz)
+{
+       static const char *msgs[] = {
+               "LZMA_OK",
+               "LZMA_STREAM_END",
+               "LZMA_NO_CHECK",
+               "LZMA_UNSUPPORTED_CHECK",
+               "LZMA_GET_CHECK",
+               "LZMA_MEM_ERROR",
+               "LZMA_MEMLIMIT_ERROR",
+               "LZMA_FORMAT_ERROR",
+               "LZMA_OPTIONS_ERROR",
+               "LZMA_DATA_ERROR",
+               "LZMA_BUF_ERROR",
+               "LZMA_PROG_ERROR"
+       };
+
+       if (xz->error_code <= LZMA_PROG_ERROR)
+               return msgs[xz->error_code];
+       else
+               return "unknown LZMA error code";
+}
 
 
 
@@ -127,6 +152,7 @@ xz_stream(stream *inner, int preset)
        state->get_buffer = xz_get_buffer;
        state->worker = xz_work;
        state->finalizer = xz_finalizer;
+       state->get_error = xz_get_error;
 
        lzma_ret ret;
        if (inner->readonly) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to