I was working on adding the tar streaming functionality we talked about at the developer meeting to pg_basebackup, and rapidly ran across the issue that Andres has been complaining about for a while. The code in receivelog.c just passes an insane number of parameters around. Adding or changing even a small thing ends up touching a huge number of places.
Here's an attempt to refactor the code to pass around a control structure instead. I think it's already a definite win, and we can't just keep adding new functionality on top of the current parameter lists. I'll continue working on the actual tar streaming functionality separately, on top of this patch, but would appreciate an independent review of this part. It's mostly mechanical, but there may well be mistakes - or thinkos in the whole idea... -- Magnus Hagander Me: http://www.hagander.net/ Work: http://www.redpill-linpro.com/
*** a/src/bin/pg_basebackup/pg_basebackup.c --- b/src/bin/pg_basebackup/pg_basebackup.c *************** *** 372,381 **** typedef struct static int LogStreamerMain(logstreamer_param *param) { ! if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, ! param->sysidentifier, param->xlogdir, ! reached_end_position, standby_message_timeout, ! NULL, false, true)) /* * Any errors will already have been reported in the function process, --- 372,391 ---- static int LogStreamerMain(logstreamer_param *param) { ! StreamCtl stream; ! ! MemSet(&stream, 0, sizeof(stream)); ! stream.startpos = param->startptr; ! stream.timeline = param->timeline; ! stream.sysidentifier = param->sysidentifier; ! stream.stream_stop = reached_end_position; ! stream.standby_message_timeout = standby_message_timeout; ! stream.synchronous = false; ! stream.mark_done = true; ! stream.basedir = param->xlogdir; ! stream.partial_suffix = NULL; ! ! if (!ReceiveXlogStream(param->bgconn, &stream)) /* * Any errors will already have been reported in the function process, *** a/src/bin/pg_basebackup/pg_receivexlog.c --- b/src/bin/pg_basebackup/pg_receivexlog.c *************** *** 276,285 **** FindStreamingStart(uint32 *tli) static void StreamLog(void) { ! XLogRecPtr startpos, ! serverpos; ! TimeLineID starttli, ! servertli; /* * Connect in replication mode to the server --- 276,286 ---- static void StreamLog(void) { ! XLogRecPtr serverpos; ! TimeLineID servertli; ! StreamCtl stream; ! ! MemSet(&stream, 0, sizeof(stream)); /* * Connect in replication mode to the server *************** *** 311,327 **** StreamLog(void) /* * Figure out where to start streaming. */ ! startpos = FindStreamingStart(&starttli); ! if (startpos == InvalidXLogRecPtr) { ! startpos = serverpos; ! starttli = servertli; } /* * Always start streaming at the beginning of a segment */ ! startpos -= startpos % XLOG_SEG_SIZE; /* * Start the replication --- 312,328 ---- /* * Figure out where to start streaming. */ ! 
stream.startpos = FindStreamingStart(&stream.timeline); ! if (stream.startpos == InvalidXLogRecPtr) { ! stream.startpos = serverpos; ! stream.timeline = servertli; } /* * Always start streaming at the beginning of a segment */ ! stream.startpos -= stream.startpos % XLOG_SEG_SIZE; /* * Start the replication *************** *** 329,340 **** StreamLog(void) if (verbose) fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"), ! progname, (uint32) (startpos >> 32), (uint32) startpos, ! starttli); ! ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ! stop_streaming, standby_message_timeout, ".partial", ! synchronous, false); PQfinish(conn); conn = NULL; --- 330,346 ---- if (verbose) fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"), ! progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos, ! stream.timeline); ! ! stream.stream_stop = stop_streaming; ! stream.standby_message_timeout = standby_message_timeout; ! stream.synchronous = synchronous; ! stream.mark_done = false; ! stream.basedir = basedir; ! stream.partial_suffix = ".partial"; ! ReceiveXlogStream(conn, &stream); PQfinish(conn); conn = NULL; *** a/src/bin/pg_basebackup/receivelog.c --- b/src/bin/pg_basebackup/receivelog.c *************** *** 33,59 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static bool still_sending = true; /* feedback still needs to be sent? */ ! static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, ! uint32 timeline, char *basedir, ! stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, XLogRecPtr *stoppos, ! bool synchronous, bool mark_done); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, XLogRecPtr blockpos, int64 *last_status); ! static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, ! 
XLogRecPtr *blockpos, uint32 timeline, ! char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix, bool mark_done); ! static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, ! XLogRecPtr blockpos, char *basedir, char *partial_suffix, ! XLogRecPtr *stoppos, bool mark_done); ! static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, ! uint32 timeline, char *basedir, ! stream_stop_callback stream_stop, ! char *partial_suffix, XLogRecPtr *stoppos, ! bool mark_done); static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, int64 last_status); --- 33,50 ---- static bool still_sending = true; /* feedback still needs to be sent? */ ! static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream, ! XLogRecPtr *stoppos); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, XLogRecPtr blockpos, int64 *last_status); ! static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, ! XLogRecPtr *blockpos); ! static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, ! XLogRecPtr blockpos, XLogRecPtr *stoppos); ! static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, ! XLogRecPtr *stoppos); static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, int64 last_status); *************** *** 99,106 **** mark_file_as_archived(const char *basedir, const char *fname) * partial_suffix) is stored in current_walfile_name. */ static bool ! open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, ! char *partial_suffix) { int f; char fn[MAXPGPATH]; --- 90,96 ---- * partial_suffix) is stored in current_walfile_name. */ static bool ! 
open_walfile(StreamCtl *stream, XLogRecPtr startpoint) { int f; char fn[MAXPGPATH]; *************** *** 110,119 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, XLogSegNo segno; XLByteToSeg(startpoint, segno); ! XLogFileName(current_walfile_name, timeline, segno); ! snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name, ! partial_suffix ? partial_suffix : ""); f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); if (f == -1) { --- 100,109 ---- XLogSegNo segno; XLByteToSeg(startpoint, segno); ! XLogFileName(current_walfile_name, stream->timeline, segno); ! snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name, ! stream->partial_suffix ? stream->partial_suffix : ""); f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); if (f == -1) { *************** *** 185,191 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, * and returns false, otherwise returns true. */ static bool ! close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done) { off_t currpos; --- 175,181 ---- * and returns false, otherwise returns true. */ static bool ! close_walfile(StreamCtl *stream, XLogRecPtr pos) { off_t currpos; *************** *** 220,232 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don /* * If we finished writing a .partial file, rename it into place. */ ! if (currpos == XLOG_SEG_SIZE && partial_suffix) { char oldfn[MAXPGPATH]; char newfn[MAXPGPATH]; ! snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix); ! snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name); if (rename(oldfn, newfn) != 0) { fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"), --- 210,222 ---- /* * If we finished writing a .partial file, rename it into place. */ ! if (currpos == XLOG_SEG_SIZE && stream->partial_suffix) { char oldfn[MAXPGPATH]; char newfn[MAXPGPATH]; ! 
snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix); ! snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name); if (rename(oldfn, newfn) != 0) { fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"), *************** *** 234,243 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don return false; } } ! else if (partial_suffix) fprintf(stderr, _("%s: not renaming \"%s%s\", segment is not complete\n"), ! progname, current_walfile_name, partial_suffix); /* * Mark file as archived if requested by the caller - pg_basebackup needs --- 224,233 ---- return false; } } ! else if (stream->partial_suffix) fprintf(stderr, _("%s: not renaming \"%s%s\", segment is not complete\n"), ! progname, current_walfile_name, stream->partial_suffix); /* * Mark file as archived if requested by the caller - pg_basebackup needs *************** *** 245,254 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don * new node. This is in line with walreceiver.c always doing a * XLogArchiveForceDone() after a complete segment. */ ! if (currpos == XLOG_SEG_SIZE && mark_done) { /* writes error message if failed */ ! if (!mark_file_as_archived(basedir, current_walfile_name)) return false; } --- 235,244 ---- * new node. This is in line with walreceiver.c always doing a * XLogArchiveForceDone() after a complete segment. */ ! if (currpos == XLOG_SEG_SIZE && stream->mark_done) { /* writes error message if failed */ ! if (!mark_file_as_archived(stream->basedir, current_walfile_name)) return false; } *************** *** 261,267 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don * Check if a timeline history file exists. */ static bool ! existsTimeLineHistoryFile(char *basedir, TimeLineID tli) { char path[MAXPGPATH]; char histfname[MAXFNAMELEN]; --- 251,257 ---- * Check if a timeline history file exists. */ static bool ! 
existsTimeLineHistoryFile(StreamCtl *stream) { char path[MAXPGPATH]; char histfname[MAXFNAMELEN]; *************** *** 271,282 **** existsTimeLineHistoryFile(char *basedir, TimeLineID tli) * Timeline 1 never has a history file. We treat that as if it existed, * since we never need to stream it. */ ! if (tli == 1) return true; ! TLHistoryFileName(histfname, tli); ! snprintf(path, sizeof(path), "%s/%s", basedir, histfname); fd = open(path, O_RDONLY | PG_BINARY, 0); if (fd < 0) --- 261,272 ---- * Timeline 1 never has a history file. We treat that as if it existed, * since we never need to stream it. */ ! if (stream->timeline == 1) return true; ! TLHistoryFileName(histfname, stream->timeline); ! snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); fd = open(path, O_RDONLY | PG_BINARY, 0); if (fd < 0) *************** *** 294,301 **** existsTimeLineHistoryFile(char *basedir, TimeLineID tli) } static bool ! writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, ! char *content, bool mark_done) { int size = strlen(content); char path[MAXPGPATH]; --- 284,290 ---- } static bool ! writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) { int size = strlen(content); char path[MAXPGPATH]; *************** *** 307,321 **** writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, * Check that the server's idea of how timeline history files should be * named matches ours. */ ! TLHistoryFileName(histfname, tli); if (strcmp(histfname, filename) != 0) { fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"), ! progname, tli, filename); return false; } ! snprintf(path, sizeof(path), "%s/%s", basedir, histfname); /* * Write into a temp file name. --- 296,310 ---- * Check that the server's idea of how timeline history files should be * named matches ours. */ ! 
TLHistoryFileName(histfname, stream->timeline); if (strcmp(histfname, filename) != 0) { fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"), ! progname, stream->timeline, filename); return false; } ! snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); /* * Write into a temp file name. *************** *** 375,384 **** writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, } /* Maintain archive_status, check close_walfile() for details. */ ! if (mark_done) { /* writes error message if failed */ ! if (!mark_file_as_archived(basedir, histfname)) return false; } --- 364,373 ---- } /* Maintain archive_status, check close_walfile() for details. */ ! if (stream->mark_done) { /* writes error message if failed */ ! if (!mark_file_as_archived(stream->basedir, histfname)) return false; } *************** *** 498,508 **** CheckServerVersionForStreaming(PGconn *conn) * Note: The log position *must* be at a log segment start! */ bool ! ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ! char *sysidentifier, char *basedir, ! stream_stop_callback stream_stop, ! int standby_message_timeout, char *partial_suffix, ! bool synchronous, bool mark_done) { char query[128]; char slotcmd[128]; --- 487,493 ---- * Note: The log position *must* be at a log segment start! */ bool ! ReceiveXlogStream(PGconn *conn, StreamCtl *stream) { char query[128]; char slotcmd[128]; *************** *** 539,545 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, slotcmd[0] = 0; } ! if (sysidentifier != NULL) { /* Validate system identifier hasn't changed */ res = PQexec(conn, "IDENTIFY_SYSTEM"); --- 524,530 ---- slotcmd[0] = 0; } ! if (stream->sysidentifier != NULL) { /* Validate system identifier hasn't changed */ res = PQexec(conn, "IDENTIFY_SYSTEM"); *************** *** 559,565 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } ! 
if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0) { fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), --- 544,550 ---- PQclear(res); return false; } ! if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0) { fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), *************** *** 567,577 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); return false; } ! if (timeline > atoi(PQgetvalue(res, 0, 1))) { fprintf(stderr, _("%s: starting timeline %u is not present in the server\n"), ! progname, timeline); PQclear(res); return false; } --- 552,562 ---- PQclear(res); return false; } ! if (stream->timeline > atoi(PQgetvalue(res, 0, 1))) { fprintf(stderr, _("%s: starting timeline %u is not present in the server\n"), ! progname, stream->timeline); PQclear(res); return false; } *************** *** 582,588 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * initialize flush position to starting point, it's the caller's * responsibility that that's sane. */ ! lastFlushPosition = startpos; while (1) { --- 567,573 ---- * initialize flush position to starting point, it's the caller's * responsibility that that's sane. */ ! lastFlushPosition = stream->startpos; while (1) { *************** *** 590,598 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Fetch the timeline history file for this timeline, if we don't have * it already. */ ! if (!existsTimeLineHistoryFile(basedir, timeline)) { ! snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_TUPLES_OK) { --- 575,583 ---- * Fetch the timeline history file for this timeline, if we don't have * it already. */ ! if (!existsTimeLineHistoryFile(stream)) { ! 
snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_TUPLES_OK) { *************** *** 615,624 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } /* Write the history file to disk */ ! writeTimeLineHistoryFile(basedir, timeline, PQgetvalue(res, 0, 0), ! PQgetvalue(res, 0, 1), ! mark_done); PQclear(res); } --- 600,608 ---- } /* Write the history file to disk */ ! writeTimeLineHistoryFile(stream, PQgetvalue(res, 0, 0), ! PQgetvalue(res, 0, 1)); PQclear(res); } *************** *** 627,640 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Before we start streaming from the requested location, check if the * callback tells us to stop here. */ ! if (stream_stop(startpos, timeline, false)) return true; /* Initiate the replication stream at specified location */ snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u", slotcmd, ! (uint32) (startpos >> 32), (uint32) startpos, ! timeline); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_COPY_BOTH) { --- 611,624 ---- * Before we start streaming from the requested location, check if the * callback tells us to stop here. */ ! if (stream->stream_stop(stream->startpos, stream->timeline, false)) return true; /* Initiate the replication stream at specified location */ snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u", slotcmd, ! (uint32) (stream->startpos >> 32), (uint32) stream->startpos, ! stream->timeline); res = PQexec(conn, query); if (PQresultStatus(res) != PGRES_COPY_BOTH) { *************** *** 646,654 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, PQclear(res); /* Stream the WAL */ ! res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, ! standby_message_timeout, partial_suffix, ! &stoppos, synchronous, mark_done); if (res == NULL) goto error; --- 630,636 ---- PQclear(res); /* Stream the WAL */ ! 
res = HandleCopyStream(conn, stream, &stoppos); if (res == NULL) goto error; *************** *** 676,701 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, uint32 newtimeline; bool parsed; ! parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline); PQclear(res); if (!parsed) goto error; /* Sanity check the values the server gave us */ ! if (newtimeline <= timeline) { fprintf(stderr, _("%s: server reported unexpected next timeline %u, following timeline %u\n"), ! progname, newtimeline, timeline); goto error; } ! if (startpos > stoppos) { fprintf(stderr, _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"), progname, ! timeline, (uint32) (stoppos >> 32), (uint32) stoppos, ! newtimeline, (uint32) (startpos >> 32), (uint32) startpos); goto error; } --- 658,683 ---- uint32 newtimeline; bool parsed; ! parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline); PQclear(res); if (!parsed) goto error; /* Sanity check the values the server gave us */ ! if (newtimeline <= stream->timeline) { fprintf(stderr, _("%s: server reported unexpected next timeline %u, following timeline %u\n"), ! progname, newtimeline, stream->timeline); goto error; } ! if (stream->startpos > stoppos) { fprintf(stderr, _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"), progname, ! stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos, ! newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos); goto error; } *************** *** 715,722 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Loop back to start streaming from the new timeline. Always * start streaming at the beginning of a segment. */ ! timeline = newtimeline; ! startpos = startpos - (startpos % XLOG_SEG_SIZE); continue; } else if (PQresultStatus(res) == PGRES_COMMAND_OK) --- 697,704 ---- * Loop back to start streaming from the new timeline. 
Always * start streaming at the beginning of a segment. */ ! stream->timeline = newtimeline; ! stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE); continue; } else if (PQresultStatus(res) == PGRES_COMMAND_OK) *************** *** 729,735 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Check if the callback thinks it's OK to stop here. If not, * complain. */ ! if (stream_stop(stoppos, timeline, false)) return true; else { --- 711,717 ---- * Check if the callback thinks it's OK to stop here. If not, * complain. */ ! if (stream->stream_stop(stoppos, stream->timeline, false)) return true; else { *************** *** 810,823 **** ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) * On any other sort of error, returns NULL. */ static PGresult * ! HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ! char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, bool synchronous, bool mark_done) { char *copybuf = NULL; int64 last_status = -1; ! XLogRecPtr blockpos = startpos; still_sending = true; --- 792,803 ---- * On any other sort of error, returns NULL. */ static PGresult * ! HandleCopyStream(PGconn *conn, StreamCtl *stream, ! XLogRecPtr *stoppos) { char *copybuf = NULL; int64 last_status = -1; ! XLogRecPtr blockpos = stream->startpos; still_sending = true; *************** *** 830,838 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Check if we should continue streaming, or abort at this point. */ ! if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, ! stream_stop, partial_suffix, stoppos, ! mark_done)) goto error; now = feGetCurrentTimestamp(); --- 810,816 ---- /* * Check if we should continue streaming, or abort at this point. */ ! 
if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos)) goto error; now = feGetCurrentTimestamp(); *************** *** 841,847 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * If synchronous option is true, issue sync command as soon as there * are WAL data which has not been flushed yet. */ ! if (synchronous && lastFlushPosition < blockpos && walfile != -1) { if (fsync(walfile) != 0) { --- 819,825 ---- * If synchronous option is true, issue sync command as soon as there * are WAL data which has not been flushed yet. */ ! if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1) { if (fsync(walfile) != 0) { *************** *** 863,871 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Potentially send a status message to the master */ ! if (still_sending && standby_message_timeout > 0 && feTimestampDifferenceExceeds(last_status, now, ! standby_message_timeout)) { /* Time to send feedback! */ if (!sendFeedback(conn, blockpos, now, false)) --- 841,849 ---- /* * Potentially send a status message to the master */ ! if (still_sending && stream->standby_message_timeout > 0 && feTimestampDifferenceExceeds(last_status, now, ! stream->standby_message_timeout)) { /* Time to send feedback! */ if (!sendFeedback(conn, blockpos, now, false)) *************** *** 876,882 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Calculate how long send/receive loops should sleep */ ! sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, last_status); r = CopyStreamReceive(conn, sleeptime, &copybuf); --- 854,860 ---- /* * Calculate how long send/receive loops should sleep */ ! sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout, last_status); r = CopyStreamReceive(conn, sleeptime, &copybuf); *************** *** 886,894 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, goto error; if (r == -2) { ! 
PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, ! basedir, partial_suffix, ! stoppos, mark_done); if (res == NULL) goto error; --- 864,870 ---- goto error; if (r == -2) { ! PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos); if (res == NULL) goto error; *************** *** 905,922 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } else if (copybuf[0] == 'w') { ! if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, ! timeline, basedir, stream_stop, ! partial_suffix, mark_done)) goto error; /* * Check if we should continue streaming, or abort at this * point. */ ! if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, ! stream_stop, partial_suffix, stoppos, ! mark_done)) goto error; } else --- 881,894 ---- } else if (copybuf[0] == 'w') { ! if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos)) goto error; /* * Check if we should continue streaming, or abort at this * point. */ ! if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos)) goto error; } else *************** *** 1113,1122 **** ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, * Process XLogData message. */ static bool ! ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, ! XLogRecPtr *blockpos, uint32 timeline, ! char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix, bool mark_done) { int xlogoff; int bytes_left; --- 1085,1092 ---- * Process XLogData message. */ static bool ! ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, ! XLogRecPtr *blockpos) { int xlogoff; int bytes_left; *************** *** 1196,1203 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, if (walfile == -1) { ! if (!open_walfile(*blockpos, timeline, ! basedir, partial_suffix)) { /* Error logged by open_walfile */ return false; --- 1166,1172 ---- if (walfile == -1) { ! 
if (!open_walfile(stream, *blockpos)) { /* Error logged by open_walfile */ return false; *************** *** 1224,1236 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, /* Did we reach the end of a WAL segment? */ if (*blockpos % XLOG_SEG_SIZE == 0) { ! if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done)) /* Error message written in close_walfile() */ return false; xlogoff = 0; ! if (still_sending && stream_stop(*blockpos, timeline, true)) { if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) { --- 1193,1205 ---- /* Did we reach the end of a WAL segment? */ if (*blockpos % XLOG_SEG_SIZE == 0) { ! if (!close_walfile(stream, *blockpos)) /* Error message written in close_walfile() */ return false; xlogoff = 0; ! if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true)) { if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) { *************** *** 1252,1260 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, * Handle end of the copy stream. */ static PGresult * ! HandleEndOfCopyStream(PGconn *conn, char *copybuf, ! XLogRecPtr blockpos, char *basedir, char *partial_suffix, ! XLogRecPtr *stoppos, bool mark_done) { PGresult *res = PQgetResult(conn); --- 1221,1228 ---- * Handle end of the copy stream. */ static PGresult * ! HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, ! XLogRecPtr blockpos, XLogRecPtr *stoppos) { PGresult *res = PQgetResult(conn); *************** *** 1265,1271 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf, */ if (still_sending) { ! if (!close_walfile(basedir, partial_suffix, blockpos, mark_done)) { /* Error message written in close_walfile() */ PQclear(res); --- 1233,1239 ---- */ if (still_sending) { ! if (!close_walfile(stream, blockpos)) { /* Error message written in close_walfile() */ PQclear(res); *************** *** 1295,1307 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf, * Check if we should continue streaming, or abort at this point. */ static bool ! 
CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, ! char *basedir, stream_stop_callback stream_stop, ! char *partial_suffix, XLogRecPtr *stoppos, bool mark_done) { ! if (still_sending && stream_stop(blockpos, timeline, false)) { ! if (!close_walfile(basedir, partial_suffix, blockpos, mark_done)) { /* Potential error message is written by close_walfile */ return false; --- 1263,1274 ---- * Check if we should continue streaming, or abort at this point. */ static bool ! CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, ! XLogRecPtr *stoppos) { ! if (still_sending && stream->stream_stop(blockpos, stream->timeline, false)) { ! if (!close_walfile(stream, blockpos)) { /* Potential error message is written by close_walfile */ return false; *** a/src/bin/pg_basebackup/receivelog.h --- b/src/bin/pg_basebackup/receivelog.h *************** *** 22,37 **** */ typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished); extern bool CheckServerVersionForStreaming(PGconn *conn); extern bool ReceiveXlogStream(PGconn *conn, ! XLogRecPtr startpos, ! uint32 timeline, ! char *sysidentifier, ! char *basedir, ! stream_stop_callback stream_stop, ! int standby_message_timeout, ! char *partial_suffix, ! bool synchronous, ! bool mark_done); #endif /* RECEIVELOG_H */ --- 22,49 ---- */ typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished); + /* + * Global parameters when receiving xlog stream + */ + typedef struct + { + XLogRecPtr startpos; + TimeLineID timeline; + char *sysidentifier; + int standby_message_timeout; + bool synchronous; + bool mark_done; + + stream_stop_callback stream_stop; + + char *basedir; + char *partial_suffix; + } StreamCtl; + + + extern bool CheckServerVersionForStreaming(PGconn *conn); extern bool ReceiveXlogStream(PGconn *conn, ! StreamCtl *stream); #endif /* RECEIVELOG_H */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers