From: Andres Freund <and...@anarazel.de>

Features:
- streaming reading/writing
- filtering
- reassembly of records

Reusing the ReadRecord infrastructure from code that is not tightly integrated
into xlog.c is rather hard: it would require changes to integral parts of the
recovery code, which does not seem to be a good idea. Hence this separate,
more generic reader.

Missing:
- "compressing" the stream when removing uninteresting records
- writing out correct CRCs
- validating CRCs
- separating reader/writer
---
 src/backend/access/transam/Makefile     |    2 +-
 src/backend/access/transam/xlogreader.c |  914 +++++++++++++++++++++++++++++++
 src/include/access/xlogreader.h         |  173 ++++++
 3 files changed, 1088 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/access/transam/xlogreader.c
 create mode 100644 src/include/access/xlogreader.h

diff --git a/src/backend/access/transam/Makefile 
b/src/backend/access/transam/Makefile
index f82f10e..660b5fc 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
-       twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogutils.o
+       twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogreader.o xlogutils.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/xlogreader.c 
b/src/backend/access/transam/xlogreader.c
new file mode 100644
index 0000000..6f15d66
--- /dev/null
+++ b/src/backend/access/transam/xlogreader.c
@@ -0,0 +1,914 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.c
+ *
+ * A somewhat generic xlog read interface
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/access/transam/xlogreader.c
+ *
+ *-------------------------------------------------------------------------
+ *
+ * FIXME:
+ * * CRC computation
+ * * separation of reader/writer
+ */
+
+#include "postgres.h"
+
+#include "access/xlog_internal.h"
+#include "access/transam.h"
+#include "catalog/pg_control.h"
+#include "access/xlogreader.h"
+
+/* FIXME */
+#include "replication/walsender_private.h"
+#include "replication/walprotocol.h"
+
+//#define VERBOSE_DEBUG
+
+/*
+ * Allocate a new XLogReaderState and bring it into its reset state.
+ *
+ * Besides the state struct itself this allocates the record data buffer
+ * (initially XLOG_BLCKSZ*8 bytes) and one BLCKSZ sized buffer for each of the
+ * XLR_MAX_BKP_BLOCKS possible backup blocks. All memory comes from malloc().
+ *
+ * If any allocation fails, everything allocated so far is released again and
+ * the failure is reported with elog(ERROR).
+ */
+XLogReaderState* XLogReaderAllocate(void)
+{
+       XLogReaderState* state = malloc(sizeof(XLogReaderState));
+       int i;
+
+       if (!state)
+               goto oom;
+
+       memset(&state->buf.record, 0, sizeof(XLogRecord));
+
+       /*
+        * Mark all buffers as not-yet-allocated first, so the oom path can
+        * unconditionally free() them no matter how far we got.
+        */
+       state->buf.record_data = NULL;
+       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+               state->buf.bkp_block_data[i] = NULL;
+
+       state->buf.record_data_size = XLOG_BLCKSZ*8;
+       state->buf.record_data = malloc(state->buf.record_data_size);
+
+       if (!state->buf.record_data)
+               goto oom;
+
+       for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+       {
+               state->buf.bkp_block_data[i] = malloc(BLCKSZ);
+
+               if (!state->buf.bkp_block_data[i])
+                       goto oom;
+       }
+
+       XLogReaderReset(state);
+       return state;
+
+oom:
+       /* release whatever we managed to allocate; free(NULL) is a no-op */
+       if (state)
+       {
+               for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+                       free(state->buf.bkp_block_data[i]);
+               free(state->buf.record_data);
+               free(state);
+       }
+
+       elog(ERROR, "could not allocate memory for XLogReaderState");
+       /* NOTE(review): elog(ERROR) presumably does not return; keep compiler quiet */
+       return NULL;
+}
+
+/*
+ * Reset the parsing progress of a reader.
+ *
+ * Only the flags and counters assigned below are touched; nothing is
+ * allocated or freed here.
+ */
+void
+XLogReaderReset(XLogReaderState* state)
+{
+       /* XLogReaderRead() checks this flag to redo its initial setup */
+       state->initialized = false;
+
+       /* we are not inside any record, backup block or skipped region */
+       state->in_record = false;
+       state->in_skip = false;
+       state->in_bkp_blocks = 0;
+       state->in_bkp_block_header = false;
+       state->incomplete = false;
+
+       /* nothing is pending to be read, nothing has been written out */
+       state->remaining_size = 0;
+       state->nbytes = 0;
+
+       /* no outstanding request towards the caller */
+       state->needs_input = false;
+       state->needs_output = false;
+}
+
+/*
+ * Can 'size' more bytes be consumed from the input?
+ *
+ * Advances a copy of curptr by 'size' and reports false as soon as
+ * XLByteLE(endptr, advanced position) holds, true otherwise. The state
+ * itself is not modified.
+ *
+ * NOTE(review): this also rejects the case where the advanced position lands
+ * exactly on endptr -- confirm that is intended.
+ */
+static inline bool
+XLogReaderHasInput(XLogReaderState* state, Size size)
+{
+       XLogRecPtr after = state->curptr;
+
+       XLByteAdvance(after, size);
+
+       return !XLByteLE(state->endptr, after);
+}
+
+/*
+ * Can 'size' more bytes be written out?
+ *
+ * True as long as the bytes accounted for in state->nbytes plus 'size' do
+ * not exceed MAX_SEND_SIZE.
+ */
+static inline bool
+XLogReaderHasOutput(XLogReaderState* state, Size size)
+{
+       return state->nbytes + size <= MAX_SEND_SIZE;
+}
+
+/*
+ * Can 'size' bytes be both read and written out?
+ *
+ * Combines the input limit (endptr) and the output limit (MAX_SEND_SIZE);
+ * the input side is checked first.
+ */
+static inline bool
+XLogReaderHasSpace(XLogReaderState* state, Size size)
+{
+       return XLogReaderHasInput(state, size) &&
+               XLogReaderHasOutput(state, size);
+}
+
+void
+XLogReaderRead(XLogReaderState* state)
+{
+       XLogRecord* temp_record;
+
+       state->needs_input = false;
+       state->needs_output = false;
+
+       /*
+        * Do some basic sanity checking and setup if were starting anew.
+        */
+       if (!state->initialized)
+       {
+               state->initialized = true;
+               /*
+                * we need to start reading at the beginning of the page to 
understand
+                * what we are currently reading. We will skip over that 
because we
+                * check curptr < startptr later.
+                */
+               state->curptr.xrecoff = state->curptr.xrecoff - 
state->curptr.xrecoff % XLOG_BLCKSZ;
+               Assert(state->curptr.xrecoff % XLOG_BLCKSZ == 0);
+               elog(LOG, "start reading from %X/%X, scrolled back to %X/%X",
+                    state->startptr.xlogid, state->startptr.xrecoff,
+                    state->curptr.xlogid, state->curptr.xrecoff);
+
+       }
+       else
+       {
+               /*
+                * We didn't finish reading the last time round. Since then new 
data
+                * could have been appended to the current page. So we need to 
update
+                * our copy of that.
+                *
+                * XXX: We could tie that to state->needs_input but that 
doesn't seem
+                * worth the complication atm.
+                */
+               XLogRecPtr rereadptr = state->curptr;
+               rereadptr.xrecoff -= rereadptr.xrecoff % XLOG_BLCKSZ;
+
+               XLByteAdvance(rereadptr, SizeOfXLogShortPHD);
+
+               if(!XLByteLE(rereadptr, state->endptr))
+                       goto not_enough_input;
+
+               rereadptr.xrecoff -= rereadptr.xrecoff % XLOG_BLCKSZ;
+
+               state->read_page(state, state->cur_page, rereadptr);
+
+               state->page_header = (XLogPageHeader)state->cur_page;
+               state->page_header_size = 
XLogPageHeaderSize(state->page_header);
+
+       }
+
+#ifdef VERBOSE_DEBUG
+       elog(LOG, "starting reading for %X from %X",
+            state->startptr.xrecoff, state->curptr.xrecoff);
+#endif
+       while (XLByteLT(state->curptr, state->endptr))
+       {
+               uint32 len_in_block;
+               /* did we read a partial xlog record due to input/output 
constraints */
+               bool partial_read = false;
+               bool partial_write = false;
+
+#ifdef VERBOSE_DEBUG
+               elog(LOG, "one loop start: record: %u skip: %u bkb_block: %d 
in_bkp_header: %u xrecoff: %X/%X remaining: %u, off: %u",
+                    state->in_record, state->in_skip,
+                    state->in_bkp_blocks, state->in_bkp_block_header,
+                    state->curptr.xlogid, state->curptr.xrecoff,
+                    state->remaining_size,
+                    state->curptr.xrecoff % XLOG_BLCKSZ);
+#endif
+
+               if (state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+               {
+#ifdef VERBOSE_DEBUG
+                       elog(LOG, "reading page header, at %X/%X",
+                            state->curptr.xlogid, state->curptr.xrecoff);
+#endif
+                       /* check whether we can read enough to see the short 
header */
+                       if (!XLogReaderHasInput(state, SizeOfXLogShortPHD))
+                               goto not_enough_input;
+
+                       state->read_page(state, state->cur_page, state->curptr);
+                       state->page_header = (XLogPageHeader)state->cur_page;
+                       state->page_header_size = 
XLogPageHeaderSize(state->page_header);
+
+                       /* check wether we have enough to read/write the full 
header */
+                       if (!XLogReaderHasInput(state, state->page_header_size))
+                               goto not_enough_input;
+
+                       /* writeout page header only if were somewhere 
interesting */
+                       if (!XLByteLT(state->curptr, state->startptr))
+                       {
+                               if (!XLogReaderHasOutput(state, 
state->page_header_size))
+                                       goto not_enough_output;
+
+                               state->writeout_data(state, state->cur_page, 
state->page_header_size);
+                       }
+
+                       XLByteAdvance(state->curptr, state->page_header_size);
+
+                       if (XLByteLT(state->curptr, state->startptr))
+                       {
+                               /* don't intepret anything if were before 
startpoint */
+                       }
+                       else if (state->page_header->xlp_info & 
XLP_FIRST_IS_CONTRECORD)
+                       {
+                               XLogContRecord* temp_contrecord;
+
+                               if(!XLogReaderHasInput(state, 
SizeOfXLogContRecord))
+                                       goto not_enough_input;
+
+                               if(!XLogReaderHasOutput(state, 
SizeOfXLogContRecord))
+                                       goto not_enough_output;
+
+                               temp_contrecord =
+                                       (XLogContRecord*)(state->cur_page
+                                                         + 
state->curptr.xrecoff % XLOG_BLCKSZ);
+
+
+                               state->writeout_data(state, 
(char*)temp_contrecord, SizeOfXLogContRecord);
+
+                               XLByteAdvance(state->curptr, 
SizeOfXLogContRecord);
+
+                               if (!state->in_record)
+                               {
+                                       /* we need to support this case for 
initializing a cluster... */
+                                       elog(WARNING, "contrecord although were 
not in a record at %X/%X, starting at %X/%X",
+                                            state->curptr.xlogid, 
state->curptr.xrecoff,
+                                            state->startptr.xlogid, 
state->startptr.xrecoff);
+                                       state->in_record = true;
+                                       state->in_skip = true;
+                                       state->remaining_size = 
temp_contrecord->xl_rem_len;
+                                       continue;
+                               }
+
+
+                               if(temp_contrecord->xl_rem_len < 
state->remaining_size)
+                                       elog(PANIC, "remaining length is 
smaller than to be read data: %u %u",
+                                            temp_contrecord->xl_rem_len, 
state->remaining_size
+                                               );
+
+                       }
+                       else
+                       {
+                               if (state->in_record)
+                               {
+                                       elog(PANIC, "no contrecord although 
were in a record");
+                               }
+                       }
+               }
+
+               if (!state->in_record)
+               {
+                       /*
+                        * a record must be stored aligned. So skip as far we 
need to
+                        * comply with that.
+                        */
+                       Size skiplen;
+                       skiplen = MAXALIGN(state->curptr.xrecoff)
+                               - state->curptr.xrecoff;
+
+                       if (skiplen)
+                       {
+                               if (!XLogReaderHasSpace(state, skiplen))
+                               {
+#ifdef VERBOSE_DEBUG
+                                       elog(LOG, "not aligning bc of space");
+#endif
+                                       /*
+                                        * We don't have enough space to 
read/write the alignment
+                                        * bytes, so fake up a skip-state
+                                        */
+                                       state->in_record = true;
+                                       state->in_skip = true;
+                                       state->remaining_size = skiplen;
+
+                                       if (!XLogReaderHasInput(state, skiplen))
+                                               goto not_enough_input;
+                                       goto not_enough_output;
+                               }
+#ifdef VERBOSE_DEBUG
+                               elog(LOG, "aligning from %X/%X to %X/%X",
+                                    state->curptr.xlogid, 
state->curptr.xrecoff,
+                                    state->curptr.xlogid, 
state->curptr.xrecoff + (uint32)skiplen);
+#endif
+                               if (!XLByteLT(state->curptr, state->startptr))
+                                       state->writeout_data(state, NULL, 
skiplen);
+                               XLByteAdvance(state->curptr, skiplen);
+                       }
+               }
+
+               /* skip until we reach the part of the page were interested in 
*/
+               if (XLByteLT(state->curptr, state->startptr))
+               {
+
+                       if (state->in_skip)
+                       {
+                               /* the code already handles that, we expect a 
contrecord */
+                       }
+                       else if ((state->curptr.xrecoff % XLOG_BLCKSZ) == 
state->page_header_size &&
+                                state->page_header->xlp_info & 
XLP_FIRST_IS_CONTRECORD)
+                       {
+
+                               XLogContRecord* temp_contrecord = 
(XLogContRecord*)
+                                       (state->cur_page + 
state->curptr.xrecoff % XLOG_BLCKSZ);
+
+                               /*
+                                * we know we have enough space here because we 
didn't start
+                                * writing out data yet because were < startptr
+                                */
+                               Assert(XLogReaderHasSpace(state, 
SizeOfXLogContRecord));
+
+                               XLByteAdvance(state->curptr, 
SizeOfXLogContRecord);
+
+#ifdef VERBOSE_DEBUG
+                               elog(LOG, "skipping contrecord before start");
+#endif
+                               state->in_skip = true;
+                               state->in_record = true;
+                               state->in_bkp_blocks = 0;
+                               state->remaining_size = 
temp_contrecord->xl_rem_len;
+                       }
+                       else
+                       {
+                               Assert(!state->in_record);
+
+                               /* read how much space we have left on the 
current page */
+                               if(state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+                                       len_in_block = 0;
+                               else
+                                       len_in_block = XLOG_BLCKSZ - 
state->curptr.xrecoff % XLOG_BLCKSZ;
+
+                               if(len_in_block < SizeOfXLogRecord)
+                               {
+                                       XLByteAdvance(state->curptr, 
len_in_block);
+                                       continue;
+                               }
+
+                               /*
+                                * now read the record information and start 
skipping till the
+                                * record is over
+                                */
+                               temp_record = (XLogRecord*)(state->cur_page + 
(state->curptr.xrecoff % XLOG_BLCKSZ));
+
+#ifdef VERBOSE_DEBUG
+                               elog(LOG, "skipping record before start %lu, 
tot %u at %X/%X off %d ",
+                                    temp_record->xl_tot_len - SizeOfXLogRecord,
+                                    temp_record->xl_tot_len,
+                                    state->curptr.xlogid, 
state->curptr.xrecoff,
+                                    state->curptr.xrecoff % XLOG_BLCKSZ);
+#endif
+
+                               Assert(XLogReaderHasSpace(state, 
SizeOfXLogRecord));
+
+                               XLByteAdvance(state->curptr, SizeOfXLogRecord);
+
+                               state->in_skip = true;
+                               state->in_record = true;
+                               state->in_bkp_blocks = 0;
+                               state->remaining_size = temp_record->xl_tot_len
+                                       - SizeOfXLogRecord;
+                       }
+               }
+
+               /*
+                * ----------------------------------------
+                * start to read a record
+                *
+                * This will only happen if were already behind state->startptr
+                * ----------------------------------------
+                */
+               if (!state->in_record)
+               {
+                       /*
+                        * if were at the beginning of a page (after the page 
header) it
+                        * could be that were starting in a continuation of an 
earlier
+                        * record. Its debatable wether thats a valid use-case. 
Support it
+                        * for now but cry loudly.
+                        */
+                       if ((state->curptr.xrecoff % XLOG_BLCKSZ) == 
state->page_header_size &&
+                          state->page_header->xlp_info & 
XLP_FIRST_IS_CONTRECORD)
+                       {
+                               XLogContRecord* temp_contrecord = 
(XLogContRecord*)
+                                       (state->cur_page + 
state->curptr.xrecoff % XLOG_BLCKSZ);
+
+                               if (!XLogReaderHasInput(state, 
SizeOfXLogContRecord))
+                                       goto not_enough_input;
+
+                               if (!XLogReaderHasOutput(state, 
SizeOfXLogContRecord))
+                                       goto not_enough_output;
+
+                               state->writeout_data(state,
+                                                    (char*)temp_contrecord,
+                                                    SizeOfXLogContRecord);
+                               XLByteAdvance(state->curptr, 
SizeOfXLogContRecord);
+
+                               elog(PANIC, "hum, ho, first is contrecord, but 
trying to read the record afterwards %X/%X",
+                                    state->curptr.xlogid, 
state->curptr.xrecoff);
+
+                               state->in_skip = true;
+                               state->in_record = true;
+                               state->in_bkp_blocks = 0;
+                               state->remaining_size = 
temp_contrecord->xl_rem_len;
+                               continue;
+                       }
+
+                       /* read how much space we have left on the current page 
*/
+                       if (state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+                               len_in_block = 0;
+                       else
+                               len_in_block = XLOG_BLCKSZ - 
state->curptr.xrecoff % XLOG_BLCKSZ;
+
+                       /* if there is not enough space for the xlog header, 
skip to next page */
+                       if (len_in_block < SizeOfXLogRecord)
+                       {
+
+                               if (!XLogReaderHasOutput(state, len_in_block))
+                                       goto not_enough_input;
+
+                               if (!XLogReaderHasOutput(state, len_in_block))
+                                       goto not_enough_output;
+
+                               state->writeout_data(state,
+                                                    NULL,
+                                                    len_in_block);
+
+                               XLByteAdvance(state->curptr, len_in_block);
+                               continue;
+                       }
+
+                       temp_record = (XLogRecord*)(state->cur_page + 
(state->curptr.xrecoff % XLOG_BLCKSZ));
+
+                       /*
+                        * we quickly loose the original address of a record as 
we can skip
+                        * records and such, so keep the original addresses.
+                        */
+                       state->buf.origptr = state->curptr;
+
+                       /* we writeout data as soon as we know whether were 
writing out something sensible */
+                       XLByteAdvance(state->curptr, SizeOfXLogRecord);
+
+                       /* ----------------------------------------
+                        * normally we don't look at the content of xlog 
records here,
+                        * XLOG_SWITCH is a special case though, as everything 
left in that
+                        * segment won't be sensbible content.
+                        * So skip to the next segment. For that we currently 
simply leave
+                        * the loop as we don't have any mechanism to 
communicate that
+                        * behaviour otherwise.
+                        * ----------------------------------------
+                        */
+                       if (temp_record->xl_rmid == RM_XLOG_ID
+                           && (temp_record->xl_info & ~XLR_INFO_MASK) == 
XLOG_SWITCH)
+                       {
+
+                               /*
+                                * writeout data so that this gap makes sense 
in the written
+                                * out data
+                                */
+                               state->writeout_data(state,
+                                                    (char*)temp_record,
+                                                    SizeOfXLogRecord);
+
+                               /*
+                                * Pretend the current data extends to end of 
segment
+                                *
+                                * FIXME: This logic is copied from xlog.c but 
seems to
+                                *    disregard xrecoff wrapping around to the 
next xlogid?
+                                */
+                               state->curptr.xrecoff += XLogSegSize - 1;
+                               state->curptr.xrecoff -= state->curptr.xrecoff 
% XLogSegSize;
+
+                               state->in_record = false;
+                               state->in_bkp_blocks = 0;
+                               state->in_skip = false;
+                               goto out;
+                       }
+                       /* ----------------------------------------
+                        * Ok, we found interesting data. That means we need to 
do the full
+                        * deal, reading the record, reading the BKP blocks 
afterward and
+                        * then hand of the record to be processed.
+                        * ----------------------------------------
+                        */
+                       else if (state->is_record_interesting(state, 
temp_record))
+                       {
+                               /*
+                                * the rest of the record might be on another 
page so we need a
+                                * copy instead just pointing into the current 
page.
+                                */
+                               memcpy(&state->buf.record,
+                                      temp_record,
+                                      sizeof(XLogRecord));/* really needs 
sizeof(XLogRecord) */
+
+                               state->writeout_data(state,
+                                                    (char*)temp_record,
+                                                    SizeOfXLogRecord);
+                               /*
+                                * read till the record itself finished, after 
that we will
+                                * continue with the bkp blocks et al
+                                */
+                               state->remaining_size = temp_record->xl_len;
+
+                               state->in_record = true;
+                               state->in_bkp_blocks = 0;
+                               state->in_skip = false;
+
+#ifdef VERBOSE_DEBUG
+                               elog(LOG, "found record at %X/%X, tx %u, rmid 
%hhu, len %u tot %u",
+                                    state->buf.origptr.xlogid, 
state->buf.origptr.xrecoff,
+                                    temp_record->xl_xid, temp_record->xl_rmid, 
temp_record->xl_len,
+                                    temp_record->xl_tot_len);
+#endif
+                       }
+                       /* ----------------------------------------
+                        * ok, everybody aggrees, the content of the current 
record are
+                        * just plain boring. So fake-up a record that replaces 
it by a
+                        * NOOP record.
+                        *
+                        * FIXME: we should allow "compressing" the output 
here. That is
+                        * write something that shows how long the record 
should be if
+                        * everything is decompressed again. This can radically 
reduce
+                        * space-usage over the wire.
+                        * It could also be very useful for traditional SR by 
removing
+                        * unneded BKP blocks from being transferred.
+                        * For that we would need to recompute CRCs though, 
which we
+                        * currently don't support.
+                        * ----------------------------------------
+                        */
+                       else
+                       {
+                               /*
+                                * we need to fix up a fake record with correct 
length that can
+                                * be written out.
+                                */
+                               /* needs space for padding to SizeOfXLogRecord 
*/
+                               XLogRecord spacer;
+
+                               /*
+                                * xl_tot_len contains the size of the 
XLogRecord itself, we
+                                * read that already though.
+                                */
+                               state->remaining_size = temp_record->xl_tot_len
+                                       - SizeOfXLogRecord;
+
+                               state->in_record = true;
+                               state->in_bkp_blocks = 0;
+                               state->in_skip = true;
+
+                               /* FIXME: fixup the xl_prev of the next record 
*/
+                               spacer.xl_prev = state->buf.origptr;
+                               spacer.xl_xid = InvalidTransactionId;
+                               spacer.xl_tot_len = temp_record->xl_tot_len;
+                               spacer.xl_len = temp_record->xl_tot_len - 
SizeOfXLogRecord;
+                               spacer.xl_rmid = RM_XLOG_ID;
+                               spacer.xl_info = XLOG_NOOP;
+
+                               state->writeout_data(state,
+                                                    (char*)&spacer,
+                                                    SizeOfXLogRecord);
+                       }
+               }
+               /*
+                * We read an interesting page and now want the BKP
+                * blocks. Unfortunately a bkp header is stored unaligned and 
can be
+                * split across pages. So we copy it to a bit more permanent 
location.
+                */
+               else if (state->in_bkp_blocks > 0
+                       && state->remaining_size == 0)
+               {
+                       Assert(!state->in_bkp_block_header);
+                       Assert(state->buf.record.xl_info &
+                              XLR_SET_BKP_BLOCK(XLR_MAX_BKP_BLOCKS - 
state->in_bkp_blocks));
+
+                       state->in_bkp_block_header = true;
+                       state->remaining_size = sizeof(BkpBlock);
+                       /* in_bkp_blocks will be changed uppon completion */
+                       state->in_skip = false;
+               }
+
+               Assert(state->in_record);
+
+               /* compute how much space on the current page is left */
+               if (state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+                       len_in_block = 0;
+               else
+                       len_in_block = XLOG_BLCKSZ - state->curptr.xrecoff % 
XLOG_BLCKSZ;
+
+               /* we have more data available than we need, so read only as 
much as needed */
+               if(len_in_block > state->remaining_size)
+                       len_in_block = state->remaining_size;
+
+               /*
+                * Handle constraints set by endptr and the size of the output 
buffer.
+                *
+                * Normally we use XLogReaderHasSpace for that, but thats not
+                * convenient because we want to read data in parts. So, 
open-code the
+                * logic for that here.
+                */
+               if (state->curptr.xlogid == state->endptr.xlogid &&
+                  state->curptr.xrecoff + len_in_block > state->endptr.xrecoff)
+               {
+                       Size cur_len = len_in_block;
+                       len_in_block = state->endptr.xrecoff - 
state->curptr.xrecoff;
+                       partial_read = true;
+                       elog(LOG, "truncating len_in_block due to endptr %X/%X 
%lu to %i at %X/%X",
+                            state->startptr.xlogid, state->startptr.xrecoff,
+                            cur_len, len_in_block,
+                            state->curptr.xlogid, state->curptr.xrecoff);
+               }
+               else if (len_in_block > (MAX_SEND_SIZE - state->nbytes))
+               {
+                       Size cur_len = len_in_block;
+                       len_in_block = MAX_SEND_SIZE - state->nbytes;
+                       partial_write = true;
+                       elog(LOG, "truncating len_in_block due to nbytes %lu to 
%i",
+                            cur_len, len_in_block);
+               }
+
+               /* ----------------------------------------
+                * copy data to whatever were currently reading.
+                * ----------------------------------------
+                */
+
+               /* nothing to do if were skipping */
+               if (state->in_skip)
+               {
+                       /* writeout zero data */
+                       if (!XLByteLT(state->curptr, state->startptr))
+                               state->writeout_data(state, NULL, len_in_block);
+               }
+               /* copy data into the current bkp block */
+               else if (state->in_bkp_block_header)
+               {
+                       int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+                       BkpBlock* bkpb = &state->buf.bkp_block[blockno];
+                       Assert(state->in_bkp_blocks);
+
+                       memcpy((char*)bkpb + sizeof(BkpBlock) - 
state->remaining_size,
+                              state->cur_page + state->curptr.xrecoff % 
XLOG_BLCKSZ,
+                              len_in_block);
+
+                       state->writeout_data(state,
+                                            state->cur_page + 
state->curptr.xrecoff % XLOG_BLCKSZ,
+                                            len_in_block);
+
+#ifdef VERBOSE_DEBUG
+                       elog(LOG, "copying bkp header for %d of %u complete %lu 
at %X/%X rem %u",
+                            blockno, len_in_block, sizeof(BkpBlock),
+                            state->curptr.xlogid, state->curptr.xrecoff,
+                            state->remaining_size);
+                       if (state->remaining_size == len_in_block)
+                       {
+                               elog(LOG, "block off %u len %u", 
bkpb->hole_offset, bkpb->hole_length);
+                       }
+#endif
+               }
+               else if (state->in_bkp_blocks)
+               {
+                       int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+                       BkpBlock* bkpb = &state->buf.bkp_block[blockno];
+                       char* data = state->buf.bkp_block_data[blockno];
+
+                       memcpy(data + BLCKSZ - bkpb->hole_length - 
state->remaining_size,
+                              state->cur_page + state->curptr.xrecoff % 
XLOG_BLCKSZ,
+                              len_in_block);
+
+                       state->writeout_data(state,
+                                            state->cur_page + 
state->curptr.xrecoff % XLOG_BLCKSZ,
+                                            len_in_block);
+#ifdef VERBOSE_DEBUG
+                       elog(LOG, "copying data for %d of %u complete %u",
+                            blockno, len_in_block, state->remaining_size);
+#endif
+               }
+               /* read the (rest) of the XLogRecord's data */
+               else if (state->in_record)
+               {
+                       if(state->buf.record_data_size < 
state->buf.record.xl_len){
+                               state->buf.record_data_size = 
state->buf.record.xl_len;
+                               state->buf.record_data =
+                                       realloc(state->buf.record_data, 
state->buf.record_data_size);
+                       }
+
+                       memcpy(state->buf.record_data
+                              + state->buf.record.xl_len
+                              - state->remaining_size,
+                              state->cur_page + state->curptr.xrecoff % 
XLOG_BLCKSZ,
+                              len_in_block);
+
+                       state->writeout_data(state,
+                                            state->cur_page + 
state->curptr.xrecoff % XLOG_BLCKSZ,
+                                            len_in_block);
+               }
+
+               /* should handle wrapping around to next page */
+               XLByteAdvance(state->curptr, len_in_block);
+
+               state->remaining_size -= len_in_block;
+
+               /*
+                * ----------------------------------------
+                * we completed whatever we were reading. So, handle going to the next
+                * state.
+                * ----------------------------------------
+                */
+
+               if (state->remaining_size == 0)
+               {
+                       /*
+                        * in the in_skip case we already read backup blocks, so everything
+                        * is finished.
+                        */
+                       if (state->in_skip)
+                       {
+                               state->in_record = false;
+                               state->in_bkp_blocks = 0;
+                               state->in_skip = false;
+                               /* alignment is handled when starting to read a 
record */
+                       }
+                       /*
+                        * We read the header of the current block. Start 
reading the
+                        * content of that now.
+                        */
+                       else if (state->in_bkp_block_header)
+                       {
+                               BkpBlock* bkpb;
+                               int blockno = XLR_MAX_BKP_BLOCKS - 
state->in_bkp_blocks;
+
+                               Assert(state->in_bkp_blocks);
+
+                               bkpb = &state->buf.bkp_block[blockno];
+                               state->remaining_size = BLCKSZ - 
bkpb->hole_length;
+                               state->in_bkp_block_header = false;
+#ifdef VERBOSE_DEBUG
+                               elog(LOG, "completed reading of header for %d, 
reading data now %u hole %u, off %u",
+                                    blockno, state->remaining_size, 
bkpb->hole_length,
+                                    bkpb->hole_offset);
+#endif
+                       }
+                       /*
+                        * The current backup block is finished, more may be coming
+                        */
+                       else if (state->in_bkp_blocks)
+                       {
+                               int blockno = XLR_MAX_BKP_BLOCKS - 
state->in_bkp_blocks;
+                               BkpBlock* bkpb;
+                               char* bkpb_data;
+
+                               Assert(!state->in_bkp_block_header);
+
+                               bkpb = &state->buf.bkp_block[blockno];
+                               bkpb_data = state->buf.bkp_block_data[blockno];
+
+                               /*
+                                * reassemble block to its entirety by removing 
the bkp_hole
+                                * "compression"
+                                */
+                               if(bkpb->hole_length){
+                                       memmove(bkpb_data + bkpb->hole_offset,
+                                               bkpb_data + bkpb->hole_offset + 
bkpb->hole_length,
+                                               BLCKSZ - (bkpb->hole_offset + 
bkpb->hole_length));
+                                       memset(bkpb_data + bkpb->hole_offset,
+                                              0,
+                                              bkpb->hole_length);
+                               }
+#if 0
+                               elog(LOG, "finished with bkp block %d", 
blockno);
+#endif
+                               state->in_bkp_blocks--;
+
+                               state->in_skip = false;
+
+                               /*
+                                * only continue with in_record=true if we have 
bkp block
+                                */
+                               while (state->in_bkp_blocks)
+                               {
+                                       if (state->buf.record.xl_info &
+                                          XLR_SET_BKP_BLOCK(XLR_MAX_BKP_BLOCKS 
- state->in_bkp_blocks))
+                                       {
+                                               elog(LOG, "reading record %u", 
XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks);
+                                               break;
+                                       }
+                                       state->in_bkp_blocks--;
+                               }
+
+                               if (!state->in_bkp_blocks)
+                               {
+                                       goto all_bkp_finished;
+                               }
+                               /* bkp blocks are stored without regard for 
alignment */
+                       }
+                       /*
+                        * read a non-skipped record, start reading bkp blocks 
afterwards
+                        */
+                       else if (state->in_record)
+                       {
+                               state->in_record = true;
+                               state->in_skip = false;
+                               state->in_bkp_blocks = XLR_MAX_BKP_BLOCKS;
+
+                               /*
+                                * only continue with in_record=true if we have 
bkp block
+                                */
+                               while (state->in_bkp_blocks)
+                               {
+                                       if (state->buf.record.xl_info &
+                                           
XLR_SET_BKP_BLOCK(XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks))
+                                       {
+#ifdef VERBOSE_DEBUG
+                                               elog(LOG, "reading bkp block 
%u", XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks);
+#endif
+                                               break;
+                                       }
+                                       state->in_bkp_blocks--;
+                               }
+
+                               if (!state->in_bkp_blocks)
+                               {
+                                       goto all_bkp_finished;
+                               }
+                               /* bkp blocks are stored without regard for 
alignment */
+                       }
+
+#ifdef VERBOSE_DEBUG
+                       elog(LOG, "finish with record at %X/%X",
+                            state->curptr.xlogid, state->curptr.xrecoff);
+#endif
+               }
+               /*
+                * Something could only be partially read inside a single block because
+                * of input or output space constraints. This case needs to be
+                * separate because otherwise we would treat it as a continuation which
+                * would obviously be wrong (we don't have a continuation record).
+                */
+               else if (partial_read)
+               {
+                       partial_read = false;
+                       goto not_enough_input;
+               }
+               else if (partial_write)
+               {
+                       partial_write = false;
+                       goto not_enough_output;
+               }
+               /*
+                * Data continues into the next block. Read the continuation record
+                * there and then continue.
+                */
+               else
+               {
+               }
+#ifdef VERBOSE_DEBUG
+               elog(LOG, "one loop: record: %u skip: %u bkb_block: %d 
in_bkp_header: %u xrecoff: %X/%X remaining: %u, off: %u",
+                    state->in_record, state->in_skip,
+                    state->in_bkp_blocks, state->in_bkp_block_header,
+                    state->curptr.xlogid, state->curptr.xrecoff,
+                    state->remaining_size,
+                    state->curptr.xrecoff % XLOG_BLCKSZ);
+#endif
+               continue;
+
+       all_bkp_finished:
+               {
+                       Assert(!state->in_skip);
+                       Assert(!state->in_bkp_block_header);
+                       Assert(!state->in_bkp_blocks);
+
+                       state->finished_record(state, &state->buf);
+
+                       state->in_record = false;
+
+                       /* alignment is handled when starting to read a record 
*/
+#ifdef VERBOSE_DEBUG
+                       elog(LOG, "currently at %X/%X to %X/%X, wrote nbytes: 
%lu",
+                            state->curptr.xlogid, state->curptr.xrecoff,
+                            state->endptr.xlogid, state->endptr.xrecoff, 
state->nbytes);
+#endif
+               }
+       }
+
+out:
+       if (state->in_skip)
+       {
+               state->incomplete = true;
+       }
+       else if (state->in_record)
+       {
+               state->incomplete = true;
+       }
+       else
+       {
+               state->incomplete = false;
+       }
+       return;
+
+not_enough_input:
+       state->needs_input = true;
+       goto out;
+
+not_enough_output:
+       state->needs_output = true;
+       goto out;
+}
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
new file mode 100644
index 0000000..7df98cf
--- /dev/null
+++ b/src/include/access/xlogreader.h
@@ -0,0 +1,173 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.h
+ *
+ * Generic xlog reading facility.
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogreader.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _READXLOG_H
+#define _READXLOG_H
+
+#include "access/xlog_internal.h"
+
+typedef struct XLogRecordBuffer
+{
+       /* the record itself */
+       XLogRecord record;
+
+       /* the LSN at which this record was found */
+       XLogRecPtr origptr;
+
+       /*
+        * the data for the xlog record; record_data_size is the allocated size
+        * of record_data, which the reader grows to record.xl_len when needed
+        */
+       char* record_data;
+       uint32 record_data_size;
+
+       BkpBlock bkp_block[XLR_MAX_BKP_BLOCKS]; /* bkp block headers, by block number */
+       char* bkp_block_data[XLR_MAX_BKP_BLOCKS]; /* bkp block contents, by block number */
+} XLogRecordBuffer;
+
+
+struct XLogReaderState;
+
+typedef bool (*XLogReaderStateInterestingCB)(struct XLogReaderState*, 
XLogRecord* r);
+typedef void (*XLogReaderStateWriteoutCB)(struct XLogReaderState*, char* data, 
Size len);
+typedef void (*XLogReaderStateFinishedRecordCB)(struct XLogReaderState*, 
XLogRecordBuffer* buf);
+typedef void (*XLogReaderStateReadPageCB)(struct XLogReaderState*, char* 
cur_page, XLogRecPtr at);
+
+typedef struct XLogReaderState
+{
+       /* ----------------------------------------
+        * Public parameters
+        * ----------------------------------------
+        */
+
+       /* callbacks */
+
+       /*
+        * Called to decide whether a xlog record is interesting and should be
+        * assembled, analyzed (finished_record) and written out or skipped.
+        */
+       XLogReaderStateInterestingCB is_record_interesting;
+
+       /*
+        * Write out data. This doesn't have to do anything if the data isn't
+        * needed later on.
+        *
+        * The reader passes data == NULL for the bytes of records it skips, so
+        * implementations have to cope with a NULL data pointer.
+        */
+       XLogReaderStateWriteoutCB writeout_data;
+
+       /*
+        * Gets called after a record, including the backup blocks, has been 
fully
+        * reassembled.
+        */
+       XLogReaderStateFinishedRecordCB finished_record;
+
+       /*
+        * Data input function. Has to read XLOG_BLCKSZ blocks into cur_page
+        * although everything behind endptr does not have to be valid.
+        */
+       XLogReaderStateReadPageCB read_page;
+
+       /*
+        * this can be used by the caller to pass state to the callbacks without
+        * using global variables or such ugliness
+        */
+       void* private_data;
+
+
+       /* from where to where are we reading */
+
+       /*
+        * so we know where interesting data starts after scrolling back to the
+        * beginning of a page
+        */
+       XLogRecPtr startptr;
+
+       /* continue up to here in this run */
+       XLogRecPtr endptr;
+
+
+       /* ----------------------------------------
+        * output parameters
+        * ----------------------------------------
+        */
+
+       /* we need new input data - a later endptr - to continue reading */
+       bool needs_input;
+
+       /* we need new output space to continue reading */
+       bool needs_output;
+
+       /* track our progress */
+       XLogRecPtr curptr;
+
+       /*
+        * are we in the middle of something? This is useful for the outside to
+        * know whether to start reading anew. It is true if reading stopped
+        * while a record (skipped or not) was still in progress.
+        */
+       bool incomplete;
+
+       /* ----------------------------------------
+        * private parameters
+        * ----------------------------------------
+        */
+
+       char cur_page[XLOG_BLCKSZ];
+       XLogPageHeader page_header;
+       uint32 page_header_size;
+       XLogRecordBuffer buf;
+
+
+       /* ----------------------------------------
+        * state machine variables
+        * ----------------------------------------
+        */
+
+       bool initialized;
+
+       /* are we currently reading a record */
+       bool in_record;
+
+       /*
+        * how many bkp blocks remain to be read; the block currently being
+        * processed has the index XLR_MAX_BKP_BLOCKS - in_bkp_blocks
+        */
+       int in_bkp_blocks;
+
+       /*
+        * the header of a bkp block can be split across pages, so we need to
+        * support reading that incrementally; nonzero while the header of the
+        * current bkp block is being read
+        */
+       int in_bkp_block_header;
+
+       /* we don't want to read this block, so keep track of that */
+       bool in_skip;
+
+       /* how many bytes remain to be read in the current state */
+       uint32 remaining_size;
+
+       Size nbytes; /* size of sent data */
+
+} XLogReaderState;
+
+/*
+ * Get a new XLogReader
+ *
+ * The 4 callbacks (is_record_interesting, writeout_data, finished_record and
+ * read_page), startptr and endptr have to be set before the reader can be
+ * used.
+ */
+extern XLogReaderState* XLogReaderAllocate(void);
+
+/*
+ * Reset the reader's internal state so it can be used without continuing
+ * from the last state.
+ *
+ * The callbacks and private_data won't be reset.
+ */
+extern void XLogReaderReset(XLogReaderState* state);
+
+/*
+ * Read the xlog and call the appropriate callbacks as far as possible within
+ * the constraints of input data (startptr, endptr) and output space.
+ *
+ * On return the output parameters of the state (needs_input, needs_output,
+ * incomplete, curptr) tell the caller how far reading got and what is needed
+ * to continue.
+ */
+extern void XLogReaderRead(XLogReaderState* state);
+
+#endif
-- 
1.7.10.rc3.3.g19a6c.dirty


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to