I was working on adding the tar streaming functionality we talked about at
the developer meeting to pg_basebackup, and rapidly ran across the issue
that Andres has been complaining about for a while. The code in
receivelog.c just passes an insane number of parameters around. Adding or
changing even a small thing ends up touching a huge number of places.

Here's an attempt to refactor the code to instead pass around a control
structure. I think it's a definite win already, and we can't just keep
adding new functionality on top of the current code.

I'll continue working separately on the actual functionality that goes on
top of this, but would appreciate a review of this part independently. It's
mostly mechanical, but there may well be mistakes - or thinkos in the
whole idea...

-- 
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 372,381 **** typedef struct
  static int
  LogStreamerMain(logstreamer_param *param)
  {
! 	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
! 						   param->sysidentifier, param->xlogdir,
! 						   reached_end_position, standby_message_timeout,
! 						   NULL, false, true))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 372,391 ----
  static int
  LogStreamerMain(logstreamer_param *param)
  {
! 	StreamCtl	stream;
! 
! 	MemSet(&stream, 0, sizeof(stream));	/* MemSet(start, val, len): zero every field first */
! 	stream.startpos = param->startptr;
! 	stream.timeline = param->timeline;
! 	stream.sysidentifier = param->sysidentifier;
! 	stream.stream_stop = reached_end_position;
! 	stream.standby_message_timeout = standby_message_timeout;
! 	stream.synchronous = false;
! 	stream.mark_done = true;
! 	stream.basedir = param->xlogdir;
! 	stream.partial_suffix = NULL;
! 
! 	if (!ReceiveXlogStream(param->bgconn, &stream))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 276,285 **** FindStreamingStart(uint32 *tli)
  static void
  StreamLog(void)
  {
! 	XLogRecPtr	startpos,
! 				serverpos;
! 	TimeLineID	starttli,
! 				servertli;
  
  	/*
  	 * Connect in replication mode to the server
--- 276,286 ----
  static void
  StreamLog(void)
  {
! 	XLogRecPtr	serverpos;
! 	TimeLineID	servertli;
! 	StreamCtl	stream;
! 
! 	MemSet(&stream, 0, sizeof(stream));
  
  	/*
  	 * Connect in replication mode to the server
***************
*** 311,327 **** StreamLog(void)
  	/*
  	 * Figure out where to start streaming.
  	 */
! 	startpos = FindStreamingStart(&starttli);
! 	if (startpos == InvalidXLogRecPtr)
  	{
! 		startpos = serverpos;
! 		starttli = servertli;
  	}
  
  	/*
  	 * Always start streaming at the beginning of a segment
  	 */
! 	startpos -= startpos % XLOG_SEG_SIZE;
  
  	/*
  	 * Start the replication
--- 312,328 ----
  	/*
  	 * Figure out where to start streaming.
  	 */
! 	stream.startpos = FindStreamingStart(&stream.timeline);
! 	if (stream.startpos == InvalidXLogRecPtr)
  	{
! 		stream.startpos = serverpos;
! 		stream.timeline = servertli;
  	}
  
  	/*
  	 * Always start streaming at the beginning of a segment
  	 */
! 	stream.startpos -= stream.startpos % XLOG_SEG_SIZE;
  
  	/*
  	 * Start the replication
***************
*** 329,340 **** StreamLog(void)
  	if (verbose)
  		fprintf(stderr,
  				_("%s: starting log streaming at %X/%X (timeline %u)\n"),
! 				progname, (uint32) (startpos >> 32), (uint32) startpos,
! 				starttli);
  
! 	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial",
! 					  synchronous, false);
  
  	PQfinish(conn);
  	conn = NULL;
--- 330,346 ----
  	if (verbose)
  		fprintf(stderr,
  				_("%s: starting log streaming at %X/%X (timeline %u)\n"),
! 		progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos,
! 				stream.timeline);
! 
! 	stream.stream_stop = stop_streaming;
! 	stream.standby_message_timeout = standby_message_timeout;
! 	stream.synchronous = synchronous;
! 	stream.mark_done = false;
! 	stream.basedir = basedir;
! 	stream.partial_suffix = ".partial";
  
! 	ReceiveXlogStream(conn, &stream);
  
  	PQfinish(conn);
  	conn = NULL;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 33,59 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  
  static bool still_sending = true;		/* feedback still needs to be sent? */
  
! static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
! 				 uint32 timeline, char *basedir,
! 			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos,
! 				 bool synchronous, bool mark_done);
  static int	CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int	CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
  					XLogRecPtr blockpos, int64 *last_status);
