> Moved to next CF with "needs review" state.

Here's an updated series. It applies on top of commitfest entry
https://commitfest.postgresql.org/12/883/ (PostgresNode TAP test
enhancements).

It corresponds exactly to patches [2,3,4] in the logical decoding on
standby post at
https://www.postgresql.org/message-id/CAMsr+YEzC=-+eV09A=ra150fjtkmtqt5q70piqbwytbor3c...@mail.gmail.com

If this is committed, the remaining decoding on standby patches will
just be the meat of the feature.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From b7458118d98204d4b44f0d3e2953a117f1ed876a Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 1 Sep 2016 12:37:40 +0800
Subject: [PATCH 1/3] Add an optional --endpos LSN argument to pg_recvlogical
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

pg_recvlogical usually just runs until cancelled or until the upstream
server disconnects. For some purposes, especially testing, it's useful
to have the ability to stop receiving at a specified LSN without having
to parse the output and deal with buffering issues, etc.

Add a --endpos parameter that takes the LSN at which receiving should
stop; a record at exactly that LSN is still written, later ones are not.

Craig Ringer, Álvaro Herrera
---
 doc/src/sgml/ref/pg_recvlogical.sgml   |  34 ++++++++
 src/bin/pg_basebackup/pg_recvlogical.c | 145 +++++++++++++++++++++++++++++----
 2 files changed, 164 insertions(+), 15 deletions(-)

diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index b35881f..d066ce8 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -38,6 +38,14 @@ PostgreSQL documentation
    constraints as <xref linkend="app-pgreceivexlog">, plus those for logical
    replication (see <xref linkend="logicaldecoding">).
   </para>
+
+  <para>
+   <command>pg_recvlogical</> has no equivalent to the logical decoding
+   SQL interface's peek and get modes. It sends replay confirmations for
+   data lazily as it receives it and on clean exit. To examine pending data on
+   a slot without consuming it, use
+   <link linkend="functions-replication"><function>pg_logical_slot_peek_changes</></>.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -155,6 +163,32 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-E <replaceable>lsn</replaceable></option></term>
+      <term><option>--endpos=<replaceable>lsn</replaceable></option></term>
+      <listitem>
+       <para>
+        In <option>--start</option> mode, automatically stop replication
+        and exit with normal exit status 0 when receiving reaches the
+        specified LSN.  If specified when not in <option>--start</option>
+        mode, an error is raised.
+       </para>
+
+       <para>
+        If there's a record with LSN exactly equal to <replaceable>lsn</>,
+        the record will be output.
+       </para>
+
+       <para>
+        The <option>--endpos</option> option is not aware of transaction
+        boundaries and may truncate output partway through a transaction.
+        Any partially output transaction will not be consumed and will be
+        replayed again when the slot is next read from. Individual messages
+        are never truncated.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>--if-not-exists</option></term>
       <listitem>
        <para>
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index cb5f989..4e6a8c2 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -40,6 +40,7 @@ static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static int	fsync_interval = 10 * 1000; /* 10 sec = default */
 static XLogRecPtr startpos = InvalidXLogRecPtr;
+static XLogRecPtr endpos = InvalidXLogRecPtr;
 static bool do_create_slot = false;
 static bool slot_exists_ok = false;
 static bool do_start_slot = false;
@@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
 static void usage(void);
 static void StreamLogicalLog(void);
 static void disconnect_and_exit(int code);
+static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
+static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
+				   bool keepalive, XLogRecPtr lsn);
 
 static void
 usage(void)
@@ -81,6 +85,7 @@ usage(void)
 			 "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
 	printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
 	printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
+	printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
 	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
 	printf(_("  -o, --option=NAME[=VALUE]\n"
 			 "                         pass option NAME with optional value VALUE to the\n"
@@ -281,6 +286,7 @@ StreamLogicalLog(void)
 		int			bytes_written;
 		int64		now;
 		int			hdr_len;
+		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
 
 		if (copybuf != NULL)
 		{
@@ -454,6 +460,7 @@ StreamLogicalLog(void)
 			int			pos;
 			bool		replyRequested;
 			XLogRecPtr	walEnd;
+			bool		endposReached = false;
 
 			/*
 			 * Parse the keepalive message, enclosed in the CopyData message.
@@ -476,18 +483,32 @@ StreamLogicalLog(void)
 			}
 			replyRequested = copybuf[pos];
 
-			/* If the server requested an immediate reply, send one. */
-			if (replyRequested)
+			if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
 			{
-				/* fsync data, so we send a recent flush pointer */
-				if (!OutputFsync(now))
-					goto error;
+				/*
+				 * If there's nothing to read on the socket until a keepalive
+				 * we know that the server has nothing to send us; and if
+				 * walEnd has passed endpos, we know nothing else can have
+				 * committed before endpos.  So we can bail out now.
+				 */
+				endposReached = true;
+			}
 
-				now = feGetCurrentTimestamp();
-				if (!sendFeedback(conn, now, true, false))
+			/* Send a reply, if necessary */
+			if (replyRequested || endposReached)
+			{
+				if (!flushAndSendFeedback(conn, &now))
 					goto error;
 				last_status = now;
 			}
+
+			if (endposReached)
+			{
+				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+				time_to_abort = true;
+				break;
+			}
+
 			continue;
 		}
 		else if (copybuf[0] != 'w')
@@ -497,7 +518,6 @@ StreamLogicalLog(void)
 			goto error;
 		}
 
-
 		/*
 		 * Read the header of the XLogData message, enclosed in the CopyData
 		 * message. We only need the WAL location field (dataStart), the rest
@@ -515,12 +535,23 @@ StreamLogicalLog(void)
 		}
 
 		/* Extract WAL location for this block */
-		{
-			XLogRecPtr	temp = fe_recvint64(&copybuf[1]);
+		cur_record_lsn = fe_recvint64(&copybuf[1]);
 
-			output_written_lsn = Max(temp, output_written_lsn);
+		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
+		{
+			/*
+			 * We've read past our endpoint, so prepare to go away being
+			 * cautious about what happens to our output data.
+			 */
+			if (!flushAndSendFeedback(conn, &now))
+				goto error;
+			prepareToTerminate(conn, endpos, false, cur_record_lsn);
+			time_to_abort = true;
+			break;
 		}
 
+		output_written_lsn = Max(cur_record_lsn, output_written_lsn);
+
 		bytes_left = r - hdr_len;
 		bytes_written = 0;
 
@@ -557,10 +588,29 @@ StreamLogicalLog(void)
 					strerror(errno));
 			goto error;
 		}
+
+		if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
+		{
+			/* endpos was exactly the record we just processed, we're done */
+			if (!flushAndSendFeedback(conn, &now))
+				goto error;
+			prepareToTerminate(conn, endpos, false, cur_record_lsn);
+			time_to_abort = true;
+			break;
+		}
 	}
 
 	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	if (PQresultStatus(res) == PGRES_COPY_OUT)
+	{
+		/*
+		 * We're doing a client-initiated clean exit and have sent CopyDone to
+		 * the server. We've already sent replay confirmation and fsync'd so
+		 * we can just clean up the connection now.
+		 */
+		goto error;
+	}
+	else if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		fprintf(stderr,
 				_("%s: unexpected termination of replication stream: %s"),
@@ -638,6 +688,7 @@ main(int argc, char **argv)
 		{"password", no_argument, NULL, 'W'},
 /* replication options */
 		{"startpos", required_argument, NULL, 'I'},
+		{"endpos", required_argument, NULL, 'E'},
 		{"option", required_argument, NULL, 'o'},
 		{"plugin", required_argument, NULL, 'P'},
 		{"status-interval", required_argument, NULL, 's'},
@@ -673,7 +724,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
+	while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -733,6 +784,16 @@ main(int argc, char **argv)
 				}
 				startpos = ((uint64) hi) << 32 | lo;
 				break;
