While working on instance encryption, I found it annoying to add decryption of XLOG pages to three different functions. Attached is a patch that tries to merge them all into one function, XLogRead(). The existing implementations differ in the way a new segment is opened, so I added a pointer to a callback function as a new argument. This callback handles the caller-specific ways to determine the segment file name and to open the file.
I can split the patch into multiple diffs to make detailed review easier, but first I'd like to hear if anything is seriously wrong with this design. Thanks. -- Antonin Houska Web: https://www.cybertec-postgresql.com
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index f9a4960f8a..444b5bf910 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1369,7 +1369,8 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
 	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 }
 
-
+/* Read position of XlogReadTwoPhaseData(), allocated on first use. */
+static XLogReadPos *readPos = NULL;
 
 /*
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1386,8 +1387,26 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 	XLogReaderState *xlogreader;
 	char	   *errormsg;
 
-	xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
-									NULL);
+
+	/*
+	 * First time through?  The position is cached in a static variable, so
+	 * it must be allocated in a context that lives as long as the process,
+	 * not in the caller's (possibly short-lived) memory context.
+	 */
+	if (readPos == NULL)
+	{
+		MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+
+		readPos = XLogReadInitPos();
+		MemoryContextSwitchTo(oldcxt);
+	}
+
+	/*
+	 * read_local_xlog_page() eventually calls XLogRead(), so pass the initial
+	 * position.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, read_local_xlog_page,
+									readPos);
 	if (!xlogreader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 9196aa3aae..7d0fdfba87 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xlogrecord.h"
 #include "access/xlog_internal.h"
@@ -26,6 +28,7 @@
 #include "replication/origin.h"
 
 #ifndef FRONTEND
+#include "pgstat.h"
 #include "utils/memutils.h"
 #endif
 
@@ -1005,6 +1008,208 @@ out:
 
 #endif							/* FRONTEND */
 
