On 02/07/2018 09:02 PM, Markus Nullmeier wrote:
One general comment I can already make is that enabling compression should be made optional, which appears to be a small and easy addition to the generic WAL API.
A new version of the patch is attached. To control generic WAL compression, a new structure, PageXLogCompressParams, is introduced. It is passed as an additional parameter to GenericXLogRegisterBufferEx, so that an access method can control its own WAL compression. GenericXLogRegisterBuffer uses default compression settings, which turned out to be a reasonable tradeoff between performance overhead and compression ratio on RUM. On my HDD, PostgreSQL even runs 10% faster for some RUM workloads, because the amount of generic WAL that has to be written is reduced. Oleg Ivanov Postgres Professional The Russian PostgreSQL Company
diff --git a/src/backend/access/transam/generic_xlog.c b/src/backend/access/transam/generic_xlog.c index ce02354..fa3cd4e 100644 --- a/src/backend/access/transam/generic_xlog.c +++ b/src/backend/access/transam/generic_xlog.c @@ -43,19 +43,41 @@ * a full page's worth of data. *------------------------------------------------------------------------- */ + +#define MAX_ALIGN_MISMATCHES 255 +/* MAX_ALIGN_MISMATCHES is not supposed to be greater than PG_UINT8_MAX */ +#if MAX_ALIGN_MISMATCHES > PG_UINT8_MAX +#error "MAX_ALIGN_MISMATCHES must be not greater than PG_UINT8_MAX" +#endif + #define FRAGMENT_HEADER_SIZE (2 * sizeof(OffsetNumber)) +#define REGION_HEADER_SIZE (sizeof(char) + sizeof(int)) +#define DIFF_DELTA_HEADER_SIZE (sizeof(char) + 2 * sizeof(OffsetNumber)) +#define MISMATCH_HEADER_SIZE (sizeof(char) + sizeof(uint8) + \ + sizeof(OffsetNumber)) #define MATCH_THRESHOLD FRAGMENT_HEADER_SIZE -#define MAX_DELTA_SIZE (BLCKSZ + 2 * FRAGMENT_HEADER_SIZE) +#define MAX_DELTA_SIZE (BLCKSZ + \ + 2 * REGION_HEADER_SIZE + \ + 2 * FRAGMENT_HEADER_SIZE + \ + 2 * DIFF_DELTA_HEADER_SIZE + \ + MAX_ALIGN_MISMATCHES * MISMATCH_HEADER_SIZE \ + + sizeof(bool)) + +#define writeToPtr(ptr, x) memcpy(ptr, &(x), sizeof(x)), ptr += sizeof(x) +#define readFromPtr(ptr, x) memcpy(&(x), ptr, sizeof(x)), ptr += sizeof(x) /* Struct of generic xlog data for single page */ typedef struct { Buffer buffer; /* registered buffer */ int flags; /* flags for this buffer */ + int deltaLen; /* space consumed in delta field */ char *image; /* copy of page image for modification, do not * do it in-place to have aligned memory chunk */ char delta[MAX_DELTA_SIZE]; /* delta between page images */ + + PageXLogCompressParams compressParams; /* compress parameters */ } PageData; /* State of generic xlog record construction */ @@ -71,15 +93,77 @@ struct GenericXLogState bool isLogged; }; +/* Describes for the region which type of delta is used in it */ +typedef enum +{ + DIFF_DELTA = 0, /* diff delta with insert, 
remove and replace + * operations */ + BASE_DELTA = 1 /* base delta with update operations only */ +} DeltaType; + +/* Diff delta operations for transforming current page to target page */ +typedef enum +{ + DIFF_INSERT = 0, + DIFF_REMOVE = 1, + DIFF_REPLACE = 2 +} DiffDeltaOperations; + +/* Describes the kind of region of the page */ +typedef enum +{ + UPPER_REGION = 0, + LOWER_REGION = 1 +} RegionType; + static void writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber len, const char *data); static void computeRegionDelta(PageData *pageData, const char *curpage, const char *targetpage, int targetStart, int targetEnd, - int validStart, int validEnd); + int validStart, int validEnd, + uint8 maxMismatches); static void computeDelta(PageData *pageData, Page curpage, Page targetpage); static void applyPageRedo(Page page, const char *delta, Size deltaSize); +static int alignRegions(const char *curRegion, const char *targetRegion, + int curRegionLen, int targetRegionLen, uint8 maxMismatches); +static int restoreCompactAlignment(const char *curRegion, + const char *targetRegion, + int curRegionLen, + int targetRegionLen, + int numMismatches); + +static bool computeRegionDiffDelta(PageData *pageData, + const char *curpage, const char *targetpage, + int targetStart, int targetEnd, + int validStart, int validEnd, + uint8 maxMismatches); +static const char *applyPageDiffRedo(Page page, const char *delta, Size deltaSize); + +static void computeRegionBaseDelta(PageData *pageData, + const char *curpage, const char *targetpage, + int targetStart, int targetEnd, + int validStart, int validEnd); +static const char *applyPageBaseRedo(Page page, const char *delta, Size deltaSize); + +static bool pageDataContainsDiffDelta(PageData *pageData); +static void downgradeDeltaToBaseFormat(PageData *pageData); + +/* Arrays for the alignment building and for the resulting alignments */ +static int V[MAX_ALIGN_MISMATCHES + 1][2 * MAX_ALIGN_MISMATCHES + 1]; +static int 
prevDiag[MAX_ALIGN_MISMATCHES + 1][2 * MAX_ALIGN_MISMATCHES + 1]; +static int alignmentDiag[MAX_ALIGN_MISMATCHES + 1]; +static char curRegionAligned[MAX_ALIGN_MISMATCHES]; +static bool curRegionAlignedGap[MAX_ALIGN_MISMATCHES]; +static char targetRegionAligned[MAX_ALIGN_MISMATCHES]; +static bool targetRegionAlignedGap[MAX_ALIGN_MISMATCHES]; +static int curRegionAlignedPos[MAX_ALIGN_MISMATCHES]; +static int targetRegionAlignedPos[MAX_ALIGN_MISMATCHES]; + +/* Array for diff delta application */ +static char localPage[BLCKSZ]; + /* * Write next fragment into pageData's delta. @@ -115,14 +199,82 @@ writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length, * Bytes in curpage outside the range validStart to validEnd-1 should be * considered invalid, and always overwritten with target data. * - * This function is a hot spot, so it's worth being as tense as possible - * about the data-matching loops. + * If forceBaseDelta is true, fucntion just calls computeRegionBaseDelta. + * Otherwise this function tries to build diff delta first and, if it fails, + * uses the base delta. It also provides the header before the delta in which + * the type and the length of the delta are contained. */ static void computeRegionDelta(PageData *pageData, const char *curpage, const char *targetpage, int targetStart, int targetEnd, - int validStart, int validEnd) + int validStart, int validEnd, + uint8 maxMismatches) +{ + int length; + char header; + int prevDeltaLen; + bool diff = false; + char *ptr = pageData->delta + pageData->deltaLen; + + /* Verify we have enough space */ + Assert(pageData->deltaLen + sizeof(header) + + sizeof(length) <= MAX_DELTA_SIZE); + + pageData->deltaLen += sizeof(header) + sizeof(length); + prevDeltaLen = pageData->deltaLen; + + /* Not sure what to do with too big maxMismathes. Now we just clip it. 
*/ + if (maxMismatches > MAX_ALIGN_MISMATCHES) + maxMismatches = MAX_ALIGN_MISMATCHES; + + /* Try building diff delta only if necessary */ + if (maxMismatches > 0) + { + diff = computeRegionDiffDelta(pageData, + curpage, targetpage, + targetStart, targetEnd, + validStart, validEnd, + maxMismatches); + } + + /* + * If we succeeded to make diff delta, just set the header. Otherwise, + * make base delta. + */ + if (diff) + { + header = DIFF_DELTA; + } + else + { + header = BASE_DELTA; + computeRegionBaseDelta(pageData, + curpage, targetpage, + targetStart, targetEnd, + validStart, validEnd); + } + length = pageData->deltaLen - prevDeltaLen; + + writeToPtr(ptr, header); + writeToPtr(ptr, length); +} + +/* + * Compute the XLOG fragments needed to transform a region of curpage into the + * corresponding region of targetpage, and append them to pageData's delta + * field. The region to transform runs from targetStart to targetEnd-1. + * Bytes in curpage outside the range validStart to validEnd-1 should be + * considered invalid, and always overwritten with target data. + * + * This function is a hot spot, so it's worth being as tense as possible + * about the data-matching loops. + */ +static void +computeRegionBaseDelta(PageData *pageData, + const char *curpage, const char *targetpage, + int targetStart, int targetEnd, + int validStart, int validEnd) { int i, loopEnd, @@ -222,27 +374,508 @@ computeRegionDelta(PageData *pageData, } /* + * Align curRegion and targetRegion and return the number of mismatches + * or -1 if the alignment with number of mismatching positions less than + * or equal to maxMismatches does not exist. + * The algorithm guarantees to find the alignment with the least possible + * number of mismathing positions or return that such least number is greater + * than maxMismatches. + * + * For a good introduction to the subject, read about the "Levenshtein + * distance" in Wikipedia. 
+ * + * The basic algorithm is described in: + * "An O(ND) Difference Algorithm and its Variations", Eugene W. Myers, + * Algorithmica Vol. 1, 1986, pp. 251-266, + * <http://dx.doi.org/10.1007/BF01840446>, + * PDF: <http://mail.xmailserver.net/diff2.pdf>. + * See especially section 3, which describes the variation used below. + * + * This variation requires O(N + D ^ 2) memory and has time complexity O(ND). + * We choose it because it is faster than described in section 4.2 modification + * with O(N + D) memory requirement. + * + * The only modification we made to the original algorithm is the introduction + * of REPLACE operations, while in the original algorithm only INSERT and + * REMOVE are considered. This introduction doesn't affect time and memory + * complexity of the algorithm. + */ +static int +alignRegions(const char *curRegion, const char *targetRegion, + int curRegionLen, int targetRegionLen, + uint8 maxMismatches) +{ + /* Number of mismatches */ + int m; + + /* Difference between curRegion and targetRegion prefix lengths */ + int k; + + /* Curbuf and targetRegion prefix lengths */ + int i, + j; + + /* Number of mismathes in the answer */ + int numMismatches = -1; + + /* + * If lengths differ too much, there is no alignment with a small number + * of mismatches. + */ + if (!(-maxMismatches < curRegionLen - targetRegionLen && + curRegionLen - targetRegionLen < maxMismatches)) + return -1; + + /* + * V is an array to store the values of dynamic programming. The first + * dimension corresponds to m, i. e. to the number of performed editing + * operations, and the second one is for k + m, where k is the number of a + * diagonal. A diagonal numbered k is such points (i, j) where i - j = k. + * Here i means the length of curRegion prefix and j means the length of + * targetRegion prefix. V[m][m + k] is the length of the longest prefix of + * curRegion i which can be aligned with the prefix of length j of + * targetRegion using m editing operations. 
In the loop below we + * initialize V[0][0] and then compute V[m][m + k] based on, if defined, + * V[m - 1][m + k - 1], V[m - 1][m + k], and V[m - 1][m + k + 1]. V[m][m + + * k] is undefined if k < -m or k > m. + */ + V[0][0] = 0; + + /* Find the longest path with the given number of mismatches */ + for (m = 0; m <= maxMismatches; ++m) + { + /* + * Find the largest prefix alignment with the given number of + * mismatches and the given diagonal, i. e. difference between + * curRegion and targetRegion prefix lengths. + */ + for (k = -m; k <= m; ++k) + { + /* Dynamic programming recurrent step */ + if (m > 0) + { + i = -1; + if (k != -m && k != m && + V[m - 1][m - 1 + k] + 1 > i) + { + i = V[m - 1][m - 1 + k] + 1; + prevDiag[m][m + k] = k; + } + if (k != -m && k != -m + 1 && + V[m - 1][m - 1 + k - 1] + 1 > i) + { + i = V[m - 1][m - 1 + k - 1] + 1; + prevDiag[m][m + k] = k - 1; + } + if (k != m && k != m - 1 && + V[m - 1][m - 1 + k + 1] > i) + { + i = V[m - 1][m - 1 + k + 1]; + prevDiag[m][m + k] = k + 1; + } + } + else + i = 0; + j = i - k; + + /* Boundary checks */ + Assert(i >= 0); + Assert(j >= 0); + + /* Increase the prefixes while the bytes are equal */ + while (i < curRegionLen && j < targetRegionLen && + curRegion[i] == targetRegion[j]) + i++, j++; + + /* + * Save the largest curRegion prefix that was aligned with given + * number of mismatches and difference between curRegion and + * targetRegion prefix lengths. + */ + V[m][m + k] = i; + + /* If we find the alignment, save its length and break */ + if (i == curRegionLen && j == targetRegionLen) + { + numMismatches = m; + break; + } + } + /* Break if we find an alignment */ + if (numMismatches != -1) + break; + } + /* No alignment was found */ + if (numMismatches == -1) + return -1; + + /* + * Restore the path k-s for each iteration of the main loop for the found + * alignment. 
+ */ + Assert(m >= 0); + while (m != 0) + { + Assert(-m <= k && k <= m); + alignmentDiag[m] = k; + k = prevDiag[m][m + k]; + m--; + } + Assert(k == 0); + alignmentDiag[0] = 0; + + return numMismatches; +} + +/* + * Restore the mismathcing parts of the alignment based on V, alignmentDiag, + * and numMismacthes. Do some alignment assertions also. + * + * The compressed alignment is stored in curRegionAligned, targetRegionAligned, + * curRegionAlignedGap, targetRegionAlignedGap, curRegionAlignedPos, and + * targetRegionAlignedPos. The first two arrays contain the aligned data, + * the second two contain the map of align gaps in the first two arrays, + * and the last two arrays contain the positions of the aligned parts in + * the original arrays. + */ +int +restoreCompactAlignment(const char *curRegion, const char *targetRegion, + int curRegionLen, int targetRegionLen, + int numMismatches) +{ + int i, + j, + k, + m; + + /* The length of the equal block */ + int curLen = 0; + + /* Result alignment length */ + int resLen = 0; + + /* Maximal possible result alignment length */ + int maxResLen = Min(curRegionLen, targetRegionLen) + numMismatches; + + /* Keep compiler quiet */ + if (curLen != 0) + curLen = maxResLen; + + /* Check whether the first equal block is computed correctly */ + curLen = V[0][0]; + Assert(curLen >= 0); + Assert(resLen + curLen <= maxResLen); + Assert(memcmp(curRegion, targetRegion, curLen) == 0); + + /* Restore the alignment */ + for (m = 1; m <= numMismatches; ++m) + { + /* Initialize the variables for the block */ + int dk = alignmentDiag[m] - alignmentDiag[m - 1]; + int prevDiag = alignmentDiag[m - 1]; + + k = alignmentDiag[m]; + i = V[m - 1][m - 1 + prevDiag]; + j = i - prevDiag; + /* Check state consistency */ + Assert(0 <= i && i <= curRegionLen); + Assert(0 <= j && j <= targetRegionLen); + + /* Check the block operation correctness */ + Assert(dk == -1 || i < curRegionLen); + Assert(dk == 1 || j < targetRegionLen); + Assert(-1 <= dk && dk <= 
1); + Assert(resLen + 1 <= maxResLen); + + /* Do the alignment operation of the block */ + curRegionAlignedPos[resLen] = i; + targetRegionAlignedPos[resLen] = j; + if (dk == 1 || dk == 0) + { + curRegionAlignedGap[resLen] = false; + curRegionAligned[resLen] = curRegion[i++]; + } + else + curRegionAlignedGap[resLen] = true; + if (dk == 0 || dk == -1) + { + targetRegionAlignedGap[resLen] = false; + targetRegionAligned[resLen] = targetRegion[j++]; + } + else + targetRegionAlignedGap[resLen] = true; + resLen++; + + /* Compute the size of the equal part of the block */ + curLen = V[m][m + k] - i; + + /* Check whether the equal part of the block is computed correctly */ + Assert(curLen >= 0); + Assert(resLen + curLen <= maxResLen); + Assert(memcmp(&curRegion[i], &targetRegion[j], curLen) == 0); + } + + return resLen; +} + +/* + * Try to build a short alignment in order to produce a short diff delta. + * If fails, return false, otherwise return true and write the delta to + * pageData->delta. + */ +static bool +computeRegionDiffDelta(PageData *pageData, + const char *curpage, const char *targetpage, + int targetStart, int targetEnd, + int validStart, int validEnd, + uint8 maxMismatches) +{ + char *ptr = pageData->delta + pageData->deltaLen; + int i, + j; + char type; + uint8 len; + OffsetNumber start; + OffsetNumber tmp; + + int numMismatches; + int alignmentLength; + + int curRegionLen = validEnd - validStart; + int targetRegionLen = targetEnd - targetStart; + + numMismatches = alignRegions(&curpage[validStart], + &targetpage[targetStart], + curRegionLen, + targetRegionLen, + maxMismatches); + Assert(numMismatches <= maxMismatches); + + /* If no proper alignment was found return false */ + if (numMismatches < 0) + return false; + + /* Restore the alignment in a compact form */ + alignmentLength = restoreCompactAlignment(&curpage[validStart], + &targetpage[targetStart], + curRegionLen, + targetRegionLen, + numMismatches); + + /* + * Translate the alignment into the set of 
instructions for transformation + * from curRegion into targetRegion, and write these instructions into + * pageData->delta. + */ + + /* Verify we have enough space */ + Assert(pageData->deltaLen + sizeof(type) + 2 * sizeof(tmp) <= MAX_DELTA_SIZE); + + /* Check whether the region is the upper or the lower part of the page */ + Assert((validStart == 0 && targetStart == 0) || + (validEnd == BLCKSZ && targetEnd == BLCKSZ)); + + /* Write start and end indexes of the buffers */ + if (validStart == 0 && targetStart == 0) + { + /* We are in the upper part, there is no need to store Starts */ + type = UPPER_REGION; + writeToPtr(ptr, type); + tmp = validEnd; + writeToPtr(ptr, tmp); + tmp = targetEnd; + writeToPtr(ptr, tmp); + } + else + { + /* We are in the lower part, there is no need to store Ends */ + type = LOWER_REGION; + writeToPtr(ptr, type); + tmp = validStart; + writeToPtr(ptr, tmp); + tmp = targetStart; + writeToPtr(ptr, tmp); + } + + /* Transform the alignment into the set of instructions */ + for (i = 0; i < alignmentLength; ++i) + { + /* Verify the alignment */ + Assert(!curRegionAlignedGap[i] || !targetRegionAlignedGap[i]); + Assert(curRegionAligned[i] != targetRegionAligned[i] || + curRegionAlignedGap[i] || targetRegionAlignedGap[i]); + + /* Determine the type of the instruction */ + if (curRegionAlignedGap[i]) + type = DIFF_INSERT; + else if (targetRegionAlignedGap[i]) + type = DIFF_REMOVE; + else + type = DIFF_REPLACE; + + /* Find the end of the block of the same instructions */ + j = i + 1; + while (j < alignmentLength) + { + bool sameBlock; + + sameBlock = ( + (curRegionAlignedPos[j] <= + curRegionAlignedPos[j - 1] + 1) && + (targetRegionAlignedPos[j] <= + targetRegionAlignedPos[j - 1] + 1) + ); + + switch (type) + { + case DIFF_INSERT: + sameBlock &= (curRegionAlignedGap[j]); + break; + case DIFF_REMOVE: + sameBlock &= (targetRegionAlignedGap[j]); + break; + case DIFF_REPLACE: + sameBlock &= (!curRegionAlignedGap[j] && + !targetRegionAlignedGap[j] && + 
(curRegionAligned[j] != + targetRegionAligned[j])); + break; + default: + elog(ERROR, "unrecognized delta operation type: %d", type); + break; + } + if (sameBlock) + j++; + else + break; + } + len = j - i; + + start = curRegionAlignedPos[i]; + /* Verify we have enough space */ + Assert(pageData->deltaLen + sizeof(type) + + sizeof(len) + sizeof(start) <= MAX_DELTA_SIZE); + /* Write the header for instruction */ + writeToPtr(ptr, type); + writeToPtr(ptr, len); + writeToPtr(ptr, start); + + /* Write instruction data and go to the end of the block */ + if (type != DIFF_REMOVE) + { + /* Verify we have enough space */ + Assert(pageData->deltaLen + len <= MAX_DELTA_SIZE); + while (i < j) + { + char c = targetRegionAligned[i++]; + + writeToPtr(ptr, c); + } + } + else + i = j; + i--; + } + + pageData->deltaLen = ptr - pageData->delta; + + return true; +} + +/* + * Return whether pageData->delta contains diff deltas or not. + */ +static bool +pageDataContainsDiffDelta(PageData *pageData) +{ + char *ptr = pageData->delta + sizeof(bool); + char *end = pageData->delta + pageData->deltaLen; + char header; + int length; + + while (ptr < end) + { + readFromPtr(ptr, header); + readFromPtr(ptr, length); + + if (header == DIFF_DELTA) + return true; + ptr += length; + } + return false; +} + +/* + * Downgrade pageData->delta to base delta format. + * + * Only base diffs are allowed to perform the transformation. 
+ */ +static void +downgradeDeltaToBaseFormat(PageData *pageData) +{ + char *ptr = pageData->delta; + char *end = pageData->delta + pageData->deltaLen; + char *cur; + bool containsDiffDelta; + char header; + int length; + int newDeltaLength = 0; + + /* Check whether containsDiffDelta is false */ + readFromPtr(ptr, containsDiffDelta); + Assert(!containsDiffDelta); + + cur = ptr; + while (ptr < end) + { + readFromPtr(ptr, header); + readFromPtr(ptr, length); + + /* Check whether the region delta is base delta */ + Assert(header == BASE_DELTA); + newDeltaLength += length; + + memmove(cur, ptr, length); + cur += length; + ptr += length; + } + pageData->deltaLen = newDeltaLength; +} + +/* * Compute the XLOG delta record needed to transform curpage into targetpage, * and store it in pageData's delta field. */ static void computeDelta(PageData *pageData, Page curpage, Page targetpage) { + bool *containsDiffDelta = pageData->delta; int targetLower = ((PageHeader) targetpage)->pd_lower, targetUpper = ((PageHeader) targetpage)->pd_upper, curLower = ((PageHeader) curpage)->pd_lower, curUpper = ((PageHeader) curpage)->pd_upper; - pageData->deltaLen = 0; + pageData->deltaLen = sizeof(bool); /* Compute delta records for lower part of page ... */ computeRegionDelta(pageData, curpage, targetpage, 0, targetLower, - 0, curLower); + 0, curLower, + pageData->compressParams.lowerMaxMismatches); /* ... and for upper part, ignoring what's between */ computeRegionDelta(pageData, curpage, targetpage, targetUpper, BLCKSZ, - curUpper, BLCKSZ); + curUpper, BLCKSZ, + pageData->compressParams.upperMaxMismatches); + + /* + * Set first byte to true if at least one of the region deltas is diff + * delta. Otherwise set first byte to false and downgrade all regions to + * base format without extra headers. + */ + *containsDiffDelta = pageDataContainsDiffDelta(pageData); + if (!(*containsDiffDelta)) + downgradeDeltaToBaseFormat(pageData); /* * If xlog debug is enabled, then check produced delta. 
Result of delta @@ -291,11 +924,13 @@ GenericXLogStart(Relation relation) * is what the caller should modify. * * If the buffer is already registered, just return its existing entry. - * (It's not very clear what to do with the flags in such a case, but - * for now we stay with the original flags.) + * (It's not very clear what to do with the flags and parameters in such + * a case, but for now we stay with the original flags and parameters.) */ Page -GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer, int flags) +GenericXLogRegisterBufferEx(GenericXLogState *state, + Buffer buffer, int flags, + PageXLogCompressParams compressParams) { int block_id; @@ -309,6 +944,7 @@ GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer, int flags) /* Empty slot, so use it (there cannot be a match later) */ page->buffer = buffer; page->flags = flags; + page->compressParams = compressParams; memcpy(page->image, BufferGetPage(buffer), BLCKSZ); return (Page) page->image; } @@ -329,6 +965,22 @@ GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer, int flags) } /* + * An alias for GenericXLogRegisterBufferEx with default parameters + * which are optimal for Bloom and RUM indexes. + * Left for backward compatibility. + */ +Page +GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer, int flags) +{ + PageXLogCompressParams defaultParameters; + + defaultParameters.upperMaxMismatches = 0; + defaultParameters.lowerMaxMismatches = 24; + return GenericXLogRegisterBufferEx(state, buffer, flags, + defaultParameters); +} + +/* * Apply changes represented by GenericXLogState to the actual buffers, * and emit a generic xlog record. */ @@ -451,12 +1103,58 @@ GenericXLogAbort(GenericXLogState *state) /* * Apply delta to given page image. + * + * Read blocks of instructions and apply them based on their type: + * BASE_DELTA or DIFF_DELTA. 
*/ static void applyPageRedo(Page page, const char *delta, Size deltaSize) { const char *ptr = delta; const char *end = delta + deltaSize; + char header; + int length; + bool containsDiffDelta; + + /* If page delta is base delta, apply it. */ + readFromPtr(ptr, containsDiffDelta); + if (!containsDiffDelta) + { + applyPageBaseRedo(page, ptr, end - ptr); + return; + } + + /* Otherwise apply each region delta. */ + while (ptr < end) + { + readFromPtr(ptr, header); + readFromPtr(ptr, length); + + switch (header) + { + case DIFF_DELTA: + ptr = applyPageDiffRedo(page, ptr, length); + break; + case BASE_DELTA: + ptr = applyPageBaseRedo(page, ptr, length); + break; + default: + elog(ERROR, + "unrecognized delta type: %d", + header); + break; + } + } +} + +/* + * Apply base delta to given page image. + */ +static const char * +applyPageBaseRedo(Page page, const char *delta, Size deltaSize) +{ + const char *ptr = delta; + const char *end = delta + deltaSize; while (ptr < end) { @@ -472,6 +1170,96 @@ applyPageRedo(Page page, const char *delta, Size deltaSize) ptr += length; } + return ptr; +} + +/* + * Apply diff delta to given page image. 
+ */ +static const char * +applyPageDiffRedo(Page page, const char *delta, Size deltaSize) +{ + const char *ptr = delta; + const char *end = delta + deltaSize; + char type; + uint8 len; + OffsetNumber targetStart, + targetEnd; + OffsetNumber validStart, + validEnd; + int i, + j; + OffsetNumber start; + + /* Read start and end indexes of the buffers */ + validStart = 0; + validEnd = BLCKSZ; + targetStart = 0; + targetEnd = BLCKSZ; + readFromPtr(ptr, type); + switch (type) + { + case UPPER_REGION: + readFromPtr(ptr, validEnd); + readFromPtr(ptr, targetEnd); + break; + case LOWER_REGION: + readFromPtr(ptr, validStart); + readFromPtr(ptr, targetStart); + break; + default: + elog(ERROR, + "unrecognized region type: %d", + type); + break; + } + + /* Read and apply the instructions */ + i = 0, j = 0; + while (ptr < end) + { + /* Read the header of the current instruction */ + readFromPtr(ptr, type); + readFromPtr(ptr, len); + readFromPtr(ptr, start); + + /* Copy the data before current instruction to buffer */ + memcpy(&localPage[j], page + validStart + i, start - i); + j += start - i; + i = start; + + /* Apply the instruction */ + switch (type) + { + case DIFF_INSERT: + memcpy(&localPage[j], ptr, len); + ptr += len; + j += len; + break; + case DIFF_REMOVE: + i += len; + break; + case DIFF_REPLACE: + memcpy(&localPage[j], ptr, len); + i += len; + j += len; + ptr += len; + break; + default: + elog(ERROR, + "unrecognized delta instruction type: %d", + type); + break; + } + } + + /* Copy the data after the last instruction */ + memcpy(&localPage[j], page + validStart + i, validEnd - validStart - i); + j += validEnd - validStart - i; + i = validEnd - validStart; + + memcpy(page + targetStart, localPage, j); + return ptr; } /* diff --git a/src/include/access/generic_xlog.h b/src/include/access/generic_xlog.h index b23e1f6..a8e406b 100644 --- a/src/include/access/generic_xlog.h +++ b/src/include/access/generic_xlog.h @@ -25,6 +25,17 @@ /* Flag bits for GenericXLogRegisterBuffer 
*/ #define GENERIC_XLOG_FULL_IMAGE 0x0001 /* write full-page image */ +/* Struct of generic xlog compression parameters for a single page */ +typedef struct +{ + /* + * maximal number of mismatches for diff delta for the upper part of the + * page. Zero value disables diff delta for the part + */ + uint8 upperMaxMismatches; + uint8 lowerMaxMismatches; /* the same for the lower part */ +} PageXLogCompressParams; + /* state of generic xlog record construction */ struct GenericXLogState; typedef struct GenericXLogState GenericXLogState; @@ -33,6 +44,8 @@ typedef struct GenericXLogState GenericXLogState; extern GenericXLogState *GenericXLogStart(Relation relation); extern Page GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer, int flags); +extern Page GenericXLogRegisterBufferEx(GenericXLogState *state, Buffer buffer, + int flags, PageXLogCompressParams compressParams); extern XLogRecPtr GenericXLogFinish(GenericXLogState *state); extern void GenericXLogAbort(GenericXLogState *state);
# ----------------------------- # PostgreSQL configuration file # ----------------------------- # # This file consists of lines of the form: # # name = value # # (The "=" is optional.) Whitespace may be used. Comments are introduced with # "#" anywhere on a line. The complete list of parameter names and allowed # values can be found in the PostgreSQL documentation. # # The commented-out settings shown in this file represent the default values. # Re-commenting a setting is NOT sufficient to revert it to the default value; # you need to reload the server. # # This file is read on server startup and when the server receives a SIGHUP # signal. If you edit the file on a running system, you have to SIGHUP the # server for the changes to take effect, run "pg_ctl reload", or execute # "SELECT pg_reload_conf()". Some parameters, which are marked below, # require a server shutdown and restart to take effect. # # Any parameter can also be given as a command-line option to the server, e.g., # "postgres -c log_connections=on". Some parameters can be changed at run time # with the "SET" SQL command. # # Memory units: kB = kilobytes Time units: ms = milliseconds # MB = megabytes s = seconds # GB = gigabytes min = minutes # TB = terabytes h = hours # d = days #------------------------------------------------------------------------------ # FILE LOCATIONS #------------------------------------------------------------------------------ # The default values of these variables are driven from the -D command-line # option or PGDATA environment variable, represented here as ConfigDir. #data_directory = 'ConfigDir' # use data in another directory # (change requires restart) #hba_file = 'ConfigDir/pg_hba.conf' # host-based authentication file # (change requires restart) #ident_file = 'ConfigDir/pg_ident.conf' # ident configuration file # (change requires restart) # If external_pid_file is not explicitly set, no extra PID file is written. 
#external_pid_file = '' # write an extra PID file # (change requires restart) #------------------------------------------------------------------------------ # CONNECTIONS AND AUTHENTICATION #------------------------------------------------------------------------------ # - Connection Settings - #listen_addresses = 'localhost' # what IP address(es) to listen on; # comma-separated list of addresses; # defaults to 'localhost'; use '*' for all # (change requires restart) #port = 5845 # (change requires restart) max_connections = 100 # (change requires restart) #superuser_reserved_connections = 3 # (change requires restart) #unix_socket_directories = '/tmp' # comma-separated list of directories # (change requires restart) #unix_socket_group = '' # (change requires restart) #unix_socket_permissions = 0777 # begin with 0 to use octal notation # (change requires restart) #bonjour = off # advertise server via Bonjour # (change requires restart) #bonjour_name = '' # defaults to the computer name # (change requires restart) # - Security and Authentication - #authentication_timeout = 1min # 1s-600s #ssl = off #ssl_ciphers = 'HIGH:MEDIUM:+3DES:!aNULL' # allowed SSL ciphers #ssl_prefer_server_ciphers = on #ssl_ecdh_curve = 'prime256v1' #ssl_dh_params_file = '' #ssl_cert_file = 'server.crt' #ssl_key_file = 'server.key' #ssl_ca_file = '' #ssl_crl_file = '' #password_encryption = md5 # md5 or scram-sha-256 #db_user_namespace = off #row_security = on # GSSAPI using Kerberos #krb_server_keyfile = '' #krb_caseins_users = off # - TCP Keepalives - # see "man 7 tcp" for details #tcp_keepalives_idle = 0 # TCP_KEEPIDLE, in seconds; # 0 selects the system default #tcp_keepalives_interval = 0 # TCP_KEEPINTVL, in seconds; # 0 selects the system default #tcp_keepalives_count = 0 # TCP_KEEPCNT; # 0 selects the system default #------------------------------------------------------------------------------ # RESOURCE USAGE (except WAL) 
#------------------------------------------------------------------------------ # - Memory - shared_buffers = 1024MB # min 128kB # (change requires restart) #huge_pages = try # on, off, or try # (change requires restart) #temp_buffers = 8MB # min 800kB #max_prepared_transactions = 0 # zero disables the feature # (change requires restart) # Caution: it is not advisable to set max_prepared_transactions nonzero unless # you actively intend to use prepared transactions. #work_mem = 4MB # min 64kB #maintenance_work_mem = 64MB # min 1MB #replacement_sort_tuples = 150000 # limits use of replacement selection sort #autovacuum_work_mem = -1 # min 1MB, or -1 to use maintenance_work_mem #max_stack_depth = 2MB # min 100kB dynamic_shared_memory_type = posix # the default is the first option # supported by the operating system: # posix # sysv # windows # mmap # use none to disable dynamic shared memory # (change requires restart) # - Disk - #temp_file_limit = -1 # limits per-process temp file space # in kB, or -1 for no limit # - Kernel Resource Usage - #max_files_per_process = 1000 # min 25 # (change requires restart) #shared_preload_libraries = '' # (change requires restart) # - Cost-Based Vacuum Delay - #vacuum_cost_delay = 0 # 0-100 milliseconds #vacuum_cost_page_hit = 1 # 0-10000 credits #vacuum_cost_page_miss = 10 # 0-10000 credits #vacuum_cost_page_dirty = 20 # 0-10000 credits #vacuum_cost_limit = 200 # 1-10000 credits # - Background Writer - bgwriter_delay = 20ms # 10-10000ms between rounds bgwriter_lru_maxpages = 800 # 0-1000 max buffers written/round bgwriter_lru_multiplier = 8.0 # 0-10.0 multiplier on buffers scanned/round #bgwriter_flush_after = 512kB # measured in pages, 0 disables # - Asynchronous Behavior - #effective_io_concurrency = 1 # 1-1000; 0 disables prefetching #max_worker_processes = 8 # (change requires restart) #max_parallel_workers_per_gather = 2 # taken from max_parallel_workers #max_parallel_workers = 8 # maximum number of max_worker_processes that # 
can be used in parallel queries #old_snapshot_threshold = -1 # 1min-60d; -1 disables; 0 is immediate # (change requires restart) #backend_flush_after = 0 # measured in pages, 0 disables #------------------------------------------------------------------------------ # WRITE AHEAD LOG #------------------------------------------------------------------------------ # - Settings - wal_level = minimal # minimal, replica, or logical # (change requires restart) fsync = off # flush data to disk for crash safety # (turning this off can cause # unrecoverable data corruption) synchronous_commit = off # synchronization level; # off, local, remote_write, remote_apply, or on #wal_sync_method = fsync # the default is the first option # supported by the operating system: # open_datasync # fdatasync (default on Linux) # fsync # fsync_writethrough # open_sync full_page_writes = off # recover from partial page writes #wal_compression = off # enable compression of full-page writes #wal_log_hints = off # also do full page writes of non-critical updates # (change requires restart) #wal_buffers = -1 # min 32kB, -1 sets based on shared_buffers # (change requires restart) #wal_writer_delay = 200ms # 1-10000 milliseconds #wal_writer_flush_after = 1MB # measured in pages, 0 disables #commit_delay = 0 # range 0-100000, in microseconds #commit_siblings = 5 # range 1-1000 # - Checkpoints - checkpoint_timeout = 120min # range 30s-1d max_wal_size = 8GB min_wal_size = 1GB #checkpoint_completion_target = 0.5 # checkpoint target duration, 0.0 - 1.0 #checkpoint_flush_after = 256kB # measured in pages, 0 disables #checkpoint_warning = 30s # 0 disables # - Archiving - #archive_mode = off # enables archiving; off, on, or always # (change requires restart) #archive_command = '' # command to use to archive a logfile segment # placeholders: %p = path of file to archive # %f = file name only # e.g. 'test ! 
-f /mnt/server/archivedir/%f && cp %p /mnt/server/archivedir/%f' #archive_timeout = 0 # force a logfile segment switch after this # number of seconds; 0 disables #------------------------------------------------------------------------------ # REPLICATION #------------------------------------------------------------------------------ # - Sending Server(s) - # Set these on the master and on any standby that will send replication data. max_wal_senders = 0 # max number of walsender processes # (change requires restart) #wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables #max_replication_slots = 10 # max number of replication slots # (change requires restart) #track_commit_timestamp = off # collect timestamp of transaction commit # (change requires restart) # - Master Server - # These settings are ignored on a standby server. #synchronous_standby_names = '' # standby servers that provide sync rep # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed # - Standby Servers - # These settings are ignored on a master server. 
#hot_standby = on # "off" disallows queries during recovery # (change requires restart) #max_standby_archive_delay = 30s # max delay before canceling queries # when reading WAL from archive; # -1 allows indefinite delay #max_standby_streaming_delay = 30s # max delay before canceling queries # when reading streaming WAL; # -1 allows indefinite delay #wal_receiver_status_interval = 10s # send replies at least this often # 0 disables #hot_standby_feedback = off # send info from standby to prevent # query conflicts #wal_receiver_timeout = 60s # time that receiver waits for # communication from master # in milliseconds; 0 disables #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt # - Subscribers - # These settings are ignored on a publisher. #max_logical_replication_workers = 4 # taken from max_worker_processes # (change requires restart) #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers #------------------------------------------------------------------------------ # QUERY TUNING #------------------------------------------------------------------------------ # - Planner Method Configuration - #enable_bitmapscan = on #enable_hashagg = on #enable_hashjoin = on #enable_indexscan = on #enable_indexonlyscan = on #enable_material = on #enable_mergejoin = on #enable_nestloop = on #enable_seqscan = on #enable_sort = on #enable_tidscan = on # - Planner Cost Constants - #seq_page_cost = 1.0 # measured on an arbitrary scale #random_page_cost = 4.0 # same scale as above #cpu_tuple_cost = 0.01 # same scale as above #cpu_index_tuple_cost = 0.005 # same scale as above #cpu_operator_cost = 0.0025 # same scale as above #parallel_tuple_cost = 0.1 # same scale as above #parallel_setup_cost = 1000.0 # same scale as above #min_parallel_table_scan_size = 8MB #min_parallel_index_scan_size = 512kB #effective_cache_size = 4GB # - Genetic Query Optimizer - #geqo = on #geqo_threshold = 12 #geqo_effort = 5 # 
range 1-10 #geqo_pool_size = 0 # selects default based on effort #geqo_generations = 0 # selects default based on effort #geqo_selection_bias = 2.0 # range 1.5-2.0 #geqo_seed = 0.0 # range 0.0-1.0 # - Other Planner Options - #default_statistics_target = 100 # range 1-10000 #constraint_exclusion = partition # on, off, or partition #cursor_tuple_fraction = 0.1 # range 0.0-1.0 #from_collapse_limit = 8 #join_collapse_limit = 8 # 1 disables collapsing of explicit # JOIN clauses #force_parallel_mode = off #------------------------------------------------------------------------------ # ERROR REPORTING AND LOGGING #------------------------------------------------------------------------------ # - Where to Log - #log_destination = 'stderr' # Valid values are combinations of # stderr, csvlog, syslog, and eventlog, # depending on platform. csvlog # requires logging_collector to be on. # This is used when logging to stderr: #logging_collector = off # Enable capturing of stderr and csvlog # into log files. Required to be on for # csvlogs. # (change requires restart) # These are only used if logging_collector is on: #log_directory = 'log' # directory where log files are written, # can be absolute or relative to PGDATA #log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log' # log file name pattern, # can include strftime() escapes #log_file_mode = 0600 # creation mode for log files, # begin with 0 to use octal notation #log_truncate_on_rotation = off # If on, an existing log file with the # same name as the new log file will be # truncated rather than appended to. # But such truncation only occurs on # time-driven rotation, not on restarts # or size-driven rotation. Default is # off, meaning append to existing files # in all cases. #log_rotation_age = 1d # Automatic rotation of logfiles will # happen after that time. 0 disables. #log_rotation_size = 10MB # Automatic rotation of logfiles will # happen after that much log output. # 0 disables. 
# These are relevant when logging to syslog: #syslog_facility = 'LOCAL0' #syslog_ident = 'postgres' #syslog_sequence_numbers = on #syslog_split_messages = on # This is only relevant when logging to eventlog (win32): # (change requires restart) #event_source = 'PostgreSQL' # - When to Log - #client_min_messages = notice # values in order of decreasing detail: # debug5 # debug4 # debug3 # debug2 # debug1 # log # notice # warning # error #log_min_messages = warning # values in order of decreasing detail: # debug5 # debug4 # debug3 # debug2 # debug1 # info # notice # warning # error # log # fatal # panic #log_min_error_statement = error # values in order of decreasing detail: # debug5 # debug4 # debug3 # debug2 # debug1 # info # notice # warning # error # log # fatal # panic (effectively off) #log_min_duration_statement = -1 # -1 is disabled, 0 logs all statements # and their durations, > 0 logs only # statements running at least this number # of milliseconds # - What to Log - #debug_print_parse = off #debug_print_rewritten = off #debug_print_plan = off #debug_pretty_print = on #log_checkpoints = off #log_connections = off #log_disconnections = off #log_duration = off #log_error_verbosity = default # terse, default, or verbose messages #log_hostname = off #log_line_prefix = '%m [%p] ' # special values: # %a = application name # %u = user name # %d = database name # %r = remote host and port # %h = remote host # %p = process ID # %t = timestamp without milliseconds # %m = timestamp with milliseconds # %n = timestamp with milliseconds (as a Unix epoch) # %i = command tag # %e = SQL state # %c = session ID # %l = session line number # %s = session start timestamp # %v = virtual transaction ID # %x = transaction ID (0 if none) # %q = stop here in non-session # processes # %% = '%' # e.g. 
'<%u%%%d> ' #log_lock_waits = off # log lock waits >= deadlock_timeout #log_statement = 'none' # none, ddl, mod, all #log_replication_commands = off #log_temp_files = -1 # log temporary files equal or larger # than the specified size in kilobytes; # -1 disables, 0 logs all temp files log_timezone = 'W-SU' # - Process Title - #cluster_name = '' # added to process titles if nonempty # (change requires restart) #update_process_title = on #------------------------------------------------------------------------------ # RUNTIME STATISTICS #------------------------------------------------------------------------------ # - Query/Index Statistics Collector - #track_activities = on #track_counts = on #track_io_timing = off #track_functions = none # none, pl, all #track_activity_query_size = 1024 # (change requires restart) #stats_temp_directory = 'pg_stat_tmp' # - Statistics Monitoring - #log_parser_stats = off #log_planner_stats = off #log_executor_stats = off #log_statement_stats = off #------------------------------------------------------------------------------ # AUTOVACUUM PARAMETERS #------------------------------------------------------------------------------ #autovacuum = on # Enable autovacuum subprocess? 'on' # requires track_counts to also be on. #log_autovacuum_min_duration = -1 # -1 disables, 0 logs all actions and # their durations, > 0 logs only # actions running at least this number # of milliseconds. 
#autovacuum_max_workers = 3 # max number of autovacuum subprocesses # (change requires restart) #autovacuum_naptime = 1min # time between autovacuum runs #autovacuum_vacuum_threshold = 50 # min number of row updates before # vacuum #autovacuum_analyze_threshold = 50 # min number of row updates before # analyze #autovacuum_vacuum_scale_factor = 0.2 # fraction of table size before vacuum #autovacuum_analyze_scale_factor = 0.1 # fraction of table size before analyze #autovacuum_freeze_max_age = 200000000 # maximum XID age before forced vacuum # (change requires restart) #autovacuum_multixact_freeze_max_age = 400000000 # maximum multixact age # before forced vacuum # (change requires restart) #autovacuum_vacuum_cost_delay = 20ms # default vacuum cost delay for # autovacuum, in milliseconds; # -1 means use vacuum_cost_delay #autovacuum_vacuum_cost_limit = -1 # default vacuum cost limit for # autovacuum, -1 means use # vacuum_cost_limit #------------------------------------------------------------------------------ # CLIENT CONNECTION DEFAULTS #------------------------------------------------------------------------------ # - Statement Behavior - #search_path = '"$user", public' # schema names #default_tablespace = '' # a tablespace name, '' uses the default #temp_tablespaces = '' # a list of tablespace names, '' uses # only default tablespace #check_function_bodies = on #default_transaction_isolation = 'read committed' #default_transaction_read_only = off #default_transaction_deferrable = off #session_replication_role = 'origin' #statement_timeout = 0 # in milliseconds, 0 is disabled #lock_timeout = 0 # in milliseconds, 0 is disabled #idle_in_transaction_session_timeout = 0 # in milliseconds, 0 is disabled #vacuum_freeze_min_age = 50000000 #vacuum_freeze_table_age = 150000000 #vacuum_multixact_freeze_min_age = 5000000 #vacuum_multixact_freeze_table_age = 150000000 #bytea_output = 'hex' # hex, escape #xmlbinary = 'base64' #xmloption = 'content' #gin_fuzzy_search_limit = 
0 #gin_pending_list_limit = 4MB # - Locale and Formatting - datestyle = 'iso, dmy' #intervalstyle = 'postgres' timezone = 'W-SU' #timezone_abbreviations = 'Default' # Select the set of available time zone # abbreviations. Currently, there are # Default # Australia (historical usage) # India # You can create your own file in # share/timezonesets/. #extra_float_digits = 0 # min -15, max 3 #client_encoding = sql_ascii # actually, defaults to database # encoding # These settings are initialized by initdb, but they can be changed. lc_messages = 'en_US.UTF-8' # locale for system error message # strings lc_monetary = 'en_US.UTF-8' # locale for monetary formatting lc_numeric = 'en_US.UTF-8' # locale for number formatting lc_time = 'en_US.UTF-8' # locale for time formatting # default configuration for text search default_text_search_config = 'pg_catalog.english' # - Other Defaults - #dynamic_library_path = '$libdir' #local_preload_libraries = '' #session_preload_libraries = '' #------------------------------------------------------------------------------ # LOCK MANAGEMENT #------------------------------------------------------------------------------ #deadlock_timeout = 1s #max_locks_per_transaction = 64 # min 10 # (change requires restart) #max_pred_locks_per_transaction = 64 # min 10 # (change requires restart) #max_pred_locks_per_relation = -2 # negative values mean # (max_pred_locks_per_transaction # / -max_pred_locks_per_relation) - 1 #max_pred_locks_per_page = 2 # min 0 #------------------------------------------------------------------------------ # VERSION/PLATFORM COMPATIBILITY #------------------------------------------------------------------------------ # - Previous PostgreSQL Versions - #array_nulls = on #backslash_quote = safe_encoding # on, off, or safe_encoding #default_with_oids = off #escape_string_warning = on #lo_compat_privileges = off #operator_precedence_warning = off #quote_all_identifiers = off #standard_conforming_strings = on 
#synchronize_seqscans = on # - Other Platforms and Clients - #transform_null_equals = off #------------------------------------------------------------------------------ # ERROR HANDLING #------------------------------------------------------------------------------ #exit_on_error = off # terminate session on any error? #restart_after_crash = on # reinitialize after backend crash? #------------------------------------------------------------------------------ # CONFIG FILE INCLUDES #------------------------------------------------------------------------------ # These options allow settings to be loaded from files other than the # default postgresql.conf. #include_dir = 'conf.d' # include files ending in '.conf' from # directory 'conf.d' #include_if_exists = 'exists.conf' # include file only if it exists #include = 'special.conf' # include file #------------------------------------------------------------------------------ # CUSTOMIZED OPTIONS #------------------------------------------------------------------------------ # Add settings for extensions here
test00.sql
Description: application/sql
test01.sql
Description: application/sql
test_correctness.sh
Description: application/shellscript
total_test.sh
Description: application/shellscript