Hi John, > >> Also, I don't think > >> the new logic for the ctrl/c variables is an improvement: > >> > >> 1. iter->ctrlc is initialized with '8' (even in the uncompressed case, > >> which is confusing). Any time you initialize with something not 0 or > >> 1, it's a magic number, and here it's far from where the loop variable > >> is used. This is harder to read. > > > > `iter->ctrlc` is used to record the value of `ctrl` in pglz_decompress > at the end of > > the last iteration (or loop). In the pglz_decompress, `ctrlc`’s valid > value is 0~7. > > When `ctrlc` reaches 8, a control byte is read from the source > > buffer to `ctrl` then set `ctrlc` to 0. And a control byte should be > read from the > > source buffer to `ctrlc` on the first iteration. So `iter->ctrlc` should > be initialized with '8'. > > My point here is it looks strange out of context, but "0" looked > normal. Maybe a comment in init_detoast_buffer(), something like "8 > means read a control byte from the source buffer on the first > iteration, see pg_lzdecompress_iterate()". > > Or, possibly, we could have a macro like INVALID_CTRLC. That might > even improve the readability of the original function. This is just an > idea, and maybe others would disagree, so you don't need to change it > for now. >
All in all, the idea is much better than the magic number 8. So, I've implemented it. > At this point, there are no functional things that I think we need to > change. It's close to ready-for-committer. For the next version, I'd > like you to go through the comments and edit for grammar, spelling, and > clarity as you see fit. I know you're not a native speaker of English, > so I can help you with anything that remains. I've tried my best to improve the comments, but there is probably still room for further improvement. I hope you can help me polish them. > Also note we use braces > on their own lines > { > like this > } > > Done. -- Best regards, Binguo Bao
From 13648adb56b96a75910bb5d0a8e21d358b266d51 Mon Sep 17 00:00:00 2001 From: BBG <djydew...@gmail.com> Date: Tue, 4 Jun 2019 22:56:42 +0800 Subject: [PATCH] de-TOASTing using a iterator --- src/backend/access/heap/tuptoaster.c | 470 +++++++++++++++++++++++++++++++++++ src/backend/utils/adt/varlena.c | 40 ++- src/include/access/tuptoaster.h | 97 ++++++++ src/include/fmgr.h | 7 + 4 files changed, 608 insertions(+), 6 deletions(-) diff --git a/src/backend/access/heap/tuptoaster.c b/src/backend/access/heap/tuptoaster.c index 74233bb..a8924f1 100644 --- a/src/backend/access/heap/tuptoaster.c +++ b/src/backend/access/heap/tuptoaster.c @@ -83,6 +83,13 @@ static int toast_open_indexes(Relation toastrel, static void toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock); static void init_toast_snapshot(Snapshot toast_snapshot); +static FetchDatumIterator create_fetch_datum_iterator(struct varlena *attr); +static bool free_fetch_datum_iterator(FetchDatumIterator iter); +static void fetch_datum_iterate(FetchDatumIterator iter); +static void init_toast_buffer(ToastBuffer *buf, int size, bool compressed); +static bool free_toast_buffer(ToastBuffer *buf); +static void pglz_decompress_iterate(ToastBuffer *source, ToastBuffer *dest, + DetoastIterator iter); /* ---------- @@ -347,6 +354,125 @@ heap_tuple_untoast_attr_slice(struct varlena *attr, /* ---------- + * init_detoast_iterator - + * + * The "iterator" variable is normally just a local variable in the caller. + * It only make sense to initialize de-TOAST iterator for external on-disk value. 
+ * + * ---------- + */ +bool init_detoast_iterator(struct varlena *attr, DetoastIterator iterator) +{ + struct varatt_external toast_pointer; + if (VARATT_IS_EXTERNAL_ONDISK(attr)) + { + /* + * This is an externally stored datum --- initialize fetch datum iterator + */ + iterator->fetch_datum_iterator = create_fetch_datum_iterator(attr); + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) + { + /* If it's compressed, prepare buffer for raw data */ + iterator->buf = (ToastBuffer *) palloc0(sizeof(ToastBuffer)); + init_toast_buffer(iterator->buf, toast_pointer.va_rawsize, false); + iterator->ctrl = 0; + iterator->ctrlc = INVALID_CTRLC; + iterator->compressed = true; + iterator->done = false; + } + else + { + iterator->buf = iterator->fetch_datum_iterator->buf; + iterator->ctrl = 0; + iterator->ctrlc = INVALID_CTRLC; + iterator->compressed = false; + iterator->done = false; + } + return true; + } + else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) + { + /* + * This is an indirect pointer --- dereference it + */ + struct varatt_indirect redirect; + + VARATT_EXTERNAL_GET_POINTER(redirect, attr); + attr = (struct varlena *) redirect.pointer; + + /* nested indirect Datums aren't allowed */ + Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr)); + + /* recurse in case value is still extended in some other way */ + return init_detoast_iterator(attr, iterator); + + } + else if (VARATT_IS_COMPRESSED(attr)) + { + /* + * This is a compressed value inside of the main tuple + * skip the iterator and just decompress the whole thing. + */ + return false; + } + + return false; +} + + +/* ---------- + * free_detoast_iterator - + * + * Free the memory space occupied by the de-TOAST iterator include buffers and + * fetch datum iterator. + * + * Note: "iter" variable is normally just a local variable in the caller, so + * shouldn't free de-TOAST iterator itself. 
+ * ---------- + */ +bool free_detoast_iterator(DetoastIterator iter) +{ + if (iter == NULL) + { + return false; + } + if (iter->buf != iter->fetch_datum_iterator->buf) + { + free_toast_buffer(iter->buf); + } + free_fetch_datum_iterator(iter->fetch_datum_iterator); + return true; +} + + +/* ---------- + * detoast_iterate - + * + * Iterate through the toasted value referenced by iterator. + * + * As long as there is another data chunk in compression or external storage, + * de-TOAST it into toast buffer in iterator. + * ---------- + */ +extern void detoast_iterate(DetoastIterator iter) +{ + FetchDatumIterator fetch_iter = iter->fetch_datum_iterator; + + Assert(iter != NULL && !iter->done); + + fetch_datum_iterate(fetch_iter); + + if (iter->compressed) + pglz_decompress_iterate(fetch_iter->buf, iter->buf, iter); + + if (iter->buf->limit == iter->buf->capacity) { + iter->done = true; + } +} + + +/* ---------- * toast_raw_datum_size - * * Return the raw (detoasted) size of a varlena datum @@ -2419,3 +2545,347 @@ init_toast_snapshot(Snapshot toast_snapshot) InitToastSnapshot(*toast_snapshot, snapshot->lsn, snapshot->whenTaken); } + + +/* ---------- + * create_fetch_datum_iterator - + * + * Initialize fetch datum iterator. 
+ * ---------- + */ +static FetchDatumIterator +create_fetch_datum_iterator(struct varlena *attr) +{ + int validIndex; + FetchDatumIterator iterator; + + if (!VARATT_IS_EXTERNAL_ONDISK(attr)) + elog(ERROR, "create_fetch_datum_itearator shouldn't be called for non-ondisk datums"); + + iterator = (FetchDatumIterator) palloc0(sizeof(FetchDatumIteratorData)); + + /* Must copy to access aligned fields */ + VARATT_EXTERNAL_GET_POINTER(iterator->toast_pointer, attr); + + iterator->ressize = iterator->toast_pointer.va_extsize; + iterator->numchunks = ((iterator->ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1; + + /* + * Open the toast relation and its indexes + */ + iterator->toastrel = table_open(iterator->toast_pointer.va_toastrelid, AccessShareLock); + + /* Look for the valid index of the toast relation */ + validIndex = toast_open_indexes(iterator->toastrel, + AccessShareLock, + &iterator->toastidxs, + &iterator->num_indexes); + + /* + * Setup a scan key to fetch from the index by va_valueid + */ + ScanKeyInit(&iterator->toastkey, + (AttrNumber) 1, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(iterator->toast_pointer.va_valueid)); + + /* + * Read the chunks by index + * + * Note that because the index is actually on (valueid, chunkidx) we will + * see the chunks in chunkidx order, even though we didn't explicitly ask + * for it. 
+ */ + + init_toast_snapshot(&iterator->SnapshotToast); + iterator->toastscan = systable_beginscan_ordered(iterator->toastrel, iterator->toastidxs[validIndex], + &iterator->SnapshotToast, 1, &iterator->toastkey); + + iterator->buf = (ToastBuffer *) palloc0(sizeof(ToastBuffer)); + init_toast_buffer(iterator->buf, iterator->ressize + VARHDRSZ, VARATT_EXTERNAL_IS_COMPRESSED(iterator->toast_pointer)); + + iterator->nextidx = 0; + iterator->done = false; + + return iterator; +} + +static bool +free_fetch_datum_iterator(FetchDatumIterator iter) +{ + if (iter == NULL) + { + return false; + } + + if (!iter->done) + { + systable_endscan_ordered(iter->toastscan); + toast_close_indexes(iter->toastidxs, iter->num_indexes, AccessShareLock); + table_close(iter->toastrel, AccessShareLock); + } + free_toast_buffer(iter->buf); + pfree(iter); + return true; +} + +/* ---------- + * fetch_datum_iterate - + * + * Iterate through the toasted value referenced by iterator. + * + * As long as there is another chunk data in compression or external storage, + * fetch it into buffer in iterator. 
+ * ---------- + */ +static void +fetch_datum_iterate(FetchDatumIterator iter) +{ + HeapTuple ttup; + TupleDesc toasttupDesc; + int32 residx; + Pointer chunk; + bool isnull; + char *chunkdata; + int32 chunksize; + + Assert(iter != NULL && !iter->done); + + ttup = systable_getnext_ordered(iter->toastscan, ForwardScanDirection); + if (ttup == NULL) + { + /* + * Final checks that we successfully fetched the datum + */ + if (iter->nextidx != iter->numchunks) + elog(ERROR, "missing chunk number %d for toast value %u in %s", + iter->nextidx, + iter->toast_pointer.va_valueid, + RelationGetRelationName(iter->toastrel)); + + /* + * End scan and close relations + */ + systable_endscan_ordered(iter->toastscan); + toast_close_indexes(iter->toastidxs, iter->num_indexes, AccessShareLock); + table_close(iter->toastrel, AccessShareLock); + + iter->done = true; + return; + } + + /* + * Have a chunk, extract the sequence number and the data + */ + toasttupDesc = iter->toastrel->rd_att; + residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull)); + Assert(!isnull); + chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull)); + Assert(!isnull); + if (!VARATT_IS_EXTENDED(chunk)) + { + chunksize = VARSIZE(chunk) - VARHDRSZ; + chunkdata = VARDATA(chunk); + } + else if (VARATT_IS_SHORT(chunk)) + { + /* could happen due to heap_form_tuple doing its thing */ + chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT; + chunkdata = VARDATA_SHORT(chunk); + } + else + { + /* should never happen */ + elog(ERROR, "found toasted toast chunk for toast value %u in %s", + iter->toast_pointer.va_valueid, + RelationGetRelationName(iter->toastrel)); + chunksize = 0; /* keep compiler quiet */ + chunkdata = NULL; + } + + /* + * Some checks on the data we've found + */ + if (residx != iter->nextidx) + elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s", + residx, iter->nextidx, + iter->toast_pointer.va_valueid, + RelationGetRelationName(iter->toastrel)); + if 
(residx < iter->numchunks - 1) + { + if (chunksize != TOAST_MAX_CHUNK_SIZE) + elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s", + chunksize, (int) TOAST_MAX_CHUNK_SIZE, + residx, iter->numchunks, + iter->toast_pointer.va_valueid, + RelationGetRelationName(iter->toastrel)); + } + else if (residx == iter->numchunks - 1) + { + if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != iter->ressize) + elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s", + chunksize, + (int) (iter->ressize - residx * TOAST_MAX_CHUNK_SIZE), + residx, + iter->toast_pointer.va_valueid, + RelationGetRelationName(iter->toastrel)); + } + else + elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s", + residx, + 0, iter->numchunks - 1, + iter->toast_pointer.va_valueid, + RelationGetRelationName(iter->toastrel)); + + /* + * Copy the data into proper place in our iterator buffer + */ + memcpy(iter->buf->limit, chunkdata, chunksize); + iter->buf->limit += chunksize; + + iter->nextidx++; +} + + +static void +init_toast_buffer(ToastBuffer *buf, int32 size, bool compressed) +{ + buf->buf = (const char *) palloc0(size); + if (compressed) { + SET_VARSIZE_COMPRESSED(buf->buf, size); + /* + * Note the constrain buf->position <= buf->limit may be broken + * at initialization. Make sure that the constrain is satisfied + * when consume chars. + */ + buf->position = VARDATA_4B_C(buf->buf); + } + else + { + SET_VARSIZE(buf->buf, size); + buf->position = VARDATA_4B(buf->buf); + } + buf->limit = VARDATA(buf->buf); + buf->capacity = buf->buf + size; + buf->buf_size = size; +} + + +static bool +free_toast_buffer(ToastBuffer *buf) +{ + if (buf == NULL) + { + return false; + } + + pfree((void *)buf->buf); + pfree(buf); + + return true; +} + + +/* ---------- + * pglz_decompress_iterate - + * + * Decompresses source into dest until the source is exhausted. 
+ * ---------- + */ +static void +pglz_decompress_iterate(ToastBuffer *source, ToastBuffer *dest, DetoastIterator iter) +{ + const unsigned char *sp; + const unsigned char *srcend; + unsigned char *dp; + unsigned char *destend; + + /* + * In the while loop, sp may go beyond the srcend, provides a four-byte + * buffer to prevent sp from reading unallocated bytes from source buffer. + * When source->limit reaches source->capacity, don't worry about reading + * unallocated bytes. + */ + srcend = (const unsigned char *) + (source->limit == source->capacity ? source->limit : (source->limit - 4)); + sp = (const unsigned char *) source->position; + dp = (unsigned char *) dest->limit; + destend = (unsigned char *) dest->capacity; + + while (sp < srcend && dp < destend) + { + /* + * Read one control byte and process the next 8 items (or as many as + * remain in the compressed input). + */ + unsigned char ctrl; + int ctrlc; + + if (iter->ctrlc != INVALID_CTRLC) + { + ctrl = iter->ctrl; + ctrlc = iter->ctrlc; + } + else + { + ctrl = *sp++; + ctrlc = 0; + } + + + for (; ctrlc < 8 && sp < srcend && dp < destend; ctrlc++) + { + + if (ctrl & 1) + { + /* + * Otherwise it contains the match length minus 3 and the + * upper 4 bits of the offset. The next following byte + * contains the lower 8 bits of the offset. If the length is + * coded as 18, another extension tag byte tells how much + * longer the match really was (0-255). + */ + int32 len; + int32 off; + + len = (sp[0] & 0x0f) + 3; + off = ((sp[0] & 0xf0) << 4) | sp[1]; + sp += 2; + if (len == 18) + len += *sp++; + + /* + * Now we copy the bytes specified by the tag from OUTPUT to + * OUTPUT. It is dangerous and platform dependent to use + * memcpy() here, because the copied areas could overlap + * extremely! + */ + len = Min(len, destend - dp); + while (len--) + { + *dp = dp[-off]; + dp++; + } + } + else + { + /* + * An unset control bit means LITERAL BYTE. So we just copy + * one from INPUT to OUTPUT. 
+ */ + *dp++ = *sp++; + } + + /* + * Advance the control bit + */ + ctrl >>= 1; + } + + iter->ctrlc = ctrlc; + iter->ctrl = ctrl; + } + + source->position = (char *) sp; + dest->limit = (char *) dp; +} diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c index fa08b55..cb63891 100644 --- a/src/backend/utils/adt/varlena.c +++ b/src/backend/utils/adt/varlena.c @@ -56,6 +56,8 @@ typedef struct int len1; /* string lengths in bytes */ int len2; + DetoastIterator iter; + /* Skip table for Boyer-Moore-Horspool search algorithm: */ int skiptablemask; /* mask for ANDing with skiptable subscripts */ int skiptable[256]; /* skip distance for given mismatched char */ @@ -122,7 +124,7 @@ static text *text_substring(Datum str, int32 length, bool length_not_specified); static text *text_overlay(text *t1, text *t2, int sp, int sl); -static int text_position(text *t1, text *t2, Oid collid); +static int text_position(text *t1, text *t2, Oid collid, DetoastIterator iter); static void text_position_setup(text *t1, text *t2, Oid collid, TextPositionState *state); static bool text_position_next(TextPositionState *state); static char *text_position_next_internal(char *start_ptr, TextPositionState *state); @@ -1092,10 +1094,22 @@ text_overlay(text *t1, text *t2, int sp, int sl) Datum textpos(PG_FUNCTION_ARGS) { - text *str = PG_GETARG_TEXT_PP(0); + text *str; + DetoastIteratorData iteratorData; + DetoastIterator iter = &iteratorData; text *search_str = PG_GETARG_TEXT_PP(1); - PG_RETURN_INT32((int32) text_position(str, search_str, PG_GET_COLLATION())); + if (init_detoast_iterator((struct varlena *) (DatumGetPointer(PG_GETARG_DATUM(0))), iter)) + { + str = (text *) iter->buf->buf; + } + else + { + str = PG_GETARG_TEXT_PP(0); + iter = NULL; + } + + PG_RETURN_INT32((int32) text_position(str, search_str, PG_GET_COLLATION(), iter)); } /* @@ -1113,7 +1127,7 @@ textpos(PG_FUNCTION_ARGS) * functions. 
*/ static int -text_position(text *t1, text *t2, Oid collid) +text_position(text *t1, text *t2, Oid collid, DetoastIterator iter) { TextPositionState state; int result; @@ -1122,6 +1136,7 @@ text_position(text *t1, text *t2, Oid collid) return 0; text_position_setup(t1, t2, collid, &state); + state.iter = iter; if (!text_position_next(&state)) result = 0; else @@ -1130,7 +1145,6 @@ text_position(text *t1, text *t2, Oid collid) return result; } - /* * text_position_setup, text_position_next, text_position_cleanup - * Component steps of text_position() @@ -1196,6 +1210,7 @@ text_position_setup(text *t1, text *t2, Oid collid, TextPositionState *state) state->str2 = VARDATA_ANY(t2); state->len1 = len1; state->len2 = len2; + state->iter = NULL; state->last_match = NULL; state->refpoint = state->str1; state->refpos = 0; @@ -1358,6 +1373,11 @@ text_position_next_internal(char *start_ptr, TextPositionState *state) hptr = start_ptr; while (hptr < haystack_end) { + if (state->iter != NULL) + { + PG_DETOAST_ITERATE(state->iter, hptr); + } + if (*hptr == nchar) return (char *) hptr; hptr++; @@ -1375,6 +1395,11 @@ text_position_next_internal(char *start_ptr, TextPositionState *state) const char *nptr; const char *p; + if (state->iter != NULL) + { + PG_DETOAST_ITERATE(state->iter, hptr); + } + nptr = needle_last; p = hptr; while (*nptr == *p) @@ -1438,7 +1463,10 @@ text_position_get_match_pos(TextPositionState *state) static void text_position_cleanup(TextPositionState *state) { - /* no cleanup needed */ + if (state->iter != NULL) + { + free_detoast_iterator(state->iter); + } } static void diff --git a/src/include/access/tuptoaster.h b/src/include/access/tuptoaster.h index f0aea24..79fd4c4 100644 --- a/src/include/access/tuptoaster.h +++ b/src/include/access/tuptoaster.h @@ -17,6 +17,103 @@ #include "storage/lockdefs.h" #include "utils/relcache.h" +#ifndef FRONTEND +#include "access/genam.h" + +/* + * TOAST buffer is a producer consumer buffer. 
+ * + * +--+--+--+--+--+--+--+--+--+--+--+--+--+ + * | | | | | | | | | | | | | | + * +--+--+--+--+--+--+--+--+--+--+--+--+--+ + * ^ ^ ^ ^ + * buf position limit capacity + * + * buf: point to the start of buffer. + * position: point to the next char to be consumed. + * limit: point to the next char to be produced. + * capacity: point to the end of buffer. + * + * Constrains that need to be satisfied: + * buf <= position <= limit <= capacity + */ +typedef struct ToastBuffer +{ + const char *buf; + const char *position; + char *limit; + const char *capacity; + int32 buf_size; +} ToastBuffer; + + +typedef struct FetchDatumIteratorData +{ + ToastBuffer *buf; + Relation toastrel; + Relation *toastidxs; + SysScanDesc toastscan; + ScanKeyData toastkey; + SnapshotData SnapshotToast; + struct varatt_external toast_pointer; + int32 ressize; + int32 nextidx; + int32 numchunks; + int num_indexes; + bool done; +} FetchDatumIteratorData; + +typedef struct FetchDatumIteratorData *FetchDatumIterator; + +/* + * If "ctrlc" field in iterator is equal to INVALID_CTRLC, it means that + * the field is invalid and need to read the control byte from the + * source buffer in the next iteration, see pglz_decompress_iterate(). + */ +#define INVALID_CTRLC 8 + +typedef struct DetoastIteratorData +{ + ToastBuffer *buf; + FetchDatumIterator fetch_datum_iterator; + unsigned char ctrl; + int ctrlc; + bool compressed; /* toast value is compressed? */ + bool done; +} DetoastIteratorData; + +typedef struct DetoastIteratorData *DetoastIterator; + +/* ---------- + * init_detoast_iterator - + * + * Initialize de-TOAST iterator. + * ---------- + */ +extern bool init_detoast_iterator(struct varlena *attr, DetoastIterator iterator); + +/* ---------- + * free_detoast_iterator - + * + * Free the memory space occupied by the de-TOAST iterator. 
+ * ---------- + */ +extern bool free_detoast_iterator(DetoastIterator iter); + +/* ---------- + * detoast_iterate - + * + * Iterate through the toasted value referenced by iterator. + * + * As long as there is another slice in compression or external storage, + * de-TOAST it into buffer in iterator. + * ---------- + */ +extern void detoast_iterate(DetoastIterator iter); + +#endif + + /* * This enables de-toasting of index entries. Needed until VACUUM is * smart enough to rebuild indexes from scratch. diff --git a/src/include/fmgr.h b/src/include/fmgr.h index 3ff0999..446c880 100644 --- a/src/include/fmgr.h +++ b/src/include/fmgr.h @@ -239,6 +239,13 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum); #define PG_DETOAST_DATUM_SLICE(datum,f,c) \ pg_detoast_datum_slice((struct varlena *) DatumGetPointer(datum), \ (int32) (f), (int32) (c)) +#define PG_DETOAST_ITERATE(iter, need) \ + do { \ + Assert(need >= iter->buf->buf && need <= iter->buf->capacity); \ + while (!iter->done && need >= iter->buf->limit) { \ + detoast_iterate(iter); \ + } \ + } while (0) /* WARNING -- unaligned pointer */ #define PG_DETOAST_DATUM_PACKED(datum) \ pg_detoast_datum_packed((struct varlena *) DatumGetPointer(datum)) -- 2.7.4