+/*
+ * Initialize XLOG file position for callers of XLogRead().
+ *
+ * The result is palloc'd in CurrentMemoryContext; callers that cache it in a
+ * static variable must allocate it in a long-lived memory context.  Whoever
+ * owns the position must close pos->segFile (if >= 0) once done with it.
+ */
+XLogReadPos *
+XLogReadInitPos(void)
+{
+	XLogReadPos *pos = (XLogReadPos *) palloc(sizeof(XLogReadPos));
+
+	pos->segFile = -1;
+	pos->segNo = 0;
+	pos->segOff = 0;
+	pos->tli = 0;
+	pos->dir = NULL;
+
+	return pos;
+}
+
+#ifdef FRONTEND
+/*
+ * Currently only pg_waldump.c is supposed to set these variables.
+ *
+ * XXX pg_waldump.c now defines them as well, so linking relies on the
+ * toolchain merging duplicate tentative definitions (-fcommon).  Find a more
+ * portable way to pass these in.
+ */
+const char *progname;
+int			WalSegSz;
+
+/*
+ * This is a front-end counterpart of XLogFileNameP.
+ */
+static char *
+XLogFileNameFE(TimeLineID tli, XLogSegNo segno)
+{
+	char	   *result = palloc(MAXFNAMELEN);
+
+	XLogFileName(result, tli, segno, WalSegSz);
+	return result;
+}
+
+/*
+ * XXX pg_waldump.c needs this function. Is there a smart way to put it into
+ * src/common?
+ */
+static void fatal_error(const char *fmt,...) pg_attribute_printf(1, 2);
+
+static void
+fatal_error(const char *fmt,...)
+{
+	va_list		args;
+
+	fflush(stdout);
+
+	fprintf(stderr, _("%s: FATAL:  "), progname);
+	va_start(args, fmt);
+	vfprintf(stderr, _(fmt), args);
+	va_end(args);
+	fputc('\n', stderr);
+
+	exit(EXIT_FAILURE);
+}
+#endif							/* FRONTEND */
+
+/*
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'. If
+ * tli is passed, get the data from timeline *tli. 'pos' is the current
+ * position in the XLOG file and openSegment is a callback that opens the next
+ * segment for reading; it must set pos->segFile, segNo, segOff and tli.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ *
+ * Will open, and keep open, one WAL segment whose descriptor is stored in
+ * pos->segFile. This means that, for each position, there will always be
+ * one descriptor left open until its owner closes it or the process ends,
+ * but never more than one.
+ */
+void
+XLogRead(char *buf, XLogRecPtr startptr, Size count,
+		 TimeLineID *tli, XLogReadPos *pos, XLogOpenSegment openSegment)
+{
+	char	   *p;
+	XLogRecPtr	recptr;
+	Size		nbytes;
+
+	p = buf;
+	recptr = startptr;
+	nbytes = count;
+
+	while (nbytes > 0)
+	{
+		uint32		startoff;
+		int			segbytes;
+		int			readbytes;
+		int			segsize;
+
+#ifndef FRONTEND
+		segsize = wal_segment_size;
+#else
+		segsize = WalSegSz;
+#endif
+
+		startoff = XLogSegmentOffset(recptr, segsize);
+
+		if (pos->segFile < 0 ||
+			!XLByteInSeg(recptr, pos->segNo, segsize) ||
+			(tli != NULL && *tli != pos->tli))
+		{
+			XLogSegNo	nextSegNo;
+
+			/* Switch to another logfile segment */
+			if (pos->segFile >= 0)
+				close(pos->segFile);
+			/* Don't keep a closed descriptor if openSegment() errors out. */
+			pos->segFile = -1;
+
+			XLByteToSeg(recptr, nextSegNo, segsize);
+
+			/* Open the next segment in the caller's way. */
+			openSegment(nextSegNo, tli, pos);
+		}
+
+		/* Need to seek in the file? */
+		if (pos->segOff != startoff)
+		{
+			if (lseek(pos->segFile, (off_t) startoff, SEEK_SET) < 0)
+			{
+#ifndef FRONTEND
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not seek in log segment %s to offset %u: %m",
+								XLogFileNameP(pos->tli, pos->segNo),
+								startoff)));
+#else
+				int			save_errno = errno;
+
+				fatal_error("could not seek in log segment %s to offset %u: %s",
+							XLogFileNameFE(pos->tli, pos->segNo), startoff,
+							strerror(save_errno));
+#endif
+			}
+			pos->segOff = startoff;
+		}
+
+		/* How many bytes are within this segment? */
+		if (nbytes > (segsize - startoff))
+			segbytes = segsize - startoff;
+		else
+			segbytes = nbytes;
+
+#ifndef FRONTEND
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+		readbytes = read(pos->segFile, p, segbytes);
+
+#ifndef FRONTEND
+		pgstat_report_wait_end();
+#endif
+		if (readbytes < 0)
+		{
+#ifndef FRONTEND
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
+							XLogFileNameP(pos->tli, pos->segNo), pos->segOff,
+							(Size) segbytes)));
+#else
+			int			save_errno = errno;
+
+			fatal_error("could not read from log segment %s, offset %u, length %zu: %s",
+						XLogFileNameFE(pos->tli, pos->segNo), pos->segOff,
+						(Size) segbytes, strerror(save_errno));
+#endif
+		}
+		else if (readbytes == 0)
+		{
+#ifndef FRONTEND
+			ereport(ERROR,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+							XLogFileNameP(pos->tli, pos->segNo), pos->segOff,
+							readbytes, (Size) segbytes)));
+#else
+			fatal_error("could not read from log segment %s, offset %u: read %d of %zu",
+						XLogFileNameFE(pos->tli, pos->segNo), pos->segOff,
+						readbytes, (Size) segbytes);
+#endif
+		}
+
+		/* Update state for read */
+		recptr += readbytes;
+
+		pos->segOff += readbytes;
+		nbytes -= readbytes;
+		p += readbytes;
+	}
+}
 
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 10a663bae6..4f29c89c06 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 	forget_invalid_pages(rnode, forkNum, nblocks);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend).  Probably these should be merged at some point.
- */
-static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-		 Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	/* state maintained across calls */
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static TimeLineID sendTLI = 0;
-	static uint32 sendOff = 0;
-
-	Assert(segsize == wal_segment_size);
-
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, segsize);
-
-		/* Do we need to switch to a different xlog segment? */
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
-			sendTLI != tli)
-		{
-			char		path[MAXPGPATH];
-
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, segsize);
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-
-			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
-			if (sendFile < 0)
-			{
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									path)));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendOff = 0;
-			sendTLI = tli;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				char		path[MAXPGPATH];
-				int			save_errno = errno;
-
-				XLogFilePath(path, tli, sendSegNo, segsize);
-				errno = save_errno;
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								path, startoff)));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (segsize - startoff))
-			segbytes = segsize - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendFile, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes <= 0)
-		{
-			char		path[MAXPGPATH];
-			int			save_errno = errno;
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-							path, sendOff, (unsigned long) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-}
-
 /*
  * Determine which timeline to read an xlog page from and set the
  * XLogReaderState's currTLI to that timeline ID.
@@ -896,6 +774,37 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	}
 }
 
+/*
+ * Callback for XLogRead() to open the next segment.
+ */
+static void
+read_local_xlog_page_open_segment(XLogSegNo segNo, TimeLineID *tli,
+								  XLogReadPos *pos)
+{
+	char		path[MAXPGPATH];
+
+	XLogFilePath(path, *tli, segNo, wal_segment_size);
+	pos->segFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+
+	if (pos->segFile < 0)
+	{
+		if (errno == ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("requested WAL segment %s has already been removed",
+							path)));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
+	}
+
+	pos->segNo = segNo;
+	pos->segOff = 0;
+	pos->tli = *tli;
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -1017,13 +926,18 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		count = read_upto - targetPagePtr;
 	}
 
+	Assert(state->wal_segment_size == wal_segment_size);
+	/* XLogRead() needs the position passed to XLogReaderAllocate(). */
+	Assert(state->private_data != NULL);
+
 	/*
 	 * Even though we just determined how much of the page can be validly read
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
-			 XLOG_BLCKSZ);
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, pageTLI,
+			 (XLogReadPos *) state->private_data,
+			 read_local_xlog_page_open_segment);
 
 	/* number of valid bytes in the buffer */
 	return count;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 424fe86a1b..20c1ad4a35 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,6 +124,7 @@ StartupDecodingContext(List *output_plugin_options,
 					   bool need_full_snapshot,
 					   bool fast_forward,
 					   XLogPageReadCB read_page,
+					   void *read_page_arg,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
 					   LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -172,14 +173,13 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->slot = slot;
 
-	ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
+	ctx->reader = XLogReaderAllocate(wal_segment_size, read_page,
+									 read_page_arg);
 	if (!ctx->reader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of memory")));
 
-	ctx->reader->private_data = ctx;
-
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
 		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
@@ -234,6 +234,7 @@ CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogRecPtr restart_lsn,
 						  XLogPageReadCB read_page,
+						  void *read_page_arg,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
 						  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -330,8 +331,8 @@ CreateInitDecodingContext(char *plugin,
 
 	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false,
-								 read_page, prepare_write, do_write,
-								 update_progress);
+								 read_page, read_page_arg, prepare_write,
+								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -376,6 +377,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  bool fast_forward,
 					  XLogPageReadCB read_page,
+					  void *read_page_arg,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
 					  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -428,8 +430,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 fast_forward, read_page, prepare_write,
-								 do_write, update_progress);
+								 fast_forward, read_page, read_page_arg,
+								 prepare_write, do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index d974400d6e..b2f30d53f5 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -248,13 +248,24 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
 	PG_TRY();
 	{
-		/* restart at slot's confirmed_flush */
+		/*
+		 * Restart at slot's confirmed_flush.
+		 *
+		 * logical_read_local_xlog_page() eventually calls XLogRead(), so set
+		 * the initial position.
+		 *
+		 * XXX Nothing in this patch closes the segment file that XLogRead()
+		 * leaves open for this position, so each call may leak a descriptor.
+		 * It should be closed once the decoding context has been freed.
+		 */
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									options,
 									false,
 									logical_read_local_xlog_page,
+									XLogReadInitPos(),
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite, NULL);
+									LogicalOutputWrite,
+									NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 182fe5bc82..dbcaa9c1d8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -14,6 +14,7 @@
 
 #include "access/htup_details.h"
 #include "access/xlog_internal.h"
+#include "access/xlogreader.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "replication/decode.h"
@@ -144,8 +145,9 @@ create_logical_replication_slot(char *name, char *plugin,
 	ctx = CreateInitDecodingContext(plugin, NIL,
 									false,	/* do not build snapshot */
 									restart_lsn,
-									logical_read_local_xlog_page, NULL, NULL,
-									NULL);
+									logical_read_local_xlog_page,
+									XLogReadInitPos(),
+									NULL, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
@@ -401,11 +403,18 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		 * Create our decoding context in fast_forward mode, passing start_lsn
 		 * as InvalidXLogRecPtr, so that we start processing from my slot's
 		 * confirmed_flush.
+		 *
+		 * logical_read_local_xlog_page() eventually calls XLogRead(), so set
+		 * the initial position.
+		 *
+		 * XXX Nothing closes the segment file XLogRead() leaves open for this
+		 * position, so each call may leak a file descriptor.
 		 */
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									NIL,
 									true,	/* fast_forward */
 									logical_read_local_xlog_page,
+									XLogReadInitPos(),
 									NULL, NULL, NULL);
 
 		/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aae6adc15c..56f9ae88b1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -128,16 +128,8 @@ bool		log_replication_commands = false;
  */
 bool		wake_wal_senders = false;
 
-/*
- * These variables are used similarly to openLogFile/SegNo/Off,
- * but for walsender to read the XLOG.
- */
-static int	sendFile = -1;
-static XLogSegNo sendSegNo = 0;
-static uint32 sendOff = 0;
-
-/* Timeline ID of the currently open file */
-static TimeLineID curFileTimeLine = 0;
+/* Position in the WAL segment file that the walsender is reading. */
+static XLogReadPos *sendPos = NULL;
 
 /*
  * These variables keep track of the state of the timeline we're currently
@@ -256,7 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static void WalSndOpenSegment(XLogSegNo segNo, TimeLineID *tli,
+							  XLogReadPos *pos);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -285,6 +278,18 @@ InitWalSender(void)
 
 	/* Initialize empty timestamp buffer for lag tracking. */
 	lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
+
+	/*
+	 * Make sure we can remember the current read position in XLOG.  Like
+	 * lag_tracker, keep it in TopMemoryContext so that it lives as long as
+	 * the process.
+	 */
+	{
+		MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+
+		sendPos = XLogReadInitPos();
+		MemoryContextSwitchTo(oldcxt);
+	}
 }
 
 /*
@@ -301,10 +306,10 @@ WalSndErrorCleanup(void)
 	ConditionVariableCancelSleep();
 	pgstat_report_wait_end();
 
-	if (sendFile >= 0)
+	if (sendPos != NULL && sendPos->segFile >= 0)
 	{
-		close(sendFile);
-		sendFile = -1;
+		close(sendPos->segFile);
+		sendPos->segFile = -1;
 	}
 
 	if (MyReplicationSlot != NULL)
@@ -787,7 +792,20 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* now actually read the data, we know it's there */
-	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, NULL, sendPos,
+			 WalSndOpenSegment);
+
+	/*
+	 * After reading into the buffer, check that what we read was valid.  The
+	 * segment might have been recycled or removed while we were reading it,
+	 * in which case read() succeeds but returns overwritten data.
+	 */
+	{
+		XLogSegNo	segno;
+
+		XLByteToSeg(targetPagePtr, segno, wal_segment_size);
+		CheckXLogRemoved(segno, ThisTimeLineID);
+	}
 
 	return count;
 }
