On Tue, 22 Oct 2024 at 16:24, Hayato Kuroda (Fujitsu)
<kuroda.hay...@fujitsu.com> wrote:
>
> Dear Amit, Vignesh,
>
> > 1. Please ensure that none of the existing tests that use
> > subscriptions with large changes will be impacted due to this change.
>
> I found at least 022_twophase_cascade.pl should be fixed.
> The file has a part which tests non-streaming case:
>
> ```
> # -----------------------
> # 2PC NON-STREAMING TESTS
> # -----------------------
> ...
> $node_B->safe_psql(
>         'postgres', "
>         CREATE SUBSCRIPTION tap_sub_B
>         CONNECTION '$node_A_connstr application_name=$appname_B'
>         PUBLICATION tap_pub_A
>         WITH (two_phase = on)");
> ...
> ```
>
> I know streaming does not actually happen here because only a few tuples
> are inserted later, but creating the subscription with streaming=parallel
> is a bit misleading.

The attached v5 patch creates these subscriptions with streaming = off.
I did not find any other TAP tests that required changes.
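
To illustrate the pg_dump part of the patch, here is a minimal standalone
sketch (not the actual pg_dump code) of why the new else branch is needed.
Once the server default becomes parallel, a subscription with streaming off
has to be dumped with an explicit streaming = off; otherwise restoring the
dump would silently switch it to parallel. The helper names
streaming_clause and build_with_clause are hypothetical, and the catalog
letters 'f', 't' and 'p' for off, on and parallel are assumed here.

```c
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical stand-in for the substream handling in dumpSubscription().
 * Assumption: the catalog stores "t" for on and "p" for parallel; anything
 * else is treated as off, mirroring the new else branch in the patch.
 */
static const char *
streaming_clause(const char *substream)
{
	if (strcmp(substream, "t") == 0)
		return ", streaming = on";
	else if (strcmp(substream, "p") == 0)
		return ", streaming = parallel";
	else
		return ", streaming = off";	/* always explicit, never omitted */
}

/*
 * Hypothetical helper: format a simplified WITH clause into buf, always
 * including the streaming option regardless of the server default.
 */
static void
build_with_clause(char *buf, size_t buflen,
				  const char *slot_name, const char *substream)
{
	snprintf(buf, buflen, "WITH (connect = false, slot_name = '%s'%s)",
			 slot_name, streaming_clause(substream));
}
```

With this sketch, build_with_clause(buf, sizeof(buf), "sub2", "f") yields
WITH (connect = false, slot_name = 'sub2', streaming = off), which is in
line with the streaming part of the updated expectations in 002_pg_dump.pl.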

Regards,
Vignesh
From 479006dc72c2b101884349e3185128ffaf25b5a0 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 25 Sep 2024 14:42:20 +0530
Subject: [PATCH v5] Change the default value of the streaming option to
 'parallel'.

Previously the default value of this option was 'off'. The parallel option
indicates that the changes in large transactions (greater than
logical_decoding_work_mem) are to be applied directly via one of the
parallel apply workers, if available.

Parallel mode was introduced in PostgreSQL 16, but we refrained from
enabling it by default to avoid any unpleasant behavior in existing
applications. However, we have not received any such reports so far, so
this is a good time to enable it by default.

Reported-by: Vignesh C
Author: Hayato Kuroda, Amit Kapila
Discussion: https://postgr.es/m/CALDaNm1=medhw23nuoepjtmonwsmsp80ddsw+sejs0gumc_...@mail.gmail.com
---
 doc/src/sgml/ref/create_subscription.sgml     | 27 ++++++++++++-------
 src/backend/commands/subscriptioncmds.c       |  2 +-
 src/bin/pg_dump/pg_dump.c                     |  2 ++
 src/bin/pg_dump/t/002_pg_dump.pl              | 10 +++----
 src/test/regress/expected/subscription.out    | 24 ++++++++---------
 .../subscription/t/022_twophase_cascade.pl    |  4 +--
 6 files changed, 39 insertions(+), 30 deletions(-)

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 8a3096e62b..6cf7d4f9a1 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -271,11 +271,23 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         <listitem>
          <para>
           Specifies whether to enable streaming of in-progress transactions
-          for this subscription.  The default value is <literal>off</literal>,
-          meaning all transactions are fully decoded on the publisher and only
-          then sent to the subscriber as a whole.
+          for this subscription.  The default value is <literal>parallel</literal>,
+          meaning incoming changes are directly applied via one of the parallel
+          apply workers, if available. If no parallel apply worker is free to
+          handle streaming transactions then the changes are written to
+          temporary files and applied after the transaction is committed. Note
+          that if an error happens in a parallel apply worker, the finish LSN
+          of the remote transaction might not be reported in the server log.
          </para>
 
+         <caution>
+          <para>
+           There is a risk of deadlock when the schemas of the publisher and
+           subscriber differ, although such cases are rare. The apply worker
+           is equipped to retry these transactions automatically.
+          </para>
+         </caution>
+
          <para>
           If set to <literal>on</literal>, the incoming changes are written to
           temporary files and then applied only after the transaction is
@@ -283,13 +295,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
 
          <para>
-          If set to <literal>parallel</literal>, incoming changes are directly
-          applied via one of the parallel apply workers, if available. If no
-          parallel apply worker is free to handle streaming transactions then
-          the changes are written to temporary files and applied after the
-          transaction is committed. Note that if an error happens in a
-          parallel apply worker, the finish LSN of the remote transaction
-          might not be reported in the server log.
+          If set to <literal>off</literal>, all transactions are fully decoded
+          on the publisher and only then sent to the subscriber as a whole.
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 02ccc636b8..0a7a618855 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -151,7 +151,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 	if (IsSet(supported_opts, SUBOPT_BINARY))
 		opts->binary = false;
 	if (IsSet(supported_opts, SUBOPT_STREAMING))
-		opts->streaming = LOGICALREP_STREAM_OFF;
+		opts->streaming = LOGICALREP_STREAM_PARALLEL;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
 		opts->twophase = false;
 	if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 1b47c388ce..d8c6330732 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5235,6 +5235,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 		appendPQExpBufferStr(query, ", streaming = on");
 	else if (strcmp(subinfo->substream, "p") == 0)
 		appendPQExpBufferStr(query, ", streaming = parallel");
+	else
+		appendPQExpBufferStr(query, ", streaming = off");
 
 	if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
 		appendPQExpBufferStr(query, ", two_phase = on");
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index ab6c830491..ac60829d68 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -2992,7 +2992,7 @@ my %tests = (
 						 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
 						 WITH (connect = false);',
 		regexp => qr/^
-			\QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1');\E
+			\QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1', streaming = parallel);\E
 			/xm,
 		like => { %full_runs, section_post_data => 1, },
 	},
@@ -3001,9 +3001,9 @@ my %tests = (
 		create_order => 50,
 		create_sql => 'CREATE SUBSCRIPTION sub2
 						 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
-						 WITH (connect = false, origin = none);',
+						 WITH (connect = false, origin = none, streaming = off);',
 		regexp => qr/^
-			\QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E
+			\QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', streaming = off, origin = none);\E
 			/xm,
 		like => { %full_runs, section_post_data => 1, },
 	},
@@ -3012,9 +3012,9 @@ my %tests = (
 		create_order => 50,
 		create_sql => 'CREATE SUBSCRIPTION sub3
 						 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
-						 WITH (connect = false, origin = any);',
+						 WITH (connect = false, origin = any, streaming = on);',
 		regexp => qr/^
-			\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E
+			\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
 			/xm,
 		like => { %full_runs, section_post_data => 1, },
 	},
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 17d48b1685..1443e1d929 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -119,7 +119,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
                                                                                                                  List of subscriptions
        Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@@ -127,7 +127,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
                                                                                                                  List of subscriptions
        Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -148,7 +148,7 @@ ERROR:  invalid connection string syntax: missing "=" after "foobar" in connecti
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -160,7 +160,7 @@ ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
                                                                                                                      List of subscriptions
       Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | f                 | t             | f        | off                | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -179,7 +179,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
                                                                                                                      List of subscriptions
       Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/12345
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -191,7 +191,7 @@ ERROR:  invalid WAL location (LSN): 0/0
                                                                                                                      List of subscriptions
       Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -226,7 +226,7 @@ HINT:  Available values: local, remote_write, remote_apply, on, off.
                                                                                                                        List of subscriptions
         Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
 ---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | local              | dbname=regress_doesnotexist2 | 0/0
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -258,7 +258,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@@ -267,7 +267,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -374,7 +374,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- we can alter streaming when two_phase enabled
@@ -412,7 +412,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@@ -420,7 +420,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
                                                                                                                 List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl
index 1acc79f17e..7fe7c7c175 100644
--- a/src/test/subscription/t/022_twophase_cascade.pl
+++ b/src/test/subscription/t/022_twophase_cascade.pl
@@ -88,7 +88,7 @@ $node_B->safe_psql(
 	CREATE SUBSCRIPTION tap_sub_B
 	CONNECTION '$node_A_connstr application_name=$appname_B'
 	PUBLICATION tap_pub_A
-	WITH (two_phase = on)");
+	WITH (two_phase = on, streaming = off)");
 
 # node_B (pub) -> node_C (sub)
 my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
@@ -100,7 +100,7 @@ $node_C->safe_psql(
 	CREATE SUBSCRIPTION tap_sub_C
 	CONNECTION '$node_B_connstr application_name=$appname_C'
 	PUBLICATION tap_pub_B
-	WITH (two_phase = on)");
+	WITH (two_phase = on, streaming = off)");
 
 # Wait for subscribers to finish initialization
 $node_A->wait_for_catchup($appname_B);
-- 
2.34.1