! static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
! 				   XLogRecPtr *blockpos, uint32 timeline,
! 				   char *basedir, stream_stop_callback stream_stop,
! 				   char *partial_suffix, bool mark_done);
! static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
! 					XLogRecPtr blockpos, char *basedir, char *partial_suffix,
! 					  XLogRecPtr *stoppos, bool mark_done);
! static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
! 					uint32 timeline, char *basedir,
! 					stream_stop_callback stream_stop,
! 					char *partial_suffix, XLogRecPtr *stoppos,
! 					bool mark_done);
  static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
  							 int64 last_status);
  
--- 33,50 ----
  
  static bool still_sending = true;		/* feedback still needs to be sent? */
  
! static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
! 				 XLogRecPtr *stoppos);
  static int	CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int	CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
  					XLogRecPtr blockpos, int64 *last_status);
! static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
! 				   XLogRecPtr *blockpos);
! static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
! 					  XLogRecPtr blockpos, XLogRecPtr *stoppos);
! static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
! 					XLogRecPtr *stoppos);
  static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
  							 int64 last_status);
  
***************
*** 99,106 **** mark_file_as_archived(const char *basedir, const char *fname)
   * partial_suffix) is stored in current_walfile_name.
   */
  static bool
! open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
! 			 char *partial_suffix)
  {
  	int			f;
  	char		fn[MAXPGPATH];
--- 90,96 ----
   * partial_suffix) is stored in current_walfile_name.
   */
  static bool