@@ -933,9 +951,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 			need_full_snapshot = true;
 		}
 
+		/*
+		 * logical_read_xlog_page() eventually calls XLogRead(), so pass the
+		 * initial position.
+		 */
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										InvalidXLogRecPtr,
-										logical_read_xlog_page,
+										logical_read_xlog_page, sendPos,
 										WalSndPrepareWrite, WalSndWriteData,
 										WalSndUpdateProgress);
 
@@ -1083,10 +1105,13 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * position.
 	 *
 	 * Do this before sending CopyBoth, so that any errors are reported early.
+	 *
+	 * logical_read_xlog_page() eventually calls XLogRead(), so pass the
+	 * initial position.
 	 */
 	logical_decoding_ctx =
 		CreateDecodingContext(cmd->startpoint, cmd->options, false,
-							  logical_read_xlog_page,
+							  logical_read_xlog_page, sendPos,
 							  WalSndPrepareWrite, WalSndWriteData,
 							  WalSndUpdateProgress);
 
@@ -2344,187 +2369,85 @@ WalSndKill(int code, Datum arg)
 }
 
 /*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- */
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-	XLogSegNo	segno;
-
-retry:
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, wal_segment_size);
-
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
-		{
-			char		path[MAXPGPATH];
-
-			/* Switch to another logfile segment */
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, wal_segment_size);
-
-			/*-------
-			 * When reading from a historic timeline, and there is a timeline
-			 * switch within this segment, read from the WAL segment belonging
-			 * to the new timeline.
-			 *
-			 * For example, imagine that this server is currently on timeline
-			 * 5, and we're streaming timeline 4. The switch from timeline 4
-			 * to 5 happened at 0/13002088. In pg_wal, we have these files:
-			 *
-			 * ...
-			 * 000000040000000000000012
-			 * 000000040000000000000013
-			 * 000000050000000000000013
-			 * 000000050000000000000014
-			 * ...
-			 *
-			 * In this situation, when requested to send the WAL from
-			 * segment 0x13, on timeline 4, we read the WAL from file
-			 * 000000050000000000000013. Archive recovery prefers files from
-			 * newer timelines, so if the segment was restored from the
-			 * archive on this server, the file belonging to the old timeline,
-			 * 000000040000000000000013, might not exist. Their contents are
-			 * equal up to the switchpoint, because at a timeline switch, the
-			 * used portion of the old segment is copied to the new file.
-			 *-------
-			 */
-			curFileTimeLine = sendTimeLine;
-			if (sendTimeLineIsHistoric)
-			{
-				XLogSegNo	endSegNo;
-
-				XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
-				if (sendSegNo == endSegNo)
-					curFileTimeLine = sendTimeLineNextTLI;
-			}
-
-			XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
-
-			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-			if (sendFile < 0)
-			{
-				/*
-				 * If the file is not found, assume it's because the standby
-				 * asked for a too old WAL segment that has already been
-				 * removed or recycled.
-				 */
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									XLogFileNameP(curFileTimeLine, sendSegNo))));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendOff = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								XLogFileNameP(curFileTimeLine, sendSegNo),
-								startoff)));
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (wal_segment_size - startoff))
-			segbytes = wal_segment_size - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendFile, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes < 0)
-		{
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-							XLogFileNameP(curFileTimeLine, sendSegNo),
-							sendOff, (Size) segbytes)));
-		}
-		else if (readbytes == 0)
-		{
-			ereport(ERROR,
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-							XLogFileNameP(curFileTimeLine, sendSegNo),
-							sendOff, readbytes, (Size) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-
-	/*
-	 * After reading into the buffer, check that what we read was valid. We do
-	 * this after reading, because even though the segment was present when we
-	 * opened it, it might get recycled or removed while we read it. The
-	 * read() succeeds in that case, but the data we tried to read might
-	 * already have been overwritten with new WAL records.
-	 */
-	XLByteToSeg(startptr, segno, wal_segment_size);
-	CheckXLogRemoved(segno, ThisTimeLineID);
-
-	/*
-	 * During recovery, the currently-open WAL file might be replaced with the
-	 * file of the same name retrieved from archive. So we always need to
-	 * check what we read was valid after reading into the buffer. If it's
-	 * invalid, we try to open and read the file again.
-	 */
-	if (am_cascading_walsender)
-	{
-		WalSnd	   *walsnd = MyWalSnd;
-		bool		reload;
-
-		SpinLockAcquire(&walsnd->mutex);
-		reload = walsnd->needreload;
-		walsnd->needreload = false;
-		SpinLockRelease(&walsnd->mutex);
-
-		if (reload && sendFile >= 0)
-		{
-			close(sendFile);
-			sendFile = -1;
-
-			goto retry;
-		}
-	}
+ * Callback for XLogRead() to open the next segment.
+ *
+ * The timeline to read from is determined here from the walsender's
+ * sendTimeLine* state, so the caller must pass tli == NULL.
+ */
+static void
+WalSndOpenSegment(XLogSegNo segNo, TimeLineID *tli, XLogReadPos *pos)
+{
+	char		path[MAXPGPATH];
+
+	/*
+	 * The timeline is determined below, caller should not do anything about
+	 * it.
+	 */
+	Assert(tli == NULL);
+
+	/*-------
+	 * When reading from a historic timeline, and there is a timeline switch
+	 * within this segment, read from the WAL segment belonging to the new
+	 * timeline.
+	 *
+	 * For example, imagine that this server is currently on timeline 5, and
+	 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+	 * 0/13002088. In pg_wal, we have these files:
+	 *
+	 * ...
+	 * 000000040000000000000012
+	 * 000000040000000000000013
+	 * 000000050000000000000013
+	 * 000000050000000000000014
+	 * ...
+	 *
+	 * In this situation, when requested to send the WAL from segment 0x13, on
+	 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+	 * recovery prefers files from newer timelines, so if the segment was
+	 * restored from the archive on this server, the file belonging to the old
+	 * timeline, 000000040000000000000013, might not exist. Their contents are
+	 * equal up to the switchpoint, because at a timeline switch, the used
+	 * portion of the old segment is copied to the new file.
+	 *-------
+	 */
+	pos->tli = sendTimeLine;
+	if (sendTimeLineIsHistoric)
+	{
+		XLogSegNo	endSegNo;
+
+		XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
+
+		/*
+		 * Compare with the segment being opened: pos->segNo still refers to
+		 * the previously opened segment at this point.
+		 */
+		if (segNo == endSegNo)
+			pos->tli = sendTimeLineNextTLI;
+	}
+
+	XLogFilePath(path, pos->tli, segNo, wal_segment_size);
+	pos->segFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+
+	if (pos->segFile < 0)
+	{
+		/*
+		 * If the file is not found, assume it's because the standby asked for
+		 * a too old WAL segment that has already been removed or recycled.
+		 */
+		if (errno == ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("requested WAL segment %s has already been removed",
+							XLogFileNameP(pos->tli, segNo))));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
+	}
+
+	pos->segNo = segNo;
+	pos->segOff = 0;
 }
 
 /*
@@ -2544,6 +2467,7 @@ XLogSendPhysical(void)
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
 	Size		nbytes;
+	XLogSegNo	segno;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2686,9 +2610,9 @@ XLogSendPhysical(void)
 	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
 	{
 		/* close the current file. */
-		if (sendFile >= 0)
-			close(sendFile);
-		sendFile = -1;
+		if (sendPos->segFile >= 0)
+			close(sendPos->segFile);
+		sendPos->segFile = -1;
 
 		/* Send CopyDone */
 		pq_putmessage_noblock('c', NULL, 0);
@@ -2759,7 +2683,48 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+	XLogRead(&output_message.data[output_message.len], startptr, nbytes,
+			 NULL,				/* WalSndOpenSegment will determine TLI */
+			 sendPos,
+			 WalSndOpenSegment);
+
+	/*
+	 * After reading into the buffer, check that what we read was valid. We do
+	 * this after reading, because even though the segment was present when we
+	 * opened it, it might get recycled or removed while we read it. The
+	 * read() succeeds in that case, but the data we tried to read might
+	 * already have been overwritten with new WAL records.
+	 */
+	XLByteToSeg(startptr, segno, wal_segment_size);
+	CheckXLogRemoved(segno, ThisTimeLineID);
+
+	/*
+	 * During recovery, the currently-open WAL file might be replaced with the
+	 * file of the same name retrieved from archive. So we always need to
+	 * check what we read was valid after reading into the buffer. If it's
+	 * invalid, we try to open and read the file again.
+	 */
+	if (am_cascading_walsender)
+	{
+		WalSnd	   *walsnd = MyWalSnd;
+		bool		reload;
+
+		SpinLockAcquire(&walsnd->mutex);
+		reload = walsnd->needreload;
+		walsnd->needreload = false;
+		SpinLockRelease(&walsnd->mutex);
+
+		if (reload && sendPos->segFile >= 0)
+		{
+			close(sendPos->segFile);
+			sendPos->segFile = -1;
+
+			goto retry;
+		}
+	}
+
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index e106fb2ed1..7dd63dd735 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -26,9 +26,9 @@
 
 #include "rmgrdesc.h"
 
-static const char *progname;
+const char *progname;
 
-static int	WalSegSz;
+int			WalSegSz;
 
 typedef struct XLogDumpPrivate
 {
@@ -37,6 +37,7 @@ typedef struct XLogDumpPrivate
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
 	bool		endptr_reached;
+	XLogReadPos *pos;
 } XLogDumpPrivate;
 
 typedef struct XLogDumpConfig
@@ -296,126 +297,50 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
 		fatal_error("could not find any WAL file");
 }
 