+			case 'E':
+				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+				{
+					fprintf(stderr,
+							_("%s: could not parse end position \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				endpos = ((uint64) hi) << 32 | lo;
+				break;
 			case 'o':
 				{
 					char	   *data = pg_strdup(optarg);
@@ -857,6 +918,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (endpos != InvalidXLogRecPtr && !do_start_slot)
+	{
+		fprintf(stderr,
+				_("%s: --endpos may only be specified with --start\n"),
+				progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef WIN32
 	pqsignal(SIGINT, sigint_handler);
 	pqsignal(SIGHUP, sighup_handler);
@@ -923,8 +994,8 @@ main(int argc, char **argv)
 		if (time_to_abort)
 		{
 			/*
-			 * We've been Ctrl-C'ed. That's not an error, so exit without an
-			 * errorcode.
+			 * We've been Ctrl-C'ed or reached an exit limit condition. That's
+			 * not an error, so exit without an errorcode.
 			 */
 			disconnect_and_exit(0);
 		}
@@ -943,3 +1014,47 @@ main(int argc, char **argv)
 		}
 	}
 }
+
+/*
+ * Fsync the output file and then send a feedback message to the server, in
+ * that order so the feedback reports a recent flush pointer.  Returns true
+ * only if both steps succeed.
+ *
+ * On a true return, *now holds the timestamp read just before sending.
+ */
+static bool
+flushAndSendFeedback(PGconn *conn, TimestampTz *now)
+{
+	/* fsync before reporting, so the flush pointer we send is current */
+	if (OutputFsync(*now))
+	{
+		*now = feGetCurrentTimestamp();
+		if (sendFeedback(conn, *now, true, false))
+			return true;
+	}
+	return false;
+}
+
+/*
+ * Try to inform the server of our upcoming demise by ending the COPY stream,
+ * but don't wait around or retry on failure.  When verbose, report whether
+ * endpos was reached by a keepalive (lsn unused) or by the record at lsn.
+ */
+static void
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+{
+	(void) PQputCopyEnd(conn, NULL);
+	(void) PQflush(conn);
+
+	if (verbose)
+	{
+		if (keepalive)
+			fprintf(stderr, _("%s: endpos %X/%X reached by keepalive\n"),
+					progname,
+					(uint32) (endpos >> 32), (uint32) endpos);
+		else
+			fprintf(stderr, _("%s: endpos %X/%X reached by record at %X/%X\n"),
+					progname, (uint32) (endpos >> 32), (uint32) (endpos),
+					(uint32) (lsn >> 32), (uint32) lsn);
+	}
+}
-- 
2.5.5

From 4a58bbc76031b966cce8b66227ffdc6785b08e33 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 3 Jan 2017 18:21:48 +0800
Subject: [PATCH 2/3] Add some minimal tests for pg_recvlogical

---
 src/bin/pg_basebackup/Makefile                |  2 ++
 src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 46 +++++++++++++++++++++++++++
 2 files changed, 48 insertions(+)
 create mode 100644 src/bin/pg_basebackup/t/030_pg_recvlogical.pl

diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index 52ac9e9..1e54b19 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -12,6 +12,8 @@
 PGFILEDESC = "pg_basebackup/pg_receivexlog/pg_recvlogical - streaming WAL and backup receivers"
 PGAPPICON=win32
 
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_basebackup
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
new file mode 100644
index 0000000..dca5ef2
--- /dev/null
+++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
@@ -0,0 +1,46 @@
+use strict;
+use warnings;
+use TestLib;
+use PostgresNode;
+use Test::More tests => 15;
+
+program_help_ok('pg_recvlogical');
+program_version_ok('pg_recvlogical');
+program_options_handling_ok('pg_recvlogical');
+
+my $node = get_new_node('main');
+
+# Initialize node with the settings logical decoding needs (wal_level, slots)
+$node->init(allows_streaming => 1, has_archiving => 1);
+$node->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug1'
+log_error_verbosity = verbose
+});
+$node->dump_info;
+$node->start;
+
+# Each invocation below lacks a required option, so it must fail.
+$node->command_fails(['pg_recvlogical'], 'pg_recvlogical needs a slot name');
+$node->command_fails(['pg_recvlogical', '-S', 'test'], 'pg_recvlogical needs a database');
+$node->command_fails(['pg_recvlogical', '-S', 'test', '-d', 'postgres'],
+	'pg_recvlogical needs an action');
+$node->command_fails(['pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), '--start'],
+	'no destination file');
+
+$node->command_ok(['pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), '--create-slot'],
+	'slot created');
+
+my $slot = $node->slot('test');
+isnt($slot->{'restart_lsn'}, '', 'restart lsn is defined for new slot');
+
+# safe_psql dies on SQL errors, so a broken setup cannot pass unnoticed.
+$node->safe_psql('postgres', 'CREATE TABLE test_table(x integer)');
+$node->safe_psql('postgres', 'INSERT INTO test_table(x) SELECT y FROM generate_series(1, 10) a(y);');
+my $nextlsn = $node->safe_psql('postgres', 'SELECT pg_current_xlog_insert_location()');
+chomp($nextlsn);
+
+$node->command_ok(['pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), '--start', '--endpos', $nextlsn, '--no-loop', '-f', '-'],
+	'replayed a transaction');
-- 
2.5.5

From d3b3ef86b5a91cea404c555d7a029bf2b21fb4f4 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 15 Nov 2016 16:06:16 +0800
Subject: [PATCH 3/3] Add a pg_recvlogical wrapper to PostgresNode

---
 src/test/perl/PostgresNode.pm               | 75 ++++++++++++++++++++++++++++-
 src/test/recovery/t/006_logical_decoding.pl | 31 +++++++++++-
 2 files changed, 104 insertions(+), 2 deletions(-)

diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 2f009d4..5197e80 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1124,7 +1124,7 @@ sub psql
 			# IPC::Run::run threw an exception. re-throw unless it's a
 			# timeout, which we'll handle by testing is_expired
 			die $exc_save
-			  if (blessed($exc_save) || $exc_save ne $timeout_exception);
+			  if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
 
 			$ret = undef;
 
@@ -1493,6 +1493,79 @@ sub slot
 	return $self->query_hash('postgres', "SELECT __COLUMNS__ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'", @columns);
 }
 
+=item $node->pg_recvlogical_upto(self, dbname, slot_name, endpos, timeout_secs, ...)
+
+Invoke pg_recvlogical to read from slot_name on dbname until LSN endpos, which
+corresponds to pg_recvlogical --endpos.  Gives up after timeout (if nonzero).
+
+Disallows pg_recvlogical from internally retrying on error by passing --no-loop.
+
+Plugin options are passed as additional keyword arguments.
+
+If called in scalar context, returns stdout, and die()s on timeout or nonzero return.
+
+If called in list context, returns a tuple of (retval, stdout, stderr, timeout).
+timeout is the IPC::Run::Timeout object whose is_expired method can be tested
+to check for timeout. retval is the value of $? after the run, or undef on timeout.
+
+=cut
+
+sub pg_recvlogical_upto
+{
+	my ($self, $dbname, $slot_name, $endpos, $timeout_secs, %plugin_options) = @_;
+	my ($stdout, $stderr) = ('', '');    # never undef in the messages below
+
+	my $timeout_exception = 'pg_recvlogical timed out';
+
+	my @cmd = ('pg_recvlogical', '-S', $slot_name, '--dbname', $self->connstr($dbname));
+	push @cmd, '--endpos', $endpos if ($endpos);
+	push @cmd, '-f', '-', '--no-loop', '--start';
+
+	while (my ($k, $v) = each %plugin_options)
+	{
+		die "= is not permitted to appear in replication option name" if ($k =~ qr/=/);
+		push @cmd, "-o", "$k=$v";
+	}
+
+	my $timeout;
+	$timeout = IPC::Run::timeout($timeout_secs, exception => $timeout_exception) if $timeout_secs;
+	my $ret = 0;
+
+	do {
+		local $@;
+		eval {
+			IPC::Run::run(\@cmd, ">", \$stdout, "2>", \$stderr, $timeout);
+			$ret = $?;
+		};
+		my $exc_save = $@;
+		if ($exc_save)
+		{
+			# IPC::Run::run threw an exception. re-throw unless it's our timer's
+			# message (matched literally via \Q..\E); handled via is_expired below
+			die $exc_save
+			  if (blessed($exc_save) || $exc_save !~ qr/\Q$timeout_exception\E/);
+
+			$ret = undef;
+
+			die "Got timeout exception '$exc_save' but timer not expired?!"
+			  unless $timeout && $timeout->is_expired;
+
+			die "$exc_save waiting for endpos $endpos with stdout '$stdout', stderr '$stderr'"
+			  unless wantarray;
+		}
+	};
+
+	if (wantarray)
+	{
+		return ($ret, $stdout, $stderr, $timeout);
+	}
+	else
+	{
+		die "pg_recvlogical exited with code '$ret', stdout '$stdout' and stderr '$stderr'" if $ret;
+		return $stdout;
+	}
+}
+
 =pod
 
 =back
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index b80a9a9..d8cc8d3 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -1,9 +1,13 @@
 # Testing of logical decoding using SQL interface and/or pg_recvlogical
+#
+# Most logical decoding tests are in contrib/test_decoding. This module
+# is for work that doesn't fit well there, like where server restarts
+# are required.
 use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 2;
+use Test::More tests => 5;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -36,5 +40,30 @@ $result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_chan
 chomp($result);
 is($result, '', 'Decoding after fast restart repeats no rows');
 
+# Insert some rows and verify that pg_recvlogical produces the same output as
+# the SQL interface, and that it confirms the changes it has consumed.
+$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]);
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
+note "waiting to replay $endpos";
+
+my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected, 'got same expected output from pg_recvlogical decoding session');
+
+$stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
+
 # done with the node
 $node_master->stop;
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to