On Sun, Jan 22, 2023, at 9:42 AM, Takamichi Osumi (Fujitsu) wrote:
> On Saturday, January 21, 2023 3:36 AM I wrote:
> > Kindly have a look at the patch v18.
> I've conducted some refactoring for v18.
> Now the latest patch should be tidier and
> the comments would be clearer and more aligned as a whole.
>
> Attached the updated patch v19.

[I haven't been following this thread for a long time...]

Good to know that you keep improving this patch. I have a few suggestions that
were easier to provide as a patch on top of your latest patch than as inline
suggestions.

There is some documentation polishing. Let me comment on some of the changes
below.

-       The length of time (ms) to delay the application of changes.
+       Total time spent delaying the application of changes, in milliseconds

I don't remember if I suggested this description for the catalog but IMO the
suggestion reads better.

-       For time-delayed logical replication (i.e. when the subscription is
-       created with parameter min_apply_delay > 0), the apply worker sends a
-       Standby Status Update message to the publisher with a period of
-       <literal>wal_receiver_status_interval</literal>. Make sure to set
-       <literal>wal_receiver_status_interval</literal> less than the
-       <literal>wal_sender_timeout</literal> on the publisher, otherwise, the
-       walsender will repeatedly terminate due to the timeout errors. If
-       <literal>wal_receiver_status_interval</literal> is set to zero, the apply
-       worker doesn't send any feedback messages during the subscriber's
-       <literal>min_apply_delay</literal> period. See
-       <xref linkend="sql-createsubscription"/> for details.
+       For time-delayed logical replication, the apply worker sends a feedback
+       message to the publisher every
+       <varname>wal_receiver_status_interval</varname> milliseconds. Make sure
+       to set <varname>wal_receiver_status_interval</varname> less than the
+       <varname>wal_sender_timeout</varname> on the publisher, otherwise, the
+       <literal>walsender</literal> will repeatedly terminate due to timeout
+       error. If <varname>wal_receiver_status_interval</varname> is set to
+       zero, the apply worker doesn't send any feedback messages during the
+       <literal>min_apply_delay</literal> interval.

I removed the parenthetical explanation about time-delayed logical replication.
If you are reading the documentation and do not know what it means, you should
(a) read the logical replication chapter or (b) check the glossary (maybe a new
entry should be added). I also removed the Standby Status Update message
because it is a low-level detail; let's refer to it as a feedback message as
the other sentences do. I changed "literal" to "varname", which is the correct
tag for parameters. I replaced "period" with "interval", which was the previous
terminology. IMO we should be uniform and use one or the other.

-   The subscriber replication can be instructed to lag behind the publisher
-   side changes by specifying the <literal>min_apply_delay</literal>
-   subscription parameter. See <xref linkend="sql-createsubscription"/> for
-   details.
+   A logical replication subscription can delay the application of changes by
+   specifying the <literal>min_apply_delay</literal> subscription parameter.
+   See <xref linkend="sql-createsubscription"/> for details.

This feature refers to a specific subscription, hence "logical replication
subscription" instead of "subscriber replication".

+           if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY))
+               errorConflictingDefElem(defel, pstate);
+

Peter S referred to this missing piece of code too.

-int
+static int
 defGetMinApplyDelay(DefElem *def)
 {

It seems you forgot the static keyword.

-       elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = %lld ms, Remaining wait time: %ld ms",
-            xid, (long long) MySubscription->minapplydelay, diffms);
+       elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = " INT64_FORMAT " ms, remaining wait time: %ld ms",
+            xid, MySubscription->minapplydelay, diffms);

int64 should use the format modifier INT64_FORMAT.

-                     (long) wal_receiver_status_interval * 1000,
+                     wal_receiver_status_interval * 1000L,

The cast is not required. I added a suffix to the constant.

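The two worker.c changes above (the L-suffixed constant and INT64_FORMAT) can
be tried outside the tree. What follows is a minimal standalone sketch, not
code from the patch: status_interval_ms() and format_delay_msg() are
hypothetical helper names, and PRId64 from <inttypes.h> stands in for
PostgreSQL's INT64_FORMAT so that it builds without the PostgreSQL headers.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper (not in the patch): convert a status interval in
 * seconds to milliseconds.  Because 1000L has type long, the int operand is
 * converted to long before the multiplication, which is the effect the
 * removed (long) cast had.
 */
static long
status_interval_ms(int interval_s)
{
	assert(interval_s >= 0);
	return interval_s * 1000L;
}

/*
 * Hypothetical helper: build the DEBUG2 text.  PRId64 is an assumption made
 * for this sketch; in the tree the macro is INT64_FORMAT.  With a matching
 * format macro the int64 argument needs no (long long) cast.
 */
static int
format_delay_msg(char *buf, size_t len, unsigned int xid,
				 int64_t delay_ms, long remaining_ms)
{
	return snprintf(buf, len,
					"time-delayed replication for txid %u, "
					"min_apply_delay = %" PRId64 " ms, "
					"remaining wait time: %ld ms",
					xid, delay_ms, remaining_ms);
}
```

Calling status_interval_ms(10) gives the timeout that would be passed to a
wait call, and format_delay_msg() fills a caller-supplied buffer.
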
-   elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X in-delayed: %d",
+   elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X, apply delay: %s",
         force,
         LSN_FORMAT_ARGS(recvpos),
         LSN_FORMAT_ARGS(writepos),
         LSN_FORMAT_ARGS(flushpos),
-        in_delayed_apply);
+        in_delayed_apply? "yes" : "no");

It is better to use a string to represent the yes/no option.

-                         gettext_noop("Min apply delay (ms)"));
+                         gettext_noop("Min apply delay"));

I don't know if it was discussed but we don't add units to headers. When I
thought about this parameter's representation (internal and external), I
decided to use the previous code because it provides a unit for the external
representation. I understand that using the same representation as
recovery_min_apply_delay is good but the current code does not handle the
external representation accordingly (recovery_min_apply_delay uses the GUC
machinery to add the unit but min_apply_delay doesn't).

 # Setup for streaming case
-$node_publisher->append_conf('postgres.conf',
+$node_publisher->append_conf('postgresql.conf',
    'logical_decoding_mode = immediate');
 $node_publisher->reload;

Fix configuration file name.

Maybe the tests should do a better job. I think check_apply_delay_time is
fragile because it does not guarantee that time is not shifted. Time-delayed
replication is a subscriber feature, and to check its correctness the test
should check the logs.

# Note that we cannot call check_apply_delay_log() here because there is a
# possibility that the delay is skipped. The event happens when the WAL
# replication between publisher and subscriber is delayed due to a mechanical
# problem. The log output will be checked later - substantial delay-time case.

If you cannot rely on the logs here, the test should adjust min_apply_delay,
no?

The tests do not exercise min_apply_delay vs parallel streaming mode.

+       /*
+        * The combination of parallel streaming mode and
+        * min_apply_delay is not allowed.
+        */
+       if (opts.streaming == LOGICALREP_STREAM_PARALLEL)
+           if ((IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && opts.min_apply_delay > 0) ||
+               (!IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && sub->minapplydelay > 0))
+               ereport(ERROR,
+                       errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                       errmsg("cannot enable %s mode for subscription with %s",
+                              "streaming = parallel", "min_apply_delay"));
+

Is this code correct? I also didn't like this message. "cannot enable streaming
= parallel mode for subscription with min_apply_delay" is far from a good error
message. How about referring to parallelism as "parallel streaming mode"?


--
Euler Taveira
EDB   https://www.enterprisedb.com/

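As an aid for the question about the streaming = parallel check quoted in the
review, here is a minimal standalone model of its condition. It is a sketch
under stated assumptions, not the patch's code: the function name and its
parameters are hypothetical, and plain booleans and integers stand in for
opts, sub and the SUBOPT_MIN_APPLY_DELAY bit. It only shows that the
two-branch condition is the same as testing the delay that would be in effect
after the command; it does not show whether other code paths need a similar
check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical model of the quoted condition.
 *
 * streaming_parallel: the command asks for streaming = parallel
 * delay_specified:    the command also sets min_apply_delay
 * new_delay_ms:       value given in the command (used only if specified)
 * current_delay_ms:   value already stored for the subscription
 */
static bool
parallel_conflicts_with_delay(bool streaming_parallel, bool delay_specified,
							  int64_t new_delay_ms, int64_t current_delay_ms)
{
	int64_t		effective_ms;

	/* Negative delays are assumed to have been rejected earlier. */
	assert(new_delay_ms >= 0 && current_delay_ms >= 0);

	/* The delay that would apply once the command finishes. */
	effective_ms = delay_specified ? new_delay_ms : current_delay_ms;

	return streaming_parallel && effective_ms > 0;
}
```

Written this way, the case where the same command resets the delay to zero
while enabling parallel streaming is visibly allowed, which is what the
IsSet() branches in the quoted code express.
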
From 5024325284ee3b4a4dc0a6a1cc6457ed5608cb46 Mon Sep 17 00:00:00 2001 From: Euler Taveira <euler.tave...@enterprisedb.com> Date: Mon, 23 Jan 2023 15:52:55 -0300 Subject: [PATCH] Euler's review --- doc/src/sgml/catalogs.sgml | 2 +- doc/src/sgml/config.sgml | 20 ++- doc/src/sgml/logical-replication.sgml | 7 +- doc/src/sgml/ref/create_subscription.sgml | 13 +- src/backend/commands/subscriptioncmds.c | 13 +- src/backend/replication/logical/worker.c | 40 +++--- src/bin/psql/describe.c | 2 +- src/test/regress/expected/subscription.out | 160 ++++++++++----------- src/test/subscription/t/032_apply_delay.pl | 8 +- 9 files changed, 133 insertions(+), 132 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index bf3c05241c..0bdb683296 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7878,7 +7878,7 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l <structfield>subminapplydelay</structfield> <type>int8</type> </para> <para> - The length of time (ms) to delay the application of changes. + Total time spent delaying the application of changes, in milliseconds </para></entry> </row> diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 39244bf64a..a15723d74f 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4788,17 +4788,15 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class=" command line. </para> <para> - For time-delayed logical replication (i.e. when the subscription is - created with parameter min_apply_delay > 0), the apply worker sends a - Standby Status Update message to the publisher with a period of - <literal>wal_receiver_status_interval</literal>. Make sure to set - <literal>wal_receiver_status_interval</literal> less than the - <literal>wal_sender_timeout</literal> on the publisher, otherwise, the - walsender will repeatedly terminate due to the timeout errors. 
If - <literal>wal_receiver_status_interval</literal> is set to zero, the apply - worker doesn't send any feedback messages during the subscriber's - <literal>min_apply_delay</literal> period. See - <xref linkend="sql-createsubscription"/> for details. + For time-delayed logical replication, the apply worker sends a feedback + message to the publisher every + <varname>wal_receiver_status_interval</varname> milliseconds. Make sure + to set <varname>wal_receiver_status_interval</varname> less than the + <varname>wal_sender_timeout</varname> on the publisher, otherwise, the + <literal>walsender</literal> will repeatedly terminate due to timeout + error. If <varname>wal_receiver_status_interval</varname> is set to + zero, the apply worker doesn't send any feedback messages during the + <literal>min_apply_delay</literal> interval. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 863af11a47..d8ae93f88d 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -248,10 +248,9 @@ </para> <para> - The subscriber replication can be instructed to lag behind the publisher - side changes by specifying the <literal>min_apply_delay</literal> - subscription parameter. See <xref linkend="sql-createsubscription"/> for - details. + A logical replication subscription can delay the application of changes by + specifying the <literal>min_apply_delay</literal> subscription parameter. + See <xref linkend="sql-createsubscription"/> for details. 
</para> <sect2 id="logical-replication-subscription-slot"> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 76ee9c0b3d..97ca9f8d9e 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -378,10 +378,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </para> <warning> <para> - Delaying the replication can mean there is a much longer time between making - a change on the publisher, and that change being committed on the subscriber. - This can impact the performance of synchronous replication. - See <xref linkend="guc-synchronous-commit"/>. + Delaying the replication can mean there is a much longer time + between making a change on the publisher, and that change being + committed on the subscriber. This can impact the performance of + synchronous replication. See <xref linkend="guc-synchronous-commit"/> + parameter. </para> </warning> </listitem> @@ -452,8 +453,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </para> <para> - A non-zero <literal>min_apply_delay</literal> parameter is not allowed when streaming - in parallel mode. + A non-zero <literal>min_apply_delay</literal> parameter is not allowed when + streaming in parallel mode. 
</para> <para> diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 11e9e9160a..d5fa7a95a9 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -331,6 +331,9 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, else if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) && strcmp(defel->defname, "min_apply_delay") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY)) + errorConflictingDefElem(defel, pstate); + opts->specified_opts |= SUBOPT_MIN_APPLY_DELAY; opts->min_apply_delay = defGetMinApplyDelay(defel); } @@ -2261,11 +2264,11 @@ defGetStreamingMode(DefElem *def) /* - * Extract the min_apply_delay mode value from a DefElem. This is very similar - * to PGC_INT case of parse_and_validate_value(), because min_apply_delay + * Extract the min_apply_delay value from a DefElem. This is very similar to + * parse_and_validate_value() for integer values, because min_apply_delay * accepts the same string as recovery_min_apply_delay. */ -int +static int defGetMinApplyDelay(DefElem *def) { char *value; @@ -2294,8 +2297,8 @@ defGetMinApplyDelay(DefElem *def) hintmsg ? errhint("%s", _(hintmsg)) : 0)); /* - * Check lower bound. parse_int() has been already confirmed that result - * is equal to or smaller than INT_MAX. + * Check lower bound. parse_int() has already been confirmed that result + * is less than or equal to INT_MAX. */ if (result < 0) ereport(ERROR, diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index eeac69ea13..00fe29fc20 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -320,11 +320,11 @@ bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; /* - * In order to avoid walsender timeout for time-delayed replication the worker - * process keeps sending feedback messages during the delay period. 
- * Meanwhile, the feature delays the apply before starting the - * transaction and thus we don't write WALs for the suspended changes during - * the wait. When the worker process sends a feedback message + * In order to avoid walsender timeout for time-delayed logical replication the + * apply worker keeps sending feedback messages during the delay interval. + * Meanwhile, the feature delays the apply before the start of the + * transaction and thus we don't write WAL records for the suspended changes during + * the wait. When the apply worker sends a feedback message * during the delay, we should not make positions of the flushed and apply LSN * overwritten by the last received latest LSN. See send_feedback() for details. */ @@ -1090,20 +1090,20 @@ maybe_delay_apply(TransactionId xid, TimestampTz finish_ts) if (diffms <= 0) break; - elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = %lld ms, Remaining wait time: %ld ms", - xid, (long long) MySubscription->minapplydelay, diffms); + elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = " INT64_FORMAT " ms, remaining wait time: %ld ms", + xid, MySubscription->minapplydelay, diffms); /* * Call send_feedback() to prevent the publisher from exiting by * timeout during the delay, when wal_receiver_status_interval is * available. */ - if (wal_receiver_status_interval > 0 - && diffms > wal_receiver_status_interval * 1000) + if (wal_receiver_status_interval > 0 && + diffms > wal_receiver_status_interval * 1000L) { WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - (long) wal_receiver_status_interval * 1000, + wal_receiver_status_interval * 1000L, WAIT_EVENT_RECOVERY_APPLY_DELAY); send_feedback(last_received, true, false, true); } @@ -2135,8 +2135,8 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, /* * Common spoolfile processing. * - * The commit/prepare time for streaming transaction is required to achieve - * time-delayed replication. 
+ * The commit/prepare time (finish_ts) for streamed transactions is required + * for time-delayed logical replication. */ void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, @@ -3869,12 +3869,12 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply, bool in_delayed * No outstanding transactions to flush, we can report the latest received * position. This is important for synchronous replication. * - * If the subscriber side apply is delayed (because of time-delayed - * replication) then do not tell the publisher that the received latest - * LSN is already applied and flushed, otherwise, it leads to the - * publisher side making a wrong assumption of logical replication - * progress. Instead, we just send a feedback message to avoid a publisher - * timeout during the delay. + * If the logical replication subscription is delayed (min_apply_delay + * parameter) then do not inform the publisher that the received latest LSN + * is already applied and flushed, otherwise, the publisher will make a + * wrong assumption about the logical replication progress. Instead, it + * just sends a feedback message to avoid a replication timeout during the + * delay. */ if (!have_pending_txes && !in_delayed_apply) flushpos = writepos = recvpos; @@ -3913,12 +3913,12 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply, bool in_delayed pq_sendint64(reply_message, now); /* sendTime */ pq_sendbyte(reply_message, requestReply); /* replyRequested */ - elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X in-delayed: %d", + elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X, apply delay: %s", force, LSN_FORMAT_ARGS(recvpos), LSN_FORMAT_ARGS(writepos), LSN_FORMAT_ARGS(flushpos), - in_delayed_apply); + in_delayed_apply? 
"yes" : "no"); walrcv_send(LogRepWorkerWalRcvConn, reply_message->data, reply_message->len); diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 8a27063bed..81d4607a1c 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6533,7 +6533,7 @@ describeSubscriptions(const char *pattern, bool verbose) ", suborigin AS \"%s\"\n" ", subminapplydelay AS \"%s\"\n", gettext_noop("Origin"), - gettext_noop("Min apply delay (ms)")); + gettext_noop("Min apply delay")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 1230bcb096..977f73fe9b 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. 
\dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | 0 | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -135,10 +135,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -155,10 +155,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | 
Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -167,10 +167,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay 
| Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -202,10 +202,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -239,19 +239,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was 
created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | 
Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 DROP SUBSCRIPTION regress_testsub;
@@ -263,27 +263,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING: subscription was created, but is not connected
 HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 -- fail - publication already exists
@@ -298,10 +298,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR: publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 -- fail - publication used more than once
@@ -316,10 +316,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 DROP SUBSCRIPTION regress_testsub;
@@ -355,10 +355,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING: subscription was created, but is not connected
 HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 --fail - alter of two_phase option not supported.
@@ -367,10 +367,10 @@ ERROR: unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -380,10 +380,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING: subscription was created, but is not connected
 HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -396,18 +396,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING: subscription was created, but is not connected
 HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | 0 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | 0 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -425,19 +425,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING: subscription was created, but is not connected
 HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 123 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 123 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 -- success -- min_apply_delay value with unit is converted into ms and stored as an integer
 ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '1 d');
 \dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay (ms) | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 86400000 | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 86400000 | off | dbname=regress_doesnotexist | 0/0
 (1 row)
 -- fail - alter subscription with streaming = parallel should fail when time-delayed replication is set
diff --git a/src/test/subscription/t/032_apply_delay.pl b/src/test/subscription/t/032_apply_delay.pl
index 37388e474f..1b6bc1ef80 100644
--- a/src/test/subscription/t/032_apply_delay.pl
+++ b/src/test/subscription/t/032_apply_delay.pl
@@ -16,7 +16,7 @@ sub check_apply_delay_log
 {
 	my ($node_subscriber, $offset, $expected) = @_;
-	my $log_location = $node_subscriber->wait_for_log(qr/time-delayed replication for txid (\d+), min_apply_delay = (\d+) ms, Remaining wait time: (\d+) ms/, $offset);
+	my $log_location = $node_subscriber->wait_for_log(qr/time-delayed replication for txid (\d+), min_apply_delay = (\d+) ms, remaining wait time: (\d+) ms/, $offset);
 	cmp_ok($log_location, '>', $offset,
 		"logfile contains triggered logical replication apply delay"
@@ -25,7 +25,7 @@ sub check_apply_delay_log
 	# Get the delay time from the server log
 	my $contents = slurp_file($node_subscriber->logfile, $offset);
 	$contents =~
-	  qr/time-delayed replication for txid (\d+), min_apply_delay = (\d+) ms, Remaining wait time: (\d+) ms/
+	  qr/time-delayed replication for txid (\d+), min_apply_delay = (\d+) ms, remaining wait time: (\d+) ms/
 	  or die "could not get the apply worker wait time";
 	my $logged_delay = $3;
@@ -87,7 +87,7 @@ $node_publisher->safe_psql('postgres',
 my $appname = 'tap_sub';
-# Create a subscription that applies the trasaction after 50 milliseconds delay
+# Create a subscription that applies the transaction after 50 milliseconds delay
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (copy_data = off, min_apply_delay = '50ms', streaming = 'on')"
 );
@@ -114,7 +114,7 @@ is($result, qq(2|1|2), 'check if the new rows were applied to subscriber');
 check_apply_delay_time($node_publisher, $node_subscriber, '2', '0.05');
 # Setup for streaming case
-$node_publisher->append_conf('postgres.conf',
+$node_publisher->append_conf('postgresql.conf',
 	'logical_decoding_mode = immediate');
 $node_publisher->reload;
--
2.30.2