! open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
  {
  	int			f;
  	char		fn[MAXPGPATH];
***************
*** 110,119 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
  	XLogSegNo	segno;
  
  	XLByteToSeg(startpoint, segno);
! 	XLogFileName(current_walfile_name, timeline, segno);
  
! 	snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
! 			 partial_suffix ? partial_suffix : "");
  	f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
  	if (f == -1)
  	{
--- 100,109 ----
  	XLogSegNo	segno;
  
  	XLByteToSeg(startpoint, segno);
! 	XLogFileName(current_walfile_name, stream->timeline, segno);
  
! 	snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
! 			 stream->partial_suffix ? stream->partial_suffix : "");
  	f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
  	if (f == -1)
  	{
***************
*** 185,191 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
   * and returns false, otherwise returns true.
   */
  static bool
! close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
  {
  	off_t		currpos;
  
--- 175,181 ----
   * and returns false, otherwise returns true.
   */
  static bool
! close_walfile(StreamCtl *stream, XLogRecPtr pos)
  {
  	off_t		currpos;
  
***************
*** 220,232 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
  	/*
  	 * If we finished writing a .partial file, rename it into place.
  	 */
! 	if (currpos == XLOG_SEG_SIZE && partial_suffix)
  	{
  		char		oldfn[MAXPGPATH];
  		char		newfn[MAXPGPATH];
  
! 		snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
! 		snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
  		if (rename(oldfn, newfn) != 0)
  		{
  			fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
--- 210,222 ----
  	/*
  	 * If we finished writing a .partial file, rename it into place.
  	 */
! 	if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
  	{
  		char		oldfn[MAXPGPATH];
  		char		newfn[MAXPGPATH];
  
! 		snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
! 		snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
  		if (rename(oldfn, newfn) != 0)
  		{
  			fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
***************
*** 234,243 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
  			return false;
  		}
  	}
! 	else if (partial_suffix)
  		fprintf(stderr,
  				_("%s: not renaming \"%s%s\", segment is not complete\n"),
! 				progname, current_walfile_name, partial_suffix);
  
  	/*
  	 * Mark file as archived if requested by the caller - pg_basebackup needs
--- 224,233 ----
  			return false;
  		}
  	}
! 	else if (stream->partial_suffix)
  		fprintf(stderr,
  				_("%s: not renaming \"%s%s\", segment is not complete\n"),
! 				progname, current_walfile_name, stream->partial_suffix);
  
  	/*
  	 * Mark file as archived if requested by the caller - pg_basebackup needs
***************
*** 245,254 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
  	 * new node. This is in line with walreceiver.c always doing a
  	 * XLogArchiveForceDone() after a complete segment.
  	 */
! 	if (currpos == XLOG_SEG_SIZE && mark_done)
  	{
  		/* writes error message if failed */
! 		if (!mark_file_as_archived(basedir, current_walfile_name))
  			return false;
  	}
  
--- 235,244 ----
  	 * new node. This is in line with walreceiver.c always doing a
  	 * XLogArchiveForceDone() after a complete segment.
  	 */
! 	if (currpos == XLOG_SEG_SIZE && stream->mark_done)
  	{
  		/* writes error message if failed */
! 		if (!mark_file_as_archived(stream->basedir, current_walfile_name))
  			return false;
  	}
  
***************
*** 261,267 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
   * Check if a timeline history file exists.
   */
  static bool
! existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
  {
  	char		path[MAXPGPATH];
  	char		histfname[MAXFNAMELEN];
--- 251,257 ----
   * Check if a timeline history file exists.
   */
  static bool
! existsTimeLineHistoryFile(StreamCtl *stream)
  {
  	char		path[MAXPGPATH];
  	char		histfname[MAXFNAMELEN];
***************
*** 271,282 **** existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
  	 * Timeline 1 never has a history file. We treat that as if it existed,
  	 * since we never need to stream it.
  	 */
! 	if (tli == 1)
  		return true;
  
! 	TLHistoryFileName(histfname, tli);
  
! 	snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
  
  	fd = open(path, O_RDONLY | PG_BINARY, 0);
  	if (fd < 0)
--- 261,272 ----
  	 * Timeline 1 never has a history file. We treat that as if it existed,
  	 * since we never need to stream it.
  	 */
! 	if (stream->timeline == 1)
  		return true;
  
! 	TLHistoryFileName(histfname, stream->timeline);
  
! 	snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
  
  	fd = open(path, O_RDONLY | PG_BINARY, 0);
  	if (fd < 0)
***************
*** 294,301 **** existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
  }
  
  static bool
! writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
! 						 char *content, bool mark_done)
  {
  	int			size = strlen(content);
  	char		path[MAXPGPATH];
--- 284,290 ----
  }
  
  static bool
! writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
  {
  	int			size = strlen(content);
  	char		path[MAXPGPATH];
***************
*** 307,321 **** writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
  	 * Check that the server's idea of how timeline history files should be
  	 * named matches ours.
  	 */
! 	TLHistoryFileName(histfname, tli);
  	if (strcmp(histfname, filename) != 0)
  	{
  		fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
! 				progname, tli, filename);
  		return false;
  	}
  
! 	snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
  
  	/*
  	 * Write into a temp file name.
--- 296,310 ----
  	 * Check that the server's idea of how timeline history files should be
  	 * named matches ours.
  	 */
! 	TLHistoryFileName(histfname, stream->timeline);
  	if (strcmp(histfname, filename) != 0)
  	{
  		fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
! 				progname, stream->timeline, filename);
  		return false;
  	}
  
! 	snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
  
  	/*
  	 * Write into a temp file name.
***************
*** 375,384 **** writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
  	}
  
  	/* Maintain archive_status, check close_walfile() for details. */
! 	if (mark_done)
  	{
  		/* writes error message if failed */
! 		if (!mark_file_as_archived(basedir, histfname))
  			return false;
  	}
  
--- 364,373 ----
  	}
  
  	/* Maintain archive_status, check close_walfile() for details. */
! 	if (stream->mark_done)
  	{
  		/* writes error message if failed */
! 		if (!mark_file_as_archived(stream->basedir, histfname))
  			return false;
  	}
  
***************
*** 498,508 **** CheckServerVersionForStreaming(PGconn *conn)
   * Note: The log position *must* be at a log segment start!
   */
  bool
! ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
! 				  char *sysidentifier, char *basedir,
! 				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix,
! 				  bool synchronous, bool mark_done)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 487,493 ----
   * Note: The log position *must* be at a log segment start!
   */
  bool
! ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 539,545 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		slotcmd[0] = 0;
  	}
  
! 	if (sysidentifier != NULL)
  	{
  		/* Validate system identifier hasn't changed */
  		res = PQexec(conn, "IDENTIFY_SYSTEM");
--- 524,530 ----
  		slotcmd[0] = 0;
  	}
  
! 	if (stream->sysidentifier != NULL)
  	{
  		/* Validate system identifier hasn't changed */
  		res = PQexec(conn, "IDENTIFY_SYSTEM");
***************
*** 559,565 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			PQclear(res);
  			return false;
  		}
! 		if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
  		{
  			fprintf(stderr,
  					_("%s: system identifier does not match between base backup and streaming connection\n"),
--- 544,550 ----
  			PQclear(res);
  			return false;
  		}
! 		if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
  		{
  			fprintf(stderr,
  					_("%s: system identifier does not match between base backup and streaming connection\n"),
***************
*** 567,577 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			PQclear(res);
  			return false;
  		}
! 		if (timeline > atoi(PQgetvalue(res, 0, 1)))
  		{
  			fprintf(stderr,
  				_("%s: starting timeline %u is not present in the server\n"),
! 					progname, timeline);
  			PQclear(res);
  			return false;
  		}
--- 552,562 ----
  			PQclear(res);
  			return false;
  		}
! 		if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
  		{
  			fprintf(stderr,
  				_("%s: starting timeline %u is not present in the server\n"),
! 					progname, stream->timeline);
  			PQclear(res);
  			return false;
  		}
***************
*** 582,588 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  	 * initialize flush position to starting point, it's the caller's
  	 * responsibility that that's sane.
  	 */
! 	lastFlushPosition = startpos;
  
  	while (1)
  	{
--- 567,573 ----
  	 * initialize flush position to starting point, it's the caller's
  	 * responsibility that that's sane.
  	 */
! 	lastFlushPosition = stream->startpos;
  
  	while (1)
  	{
***************
*** 590,598 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		 * Fetch the timeline history file for this timeline, if we don't have
  		 * it already.
  		 */
! 		if (!existsTimeLineHistoryFile(basedir, timeline))
  		{
! 			snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
  			res = PQexec(conn, query);
  			if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			{
--- 575,583 ----
  		 * Fetch the timeline history file for this timeline, if we don't have
  		 * it already.
  		 */
! 		if (!existsTimeLineHistoryFile(stream))
  		{
! 			snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
  			res = PQexec(conn, query);
  			if (PQresultStatus(res) != PGRES_TUPLES_OK)
  			{
***************
*** 615,624 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			}
  
  			/* Write the history file to disk */
! 			writeTimeLineHistoryFile(basedir, timeline,
  									 PQgetvalue(res, 0, 0),
! 									 PQgetvalue(res, 0, 1),
! 									 mark_done);
  
  			PQclear(res);
  		}
--- 600,608 ----
  			}
  
  			/* Write the history file to disk */
! 			writeTimeLineHistoryFile(stream,
  									 PQgetvalue(res, 0, 0),
! 									 PQgetvalue(res, 0, 1));
  
  			PQclear(res);
  		}
***************
*** 627,640 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		 * Before we start streaming from the requested location, check if the
  		 * callback tells us to stop here.
  		 */
! 		if (stream_stop(startpos, timeline, false))
  			return true;
  
  		/* Initiate the replication stream at specified location */
  		snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
  				 slotcmd,
! 				 (uint32) (startpos >> 32), (uint32) startpos,
! 				 timeline);
  		res = PQexec(conn, query);
  		if (PQresultStatus(res) != PGRES_COPY_BOTH)
  		{
--- 611,624 ----
  		 * Before we start streaming from the requested location, check if the
  		 * callback tells us to stop here.
  		 */
! 		if (stream->stream_stop(stream->startpos, stream->timeline, false))
  			return true;
  
  		/* Initiate the replication stream at specified location */
  		snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
  				 slotcmd,
! 				 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
! 				 stream->timeline);
  		res = PQexec(conn, query);
  		if (PQresultStatus(res) != PGRES_COPY_BOTH)
  		{
***************
*** 646,654 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		PQclear(res);
  
  		/* Stream the WAL */
! 		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
! 							   standby_message_timeout, partial_suffix,
! 							   &stoppos, synchronous, mark_done);
  		if (res == NULL)
  			goto error;
  
--- 630,636 ----
  		PQclear(res);
  
  		/* Stream the WAL */
! 		res = HandleCopyStream(conn, stream, &stoppos);
  		if (res == NULL)
  			goto error;
  
***************
*** 676,701 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			uint32		newtimeline;
  			bool		parsed;
  
! 			parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
  			PQclear(res);
  			if (!parsed)
  				goto error;
  
  			/* Sanity check the values the server gave us */
! 			if (newtimeline <= timeline)
  			{
  				fprintf(stderr,
  						_("%s: server reported unexpected next timeline %u, following timeline %u\n"),
! 						progname, newtimeline, timeline);
  				goto error;
  			}
! 			if (startpos > stoppos)
  			{
  				fprintf(stderr,
  						_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
  						progname,
! 						timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
! 				  newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
  				goto error;
  			}
  
--- 658,683 ----
  			uint32		newtimeline;
  			bool		parsed;
  
! 			parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
  			PQclear(res);
  			if (!parsed)
  				goto error;
  
  			/* Sanity check the values the server gave us */
! 			if (newtimeline <= stream->timeline)
  			{
  				fprintf(stderr,
  						_("%s: server reported unexpected next timeline %u, following timeline %u\n"),
! 						progname, newtimeline, stream->timeline);
  				goto error;
  			}
! 			if (stream->startpos > stoppos)
  			{
  				fprintf(stderr,
  						_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
  						progname,
! 				stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
! 						newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
  				goto error;
  			}
  
***************
*** 715,722 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			 * Loop back to start streaming from the new timeline. Always
  			 * start streaming at the beginning of a segment.
  			 */
! 			timeline = newtimeline;
! 			startpos = startpos - (startpos % XLOG_SEG_SIZE);
  			continue;
  		}
  		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
--- 697,704 ----
  			 * Loop back to start streaming from the new timeline. Always
  			 * start streaming at the beginning of a segment.
  			 */
! 			stream->timeline = newtimeline;
! 			stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
  			continue;
  		}
  		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
***************
*** 729,735 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			 * Check if the callback thinks it's OK to stop here. If not,
  			 * complain.
  			 */
! 			if (stream_stop(stoppos, timeline, false))
  				return true;
  			else
  			{
--- 711,717 ----
  			 * Check if the callback thinks it's OK to stop here. If not,
  			 * complain.
  			 */
! 			if (stream->stream_stop(stoppos, stream->timeline, false))
  				return true;
  			else
  			{
***************
*** 810,823 **** ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
   * On any other sort of error, returns NULL.
   */
  static PGresult *
! HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
! 				 char *basedir, stream_stop_callback stream_stop,
! 				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, bool synchronous, bool mark_done)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
! 	XLogRecPtr	blockpos = startpos;
  
  	still_sending = true;
  
--- 792,803 ----
   * On any other sort of error, returns NULL.
   */
  static PGresult *
! HandleCopyStream(PGconn *conn, StreamCtl *stream,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
! 	XLogRecPtr	blockpos = stream->startpos;
  
  	still_sending = true;
  
***************
*** 830,838 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/*
  		 * Check if we should continue streaming, or abort at this point.
  		 */
! 		if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
! 								 stream_stop, partial_suffix, stoppos,
! 								 mark_done))
  			goto error;
  
  		now = feGetCurrentTimestamp();
--- 810,816 ----
  		/*
  		 * Check if we should continue streaming, or abort at this point.
  		 */
! 		if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
  			goto error;
  
  		now = feGetCurrentTimestamp();
***************
*** 841,847 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		 * If synchronous option is true, issue sync command as soon as there
  		 * are WAL data which has not been flushed yet.
  		 */
! 		if (synchronous && lastFlushPosition < blockpos && walfile != -1)
  		{
  			if (fsync(walfile) != 0)
  			{
--- 819,825 ----
  		 * If synchronous option is true, issue sync command as soon as there
  		 * are WAL data which has not been flushed yet.
  		 */
! 		if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
  		{
  			if (fsync(walfile) != 0)
  			{
***************
*** 863,871 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/*
  		 * Potentially send a status message to the master
  		 */
! 		if (still_sending && standby_message_timeout > 0 &&
  			feTimestampDifferenceExceeds(last_status, now,
! 										 standby_message_timeout))
  		{
  			/* Time to send feedback! */
  			if (!sendFeedback(conn, blockpos, now, false))
--- 841,849 ----
  		/*
  		 * Potentially send a status message to the master
  		 */
! 		if (still_sending && stream->standby_message_timeout > 0 &&
  			feTimestampDifferenceExceeds(last_status, now,
! 										 stream->standby_message_timeout))
  		{
  			/* Time to send feedback! */
  			if (!sendFeedback(conn, blockpos, now, false))
***************
*** 876,882 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/*
  		 * Calculate how long send/receive loops should sleep
  		 */
! 		sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
  												 last_status);
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
--- 854,860 ----
  		/*
  		 * Calculate how long send/receive loops should sleep
  		 */
! 		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
  												 last_status);
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
***************
*** 886,894 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				goto error;
  			if (r == -2)
  			{
! 				PGresult   *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
! 													 basedir, partial_suffix,
! 														stoppos, mark_done);
  
  				if (res == NULL)
  					goto error;
--- 864,870 ----
  				goto error;
  			if (r == -2)
  			{
! 				PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
  
  				if (res == NULL)
  					goto error;
***************
*** 905,922 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			}
  			else if (copybuf[0] == 'w')
  			{
! 				if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
! 										timeline, basedir, stream_stop,
! 										partial_suffix, mark_done))
  					goto error;
  
  				/*
  				 * Check if we should continue streaming, or abort at this
  				 * point.
  				 */
! 				if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
! 										 stream_stop, partial_suffix, stoppos,
! 										 mark_done))
  					goto error;
  			}
  			else
--- 881,894 ----
  			}
  			else if (copybuf[0] == 'w')
  			{
! 				if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
  					goto error;
  
  				/*
  				 * Check if we should continue streaming, or abort at this
  				 * point.
  				 */
! 				if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
  					goto error;
  			}
  			else
***************
*** 1113,1122 **** ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
   * Process XLogData message.
   */
  static bool
! ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
! 				   XLogRecPtr *blockpos, uint32 timeline,
! 				   char *basedir, stream_stop_callback stream_stop,
! 				   char *partial_suffix, bool mark_done)
  {
  	int			xlogoff;
  	int			bytes_left;
--- 1085,1092 ----
   * Process XLogData message.
   */
  static bool
! ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
! 				   XLogRecPtr *blockpos)
  {
  	int			xlogoff;
  	int			bytes_left;
***************
*** 1196,1203 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
  
  		if (walfile == -1)
  		{
! 			if (!open_walfile(*blockpos, timeline,
! 							  basedir, partial_suffix))
  			{
  				/* Error logged by open_walfile */
  				return false;
--- 1166,1172 ----
  
  		if (walfile == -1)
  		{
! 			if (!open_walfile(stream, *blockpos))
  			{
  				/* Error logged by open_walfile */
  				return false;
***************
*** 1224,1236 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
  		/* Did we reach the end of a WAL segment? */
  		if (*blockpos % XLOG_SEG_SIZE == 0)
  		{
! 			if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done))
  				/* Error message written in close_walfile() */
  				return false;
  
  			xlogoff = 0;
  
! 			if (still_sending && stream_stop(*blockpos, timeline, true))
  			{
  				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  				{
--- 1193,1205 ----
  		/* Did we reach the end of a WAL segment? */
  		if (*blockpos % XLOG_SEG_SIZE == 0)
  		{
! 			if (!close_walfile(stream, *blockpos))
  				/* Error message written in close_walfile() */
  				return false;
  
  			xlogoff = 0;
  
! 			if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
  			{
  				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  				{
***************
*** 1252,1260 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
   * Handle end of the copy stream.
   */
  static PGresult *
! HandleEndOfCopyStream(PGconn *conn, char *copybuf,
! 					XLogRecPtr blockpos, char *basedir, char *partial_suffix,
! 					  XLogRecPtr *stoppos, bool mark_done)
  {
  	PGresult   *res = PQgetResult(conn);
  
--- 1221,1228 ----
   * Handle end of the copy stream.
   */
  static PGresult *
! HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
! 					  XLogRecPtr blockpos, XLogRecPtr *stoppos)
  {
  	PGresult   *res = PQgetResult(conn);
  
***************
*** 1265,1271 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf,
  	 */
  	if (still_sending)
  	{
! 		if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
  		{
  			/* Error message written in close_walfile() */
  			PQclear(res);
--- 1233,1239 ----
  	 */
  	if (still_sending)
  	{
! 		if (!close_walfile(stream, blockpos))
  		{
  			/* Error message written in close_walfile() */
  			PQclear(res);
***************
*** 1295,1307 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf,
   * Check if we should continue streaming, or abort at this point.
   */
  static bool
! CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
! 					char *basedir, stream_stop_callback stream_stop,
! 					char *partial_suffix, XLogRecPtr *stoppos, bool mark_done)
  {
! 	if (still_sending && stream_stop(blockpos, timeline, false))
  	{
! 		if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
  		{
  			/* Potential error message is written by close_walfile */
  			return false;
--- 1263,1274 ----
   * Check if we should continue streaming, or abort at this point.
   */
  static bool
! CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
! 					XLogRecPtr *stoppos)
  {
! 	if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
  	{
! 		if (!close_walfile(stream, blockpos))
  		{
  			/* Potential error message is written by close_walfile */
  			return false;
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 22,37 ****
   */
  typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
  
  extern bool CheckServerVersionForStreaming(PGconn *conn);
  extern bool ReceiveXlogStream(PGconn *conn,
! 				  XLogRecPtr startpos,
! 				  uint32 timeline,
! 				  char *sysidentifier,
! 				  char *basedir,
! 				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  bool synchronous,
! 				  bool mark_done);
  
  #endif   /* RECEIVELOG_H */
--- 22,49 ----
   */
  typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
  
+ /*
+  * Global parameters when receiving xlog stream
+  */
+ typedef struct
+ {
+ 	XLogRecPtr	startpos;		/* Position to start streaming from; updated on timeline switch */
+ 	TimeLineID	timeline;		/* Timeline to stream from; updated on timeline switch */
+ 	char	   *sysidentifier;	/* If not NULL, checked against the server's IDENTIFY_SYSTEM result */
+ 	int			standby_message_timeout;	/* Status message interval; feedback is sent only if > 0 */
+ 	bool		synchronous;	/* fsync the WAL file as soon as unflushed data exists */
+ 	bool		mark_done;		/* Mark completed segments and history files as archived */
+ 
+ 	stream_stop_callback stream_stop;	/* Callback; streaming stops when it returns true */
+ 
+ 	char	   *basedir;		/* Directory the WAL and history files are written to */
+ 	char	   *partial_suffix;	/* Suffix for incomplete segment files, or NULL for none */
+ } StreamCtl;
+ 
+ 
+ 
  extern bool CheckServerVersionForStreaming(PGconn *conn);
  extern bool ReceiveXlogStream(PGconn *conn,
! 				  StreamCtl *stream);
  
  #endif   /* RECEIVELOG_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to