> Moved to next CF with "needs review" state. Here's an updated series. It's on top of the entry https://commitfest.postgresql.org/12/883/ for PostgresNode TAP test enhancements.
It corresponds exactly to patches [2,3,4] in the logical decoding on standby post at https://www.postgresql.org/message-id/CAMsr+YEzC=-+eV09A=ra150fjtkmtqt5q70piqbwytbor3c...@mail.gmail.com If this is committed, the remaining decoding on standby patches will just be the meat of the feature. -- Craig Ringer http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
From b7458118d98204d4b44f0d3e2953a117f1ed876a Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Thu, 1 Sep 2016 12:37:40 +0800 Subject: [PATCH 1/3] Add an optional --endpos LSN argument to pg_recvlogical MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit pg_recvlogical usually just runs until cancelled or until the upstream server disconnects. For some purposes, especially testing, it's useful to have the ability to stop receive at a specified LSN without having to parse the output and deal with buffering issues, etc. Add a --endpos parameter that takes the LSN at which no further messages should be written and receive should stop. Craig Ringer, Álvaro Herrera --- doc/src/sgml/ref/pg_recvlogical.sgml | 34 ++++++++ src/bin/pg_basebackup/pg_recvlogical.c | 145 +++++++++++++++++++++++++++++---- 2 files changed, 164 insertions(+), 15 deletions(-) diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml index b35881f..d066ce8 100644 --- a/doc/src/sgml/ref/pg_recvlogical.sgml +++ b/doc/src/sgml/ref/pg_recvlogical.sgml @@ -38,6 +38,14 @@ PostgreSQL documentation constraints as <xref linkend="app-pgreceivexlog">, plus those for logical replication (see <xref linkend="logicaldecoding">). </para> + + <para> + <command>pg_recvlogical</> has no equivalent to the logical decoding + SQL interface's peek and get modes. It sends replay confirmations for + data lazily as it receives it and on clean exit. To examine pending data on + a slot without consuming it, use + <link linkend="functions-replication"><function>pg_logical_slot_peek_changes</></>.
+ </para> </refsect1> <refsect1> @@ -155,6 +163,32 @@ PostgreSQL documentation </varlistentry> <varlistentry> + <term><option>-E <replaceable>lsn</replaceable></option></term> + <term><option>--endpos=<replaceable>lsn</replaceable></option></term> + <listitem> + <para> + In <option>--start</option> mode, automatically stop replication + and exit with normal exit status 0 when receiving reaches the + specified LSN. If specified when not in <option>--start</option> + mode, an error is raised. + </para> + + <para> + If there's a record with LSN exactly equal to <replaceable>lsn</>, + the record will be output. + </para> + + <para> + The <option>--endpos</option> option is not aware of transaction + boundaries and may truncate output partway through a transaction. + Any partially output transaction will not be consumed and will be + replayed again when the slot is next read from. Individual messages + are never truncated. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><option>--if-not-exists</option></term> <listitem> <para> diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index cb5f989..4e6a8c2 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -40,6 +40,7 @@ static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 10 * 1000; /* 10 sec = default */ static XLogRecPtr startpos = InvalidXLogRecPtr; +static XLogRecPtr endpos = InvalidXLogRecPtr; static bool do_create_slot = false; static bool slot_exists_ok = false; static bool do_start_slot = false; @@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; static void usage(void); static void StreamLogicalLog(void); static void disconnect_and_exit(int code); +static bool flushAndSendFeedback(PGconn *conn, int64 *now); +static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, + bool keepalive, XLogRecPtr lsn); static 
void usage(void) @@ -81,6 +85,7 @@ usage(void) " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000)); printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n")); printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n")); + printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n")); printf(_(" -n, --no-loop do not loop on connection lost\n")); printf(_(" -o, --option=NAME[=VALUE]\n" " pass option NAME with optional value VALUE to the\n" @@ -281,6 +286,7 @@ StreamLogicalLog(void) int bytes_written; int64 now; int hdr_len; + XLogRecPtr cur_record_lsn = InvalidXLogRecPtr; if (copybuf != NULL) { @@ -454,6 +460,7 @@ StreamLogicalLog(void) int pos; bool replyRequested; XLogRecPtr walEnd; + bool endposReached = false; /* * Parse the keepalive message, enclosed in the CopyData message. @@ -476,18 +483,32 @@ StreamLogicalLog(void) } replyRequested = copybuf[pos]; - /* If the server requested an immediate reply, send one. */ - if (replyRequested) + if (endpos != InvalidXLogRecPtr && walEnd >= endpos) { - /* fsync data, so we send a recent flush pointer */ - if (!OutputFsync(now)) - goto error; + /* + * If there's nothing to read on the socket until a keepalive + * we know that the server has nothing to send us; and if + * walEnd has passed endpos, we know nothing else can have + * committed before endpos. So we can bail out now.
+ */ + endposReached = true; + } - now = feGetCurrentTimestamp(); - if (!sendFeedback(conn, now, true, false)) + /* Send a reply, if necessary */ + if (replyRequested || endposReached) + { + if (!flushAndSendFeedback(conn, &now)) goto error; last_status = now; } + + if (endposReached) + { + prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr); + time_to_abort = true; + break; + } + continue; } else if (copybuf[0] != 'w') @@ -497,7 +518,6 @@ StreamLogicalLog(void) goto error; } - /* * Read the header of the XLogData message, enclosed in the CopyData * message. We only need the WAL location field (dataStart), the rest @@ -515,12 +535,23 @@ StreamLogicalLog(void) } /* Extract WAL location for this block */ - { - XLogRecPtr temp = fe_recvint64(&copybuf[1]); + cur_record_lsn = fe_recvint64(&copybuf[1]); - output_written_lsn = Max(temp, output_written_lsn); + if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos) + { + /* + * We've read past our endpoint, so prepare to go away being + * cautious about what happens to our output data. + */ + if (!flushAndSendFeedback(conn, &now)) + goto error; + prepareToTerminate(conn, endpos, false, cur_record_lsn); + time_to_abort = true; + break; } + output_written_lsn = Max(cur_record_lsn, output_written_lsn); + bytes_left = r - hdr_len; bytes_written = 0; @@ -557,10 +588,29 @@ StreamLogicalLog(void) strerror(errno)); goto error; } + + if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos) + { + /* endpos was exactly the record we just processed, we're done */ + if (!flushAndSendFeedback(conn, &now)) + goto error; + prepareToTerminate(conn, endpos, false, cur_record_lsn); + time_to_abort = true; + break; + } } res = PQgetResult(conn); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + if (PQresultStatus(res) == PGRES_COPY_OUT) + { + /* + * We're doing a client-initiated clean exit and have sent CopyDone to + * the server. We've already sent replay confirmation and fsync'd so + * we can just clean up the connection now.
+ */ + goto error; + } + else if (PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, _("%s: unexpected termination of replication stream: %s"), @@ -638,6 +688,7 @@ main(int argc, char **argv) {"password", no_argument, NULL, 'W'}, /* replication options */ {"startpos", required_argument, NULL, 'I'}, + {"endpos", required_argument, NULL, 'E'}, {"option", required_argument, NULL, 'o'}, {"plugin", required_argument, NULL, 'P'}, {"status-interval", required_argument, NULL, 's'}, @@ -673,7 +724,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:", + while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:", long_options, &option_index)) != -1) { switch (c) @@ -733,6 +784,16 @@ main(int argc, char **argv) } startpos = ((uint64) hi) << 32 | lo; break; + case 'E': + if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse end position \"%s\"\n"), + progname, optarg); + exit(1); + } + endpos = ((uint64) hi) << 32 | lo; + break; case 'o': { char *data = pg_strdup(optarg); @@ -857,6 +918,16 @@ main(int argc, char **argv) exit(1); } + if (endpos != InvalidXLogRecPtr && !do_start_slot) + { + fprintf(stderr, + _("%s: --endpos may only be specified with --start\n"), + progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + #ifndef WIN32 pqsignal(SIGINT, sigint_handler); pqsignal(SIGHUP, sighup_handler); @@ -923,8 +994,8 @@ main(int argc, char **argv) if (time_to_abort) { /* - * We've been Ctrl-C'ed. That's not an error, so exit without an - * errorcode. + * We've been Ctrl-C'ed or reached an exit limit condition. That's + * not an error, so exit without an errorcode. */ disconnect_and_exit(0); } @@ -943,3 +1014,47 @@ main(int argc, char **argv) } } } + +/* + * Fsync our output data, and send a feedback message to the server. Returns + * true if successful, false otherwise.
+ * + * If the fsync succeeds, *now is updated to the current timestamp just before + * sending feedback. + */ +static bool +flushAndSendFeedback(PGconn *conn, int64 *now) +{ + /* flush data to disk, so that we send a recent flush pointer */ + if (!OutputFsync(*now)) + return false; + *now = feGetCurrentTimestamp(); + if (!sendFeedback(conn, *now, true, false)) + return false; + + return true; +} + +/* + * Try to inform the server about our upcoming demise, but don't wait around or + * retry on failure. + */ +static void +prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn) +{ + (void) PQputCopyEnd(conn, NULL); + (void) PQflush(conn); + + if (verbose) + { + if (keepalive) + fprintf(stderr, _("%s: endpos %X/%X reached by keepalive\n"), + progname, + (uint32) (endpos >> 32), (uint32) endpos); + else + fprintf(stderr, _("%s: endpos %X/%X reached by record at %X/%X\n"), + progname, (uint32) (endpos >> 32), (uint32) (endpos), + (uint32) (lsn >> 32), (uint32) lsn); + + } +} -- 2.5.5
From 4a58bbc76031b966cce8b66227ffdc6785b08e33 Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Tue, 3 Jan 2017 18:21:48 +0800 Subject: [PATCH 2/3] Add some minimal tests for pg_recvlogical --- src/bin/pg_basebackup/Makefile | 2 ++ src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 46 +++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 src/bin/pg_basebackup/t/030_pg_recvlogical.pl diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index 52ac9e9..1e54b19 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -12,6 +12,8 @@ PGFILEDESC = "pg_basebackup/pg_receivexlog/pg_recvlogical - streaming WAL and backup receivers" PGAPPICON=win32 +EXTRA_INSTALL=contrib/test_decoding + subdir = src/bin/pg_basebackup top_builddir = ../../.. include $(top_builddir)/src/Makefile.global diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl new file mode 100644 index 0000000..dca5ef2 --- /dev/null +++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl @@ -0,0 +1,46 @@ +use strict; +use warnings; +use TestLib; +use PostgresNode; +use Test::More tests => 15; + +program_help_ok('pg_recvlogical'); +program_version_ok('pg_recvlogical'); +program_options_handling_ok('pg_recvlogical'); + +my $node = get_new_node('main'); + +# Initialize a streaming-capable node; wal_level is raised to logical below +$node->init(allows_streaming => 1, has_archiving => 1); +$node->append_conf('postgresql.conf', q{ +wal_level = 'logical' +max_replication_slots = 4 +max_wal_senders = 4 +log_min_messages = 'debug1' +log_error_verbosity = verbose +}); +$node->dump_info; +$node->start; + +$node->command_fails(['pg_recvlogical'], + 'pg_recvlogical needs a slot name'); +$node->command_fails(['pg_recvlogical', '-S', 'test'], + 'pg_recvlogical needs a database'); +$node->command_fails(['pg_recvlogical', '-S', 'test', '-d', 'postgres'], + 'pg_recvlogical needs an action');
+$node->command_fails(['pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), '--start'], + 'no destination file'); + +$node->command_ok(['pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), '--create-slot'], + 'slot created'); + +my $slot = $node->slot('test'); +isnt($slot->{'restart_lsn'}, '', 'restart lsn is defined for new slot'); + +$node->safe_psql('postgres', 'CREATE TABLE test_table(x integer)'); +$node->safe_psql('postgres', 'INSERT INTO test_table(x) SELECT y FROM generate_series(1, 10) a(y);'); +my $nextlsn = $node->safe_psql('postgres', 'SELECT pg_current_xlog_insert_location()'); +chomp($nextlsn); + +$node->command_ok(['pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'], + 'replayed a transaction'); -- 2.5.5
From d3b3ef86b5a91cea404c555d7a029bf2b21fb4f4 Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Tue, 15 Nov 2016 16:06:16 +0800 Subject: [PATCH 3/3] Add a pg_recvlogical wrapper to PostgresNode --- src/test/perl/PostgresNode.pm | 75 ++++++++++++++++++++++++++++- src/test/recovery/t/006_logical_decoding.pl | 31 +++++++++++- 2 files changed, 104 insertions(+), 2 deletions(-) diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 2f009d4..5197e80 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -1124,7 +1124,7 @@ sub psql # IPC::Run::run threw an exception. re-throw unless it's a # timeout, which we'll handle by testing is_expired die $exc_save - if (blessed($exc_save) || $exc_save ne $timeout_exception); + if (blessed($exc_save) || $exc_save !~ qr/\Q$timeout_exception\E/); $ret = undef; @@ -1493,6 +1493,79 @@ sub slot return $self->query_hash('postgres', "SELECT __COLUMNS__ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'", @columns); } +=item $node->pg_recvlogical_upto(dbname, slot_name, endpos, timeout_secs, ...) + +Invoke pg_recvlogical to read from slot_name on dbname until LSN endpos (passed +as pg_recvlogical --endpos, and omitted if false). Gives up after timeout (if nonzero). + +Disallows pg_recvlogical from internally retrying on error by passing --no-loop. + +Plugin options are passed as additional keyword arguments. + +If called in scalar context, returns stdout, and die()s on timeout or nonzero return. + +If called in array context, returns a tuple of (retval, stdout, stderr, timeout). +timeout is the IPC::Run::Timeout object whose is_expired method can be tested +to check for timeout. retval is undef on timeout.
+ +=cut + +sub pg_recvlogical_upto +{ + my ($self, $dbname, $slot_name, $endpos, $timeout_secs, %plugin_options) = @_; + my ($stdout, $stderr) = ('', ''); + + my $timeout_exception = 'pg_recvlogical timed out'; + + my @cmd = ('pg_recvlogical', '-S', $slot_name, '--dbname', $self->connstr($dbname)); + push @cmd, '--endpos', $endpos if ($endpos); + push @cmd, '-f', '-', '--no-loop', '--start'; + + while (my ($k, $v) = each %plugin_options) + { + die "= is not permitted to appear in replication option name" if ($k =~ qr/=/); + push @cmd, "-o", "$k=$v"; + } + + my $timeout; + $timeout = IPC::Run::timeout($timeout_secs, exception => $timeout_exception ) if $timeout_secs; + my $ret = 0; + + do { + local $@; + eval { + IPC::Run::run(\@cmd, ">", \$stdout, "2>", \$stderr, (defined $timeout ? $timeout : ())); # no timer at all when timeout_secs is 0 + $ret = $?; + }; + my $exc_save = $@; + if ($exc_save) + { + # IPC::Run::run threw an exception. re-throw unless it's a + # timeout, which we'll handle by testing is_expired + die $exc_save + if (blessed($exc_save) || $exc_save !~ qr/\Q$timeout_exception\E/); + + $ret = undef; + + die "Got timeout exception '$exc_save' but timer not expired?!" + unless (defined $timeout && $timeout->is_expired); + + die "$exc_save waiting for endpos $endpos with stdout '$stdout', stderr '$stderr'" + unless wantarray; + } + }; + + if (wantarray) + { + return ($ret, $stdout, $stderr, $timeout); + } + else + { + die "pg_recvlogical exited with code '$ret', stdout '$stdout' and stderr '$stderr'" if $ret; + return $stdout; + } +} + =pod =back diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index b80a9a9..d8cc8d3 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -1,9 +1,13 @@ # Testing of logical decoding using SQL interface and/or pg_recvlogical +# +# Most logical decoding tests are in contrib/test_decoding. This script +# is for work that doesn't fit well there, like where server restarts +# are required.
use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 2; +use Test::More tests => 5; # Initialize master node my $node_master = get_new_node('master'); @@ -36,5 +40,30 @@ $result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_chan chomp($result); is($result, '', 'Decoding after fast restart repeats no rows'); +# Insert some rows and verify that we get the same results from pg_recvlogical +# and the SQL interface. +$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]); + +my $expected = q{BEGIN +table public.decoding_test: INSERT: x[integer]:1 y[text]:'1' +table public.decoding_test: INSERT: x[integer]:2 y[text]:'2' +table public.decoding_test: INSERT: x[integer]:3 y[text]:'3' +table public.decoding_test: INSERT: x[integer]:4 y[text]:'4' +COMMIT}; + +my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]); +is($stdout_sql, $expected, 'got expected output from SQL decoding session'); + +my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"); +note "waiting to replay $endpos"; + +my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1'); +chomp($stdout_recv); +is($stdout_recv, $expected, 'got same expected output from pg_recvlogical decoding session'); + +$stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1'); +chomp($stdout_recv); +is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot'); + # done with the node $node_master->stop; -- 2.5.5
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers