Changeset: 835cc241a8d0 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=835cc241a8d0 Added Files: common/stream/pump.h Modified Files: common/stream/xz_stream.c Branch: makelibstreamgreatagain Log Message:
Introduce generic_pump_{in,out}, use in xz_stream diffs (truncated from 475 to 300 lines): diff --git a/common/stream/pump.h b/common/stream/pump.h new file mode 100644 --- /dev/null +++ b/common/stream/pump.h @@ -0,0 +1,216 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Copyright 1997 - July 2008 CWI, August 2008 - 2020 MonetDB B.V. + */ + +/* generic driver loops around the work function of a compression library */ + +#include "monetdb_config.h" + +/* The logic of zlib's inflate/deflate and of various other compression libraries + * is always very similar. At the heart there is a work function which is + * called with an input buffer, an output buffer and some internal state. + * The function tries to move data out of the input buffer through the + * internal state to the output buffer until the input buffer is empty + * or the output buffer is full. + * + * It has an action parameter which controls flushing: sometimes, for example + * when closing the file, you want to flush the internal state out instead + * of hanging onto it waiting for more compression opportunities. If that + * parameter requests a flush, the protocol is that you keep making room in the output + * buffer and keep offering *exactly the same* input buffer until the + * worker indicates all content has been flushed. + * + * All compression libraries we use have such an API, but of course the + * parameter types etc. differ from library to library. + * generic_pump_out below encapsulates the logic of copying a whole buffer out + * to a stream in a type-agnostic way; generic_pump_in does the same for reading.
+ */ + +/* Callback types through which the pump functions below reach the caller's +* state in a generic way, plus the result codes, actions and buffer +* descriptors they exchange. */ + +typedef enum { + PUMP_OK, // no error, but the worker may still hold data or need more calls + PUMP_END, // the worker's internal state has been fully drained + PUMP_ERROR, // worker or I/O failure; the pump functions return immediately +} pump_result; + +typedef enum { + PUMP_WORK, // no flush requested; data may linger in the worker's internal state + PUMP_FLUSH_DATA, + PUMP_FLUSH_ALL, + PUMP_FINISH, // no more input follows; generic_pump_in passes this at EOF +} pump_action; + +typedef pump_result (*pump_worker)(void *state, pump_action action); // one call of the library's work function +typedef ssize_t (*pump_io)(void *state, char *data, size_t len); // bytes transferred, < 0 on error, 0 on EOF when reading + +typedef struct { // the complete backing buffer; reset_buffer() restores a window to it + char *start; + size_t count; +} pump_buffer; + +typedef struct { // pointers to a position/length pair stored elsewhere (presumably a library's next/avail fields; confirm at call site) + char **start; + size_t *count; +} pump_buffer_location; + +// These helper functions help make sure we don't accidentally +// write `buf.start == 0` where we meant `*buf.start == 0`. NOTE(review): names like start()/count() are very generic for a header and may clash with identifiers in files that include it. + +static inline char * +start(pump_buffer_location b) +{ + return *b.start; +} + +static inline size_t +count(pump_buffer_location b) +{ + return *b.count; +} + +static inline void +set_count(pump_buffer_location b, size_t count) // NOTE(review): parameter shadows the count() helper (-Wshadow) +{ + *b.count = count; +} + +static inline void +set_start(pump_buffer_location b, char *start) // NOTE(review): parameter shadows the start() helper (-Wshadow) +{ + *b.start = start; +} + +static inline void +reset_buffer(pump_buffer_location b, pump_buffer buf) +{ + *b.start = buf.start; + *b.count = buf.count; +} +/* Run the worker on window_in, shipping the filled part of the output buffer +* out through ship_out whenever window_out is exhausted. +* +* With PUMP_WORK this returns PUMP_OK once window_in is empty, possibly leaving +* data in the worker's internal state. With any other action it keeps calling +* the worker until it returns PUMP_END, then ships whatever output remains, +* resets window_out to the whole buffer and returns PUMP_END. Returns +* PUMP_ERROR if the worker fails or ship_out does not report writing exactly +* the requested number of bytes. +* +* NOTE(review): this header uses assert() without including <assert.h> and has +* no include guard; it relies on the files that include it. +*/ +static inline pump_result +generic_pump_out( + void *state, + pump_action action, + pump_buffer buffer, + pump_buffer_location window_in, + pump_buffer_location window_out, + pump_worker worker, + pump_io ship_out) +{ + while (1) { + // Make sure there is room in the output buffer + if (count(window_out) == 0) { + size_t amount = start(window_out) - buffer.start; + ssize_t nwritten = ship_out(state, buffer.start, amount); + if (nwritten != (ssize_t)amount) + return PUMP_ERROR; + reset_buffer(window_out, buffer); + } + + // Try to make progress + pump_result ret = worker(state, action); + if (ret == PUMP_ERROR) + return PUMP_ERROR; + + // There was no error but if input is still available, we definitely + // need another round + if (count(window_in) > 0) + continue; + + // Though the input data has been consumed, some may still linger + // in the
internal state. + if (action == PUMP_WORK) { + // Let it linger, we'll combine it with the next batch + assert(ret == PUMP_OK); // worker would never PUMP_END + return PUMP_OK; + } + + // We are flushing or finishing or whatever. + // We may need to keep iterating to flush the internal state. + // Is there any internal state left? + if (ret == PUMP_OK) + // yes, there is + continue; + + // All internal state has been drained. + // Now drain the output buffer + assert(ret == PUMP_END); + size_t amount = start(window_out) - buffer.start; + if (amount > 0) { + ssize_t nwritten = ship_out(state, buffer.start, amount); + if (nwritten != (ssize_t)amount) + return PUMP_ERROR; + } + reset_buffer(window_out, buffer); + return PUMP_END; + } +} + +/* Similar to generic_pump_out, but for reading. +* +* In every iteration, fill the input buffer if empty, and let the worker try to +* make progress. Stop iterating if the output buffer is full, when ship_in fails +* or when it does not fill the whole buffer. The latter case is important when +* we're reading for example from a socket. It's more important to return the +* data that has come in so far than to wait until the output buffer is +* completely full. +* +* Returns PUMP_END if the input is exhausted and no data lingers in the internal +* state, PUMP_ERROR when ship_in or the worker fails, or PUMP_OK otherwise. +* +* NOTE(review): the short-read case is not implemented below. After a short +* read the loop keeps going, and once the worker has consumed that input while +* window_out still has room, ship_in is called again, which may block (e.g. on +* a socket). Only a full output buffer, an error or PUMP_END end the loop. + */ +static inline pump_result +generic_pump_in( + void *state, + pump_buffer buffer, + pump_buffer_location window_in, + pump_buffer_location window_out, + pump_worker worker, + pump_io ship_in) +{ + char *orig_out = start(window_out); // nice for debugging + (void)orig_out; + while (1) { + if (count(window_out) == 0) + // Output buffer is sufficiently full.
+ return PUMP_OK; + + // Handle input, if possible and necessary + if (start(window_in) != NULL && count(window_in) == 0) { + // start != NULL means we haven't encountered EOF yet + ssize_t nread = ship_in(state, buffer.start, buffer.count); + if (nread < 0) + // Error. Return directly, discarding any data lingering + // in the internal state. + return PUMP_ERROR; + if (nread == 0) + // Set to NULL so we'll remember next time. + // Maybe there is some data in the internal state we don't + // return immediately. + set_start(window_in, NULL); + else + // All good + set_start(window_in, buffer.start); + set_count(window_in, (ssize_t)nread); // runs after both branches above, so the count becomes 0 at EOF. NOTE(review): the (ssize_t) cast is immediately converted back to size_t; nread >= 0 here. + } + + // Try to make some progress + pump_action action = (start(window_in) != NULL) ? PUMP_WORK : PUMP_FINISH; + assert(count(window_out) > 0); + assert(count(window_in) > 0 || action == PUMP_FINISH); + pump_result ret = worker(state, action); + if (ret == PUMP_ERROR) + return PUMP_ERROR; + + if (ret == PUMP_END) + // If you say so + return PUMP_END; + + // If we get here the worker returned PUMP_OK, so we're ready for a new iteration. + } +} + diff --git a/common/stream/xz_stream.c b/common/stream/xz_stream.c --- a/common/stream/xz_stream.c +++ b/common/stream/xz_stream.c @@ -11,6 +11,7 @@ #include "monetdb_config.h" #include "stream.h" #include "stream_internal.h" +#include "pump.h" #ifdef HAVE_LIBLZMA @@ -21,6 +22,7 @@ typedef struct xz_state { uint8_t buf[XZBUFSIZ]; } xz_state; + /* Keep calling lzma_code until the whole input buffer has been consumed * and all necessary output has been written. * @@ -34,132 +36,95 @@ typedef struct xz_state { * * Returns > 0 on succes, 0 on error.
*/ -static int -pump_out(stream *s, lzma_action action) +static pump_result +pumper(void *state, pump_action action) { - xz_state *xz = (xz_state *) s->stream_data.p; - - while (1) { - // Make sure there is room in the output buffer - if (xz->strm.avail_out == 0) { - size_t nwritten = fwrite(xz->buf, 1, XZBUFSIZ, xz->fp); - if (nwritten != XZBUFSIZ) { - return 0; - } - xz->strm.next_out = xz->buf; - xz->strm.avail_out = XZBUFSIZ; - } - - lzma_ret ret = lzma_code(&xz->strm, action); - if (ret != LZMA_OK && ret != LZMA_STREAM_END) { - // Some kind of error. - return 0; - } + stream *s = (stream*) state; + xz_state *xz = (xz_state*) s->stream_data.p; - if (xz->strm.avail_in > 0) { - // Definitely not done yet. Flush the buffer and encode - // some more. - continue; - } - - // Whether we are already done or not depends on the mode. - if (action == LZMA_RUN) { - assert(ret == LZMA_OK); - // More input will follow so we can leave the output buffer - // for later. - return 1; - } + lzma_action a; + switch (action) { + case PUMP_WORK: + a = LZMA_RUN; + break; + case PUMP_FLUSH_DATA: + a = LZMA_SYNC_FLUSH; + break; + case PUMP_FLUSH_ALL: + a = LZMA_FULL_FLUSH; + break; + case PUMP_FINISH: + a = LZMA_FINISH; + break; + } - // We need to flush all data out of the encoder and out of our _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list