-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
-static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
-				 XLogRecPtr startptr, char *buf, Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static uint32 sendOff = 0;
-
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, WalSegSz);
-
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
-		{
-			char		fname[MAXFNAMELEN];
-			int			tries;
-
-			/* Switch to another logfile segment */
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-			/*
-			 * In follow mode there is a short period of time after the server
-			 * has written the end of the previous file before the new file is
-			 * available. So we loop for 5 seconds looking for the file to
-			 * appear before giving up.
-			 */
-			for (tries = 0; tries < 10; tries++)
-			{
-				sendFile = open_file_in_directory(directory, fname);
-				if (sendFile >= 0)
-					break;
-				if (errno == ENOENT)
-				{
-					int			save_errno = errno;
-
-					/* File not there yet, try again */
-					pg_usleep(500 * 1000);
-
-					errno = save_errno;
-					continue;
-				}
-				/* Any other error, fall through and fail */
-				break;
-			}
-
-			if (sendFile < 0)
-				fatal_error("could not find file \"%s\": %s",
-							fname, strerror(errno));
-			sendOff = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				int			err = errno;
-				char		fname[MAXPGPATH];
-
-				XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-				fatal_error("could not seek in log file %s to offset %u: %s",
-							fname, startoff, strerror(err));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (WalSegSz - startoff))
-			segbytes = WalSegSz - startoff;
-		else
-			segbytes = nbytes;
-
-		readbytes = read(sendFile, p, segbytes);
-		if (readbytes <= 0)
-		{
-			int			err = errno;
-			char		fname[MAXPGPATH];
-			int			save_errno = errno;
-
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-			errno = save_errno;
-
-			if (readbytes < 0)
-				fatal_error("could not read from log file %s, offset %u, length %d: %s",
-							fname, sendOff, segbytes, strerror(err));
-			else if (readbytes == 0)
-				fatal_error("could not read from log file %s, offset %u: read %d of %zu",
-							fname, sendOff, readbytes, (Size) segbytes);
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
+/*
+ * Callback for XLogRead() to open the next segment.
+ */
+static void
+XLogDumpOpenSegment(XLogSegNo segNo, TimeLineID *tli, XLogReadPos *pos)
+{
+	char		fname[MAXFNAMELEN];
+	int			tries;
+
+	XLogFileName(fname, *tli, segNo, WalSegSz);
+
+	/*
+	 * In follow mode there is a short period of time after the server has
+	 * written the end of the previous file before the new file is
+	 * available. So we loop for 5 seconds looking for the file to appear
+	 * before giving up.
+	 */
+	for (tries = 0; tries < 10; tries++)
+	{
+		pos->segFile = open_file_in_directory(pos->dir, fname);
+		if (pos->segFile >= 0)
+			break;
+		if (errno == ENOENT)
+		{
+			int			save_errno = errno;
+
+			/* File not there yet, try again */
+			pg_usleep(500 * 1000);
+
+			errno = save_errno;
+			continue;
+		}
+		/* Any other error, fall through and fail */
+		break;
+	}
+
+	if (pos->segFile < 0)
+		fatal_error("could not find file \"%s\": %s",
+					fname, strerror(errno));
+
+	pos->segNo = segNo;
+	pos->segOff = 0;
+	/* XLogRead() compares this with *tli to decide whether to reopen. */
+	pos->tli = *tli;
 }
 
 /*
@@ -441,8 +366,8 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		}
 	}
 
-	XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
-					 readBuff, count);
+	XLogRead(readBuff, targetPagePtr, count, &private->timeline,
+			 private->pos, XLogDumpOpenSegment);
 
 	return count;
 }
@@ -852,6 +777,7 @@ main(int argc, char **argv)
 	private.startptr = InvalidXLogRecPtr;
 	private.endptr = InvalidXLogRecPtr;
 	private.endptr_reached = false;
+	private.pos = XLogReadInitPos();
 
 	config.bkp_details = false;
 	config.stop_after_records = -1;
@@ -1083,6 +1009,9 @@ main(int argc, char **argv)
 	else
 		identify_target_directory(&private, private.inpath, NULL);
 
+	/* XLogDumpOpenSegment() only sees 'pos', so record the directory there. */
+	private.pos->dir = private.inpath;
+
 	/* we don't know what to print */
 	if (XLogRecPtrIsInvalid(private.startptr))
 	{
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index f3bae0bf49..9bddbd3042 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -219,6 +219,35 @@ extern void XLogReaderInvalReadState(XLogReaderState *state);
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif							/* FRONTEND */
 
+/*
+ * Position in XLOG file while reading it.  segFile is -1 if no segment is
+ * open; otherwise it is the owner's responsibility to close it.
+ */
+typedef struct XLogReadPos
+{
+	int			segFile;		/* segment file descriptor */
+	XLogSegNo	segNo;			/* segment number */
+	uint32		segOff;			/* offset in the segment */
+	TimeLineID	tli;			/* timeline ID of the currently open file */
+
+	char	   *dir;			/* directory (only needed by frontends) */
+} XLogReadPos;
+
+/*
+ * Callback to open the XLOG segment 'segNo' for reading and update the
+ * position accordingly: it must set pos->segFile, pos->segNo and pos->tli,
+ * and reset pos->segOff to 0.  'tli' is the value the caller passed to
+ * XLogRead() and may be NULL.  XLogRead() does not check the result, so the
+ * callback must not return if the segment cannot be opened.
+ */
+typedef void (*XLogOpenSegment) (XLogSegNo segNo, TimeLineID *tli,
+								 XLogReadPos *pos);
+
+extern XLogReadPos *XLogReadInitPos(void);
+extern void XLogRead(char *buf, XLogRecPtr startptr, Size count,
+					 TimeLineID *tli, XLogReadPos *pos,
+					 XLogOpenSegment openSegment);
+
 /* Functions for decoding an XLogRecord */
 extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
 							 char **errmsg);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 0a2a63a48c..59b29433eb 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -99,6 +99,7 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 														 bool need_full_snapshot,
 														 XLogRecPtr restart_lsn,
 														 XLogPageReadCB read_page,
+														 void *read_page_arg,
 														 LogicalOutputPluginWriterPrepareWrite prepare_write,
 														 LogicalOutputPluginWriterWrite do_write,
 														 LogicalOutputPluginWriterUpdateProgress update_progress);
@@ -107,6 +108,7 @@ extern LogicalDecodingContext *CreateDecodingContext(
 													 List *output_plugin_options,
 													 bool fast_forward,
 													 XLogPageReadCB read_page,
+													 void *read_page_arg,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
 													 LogicalOutputPluginWriterUpdateProgress update_progress);