On Thu, Aug 15, 2024, at 5:53 PM, Doruk Yilmaz wrote:
> Hello again,
> 
> On Tue, Aug 13, 2024 at 12:48 AM Euler Taveira <eu...@eulerto.com> wrote:
> > I'm curious about your use case. Is it just because the internal function has a
> > different signature, or is your tool capable of applying logical replication
> > changes in parallel using the SQL API?
> 
> The latter is correct, it applies logical replication changes in parallel.
> Since multiple connections may commit, we need all of them to be able
> to advance the replication origin.
> 
> > * no documentation changes. Since the function you are changing has a new
> > signature, this change should be reflected in the documentation.
> > * no need for a new internal function. The second parameter (PID) can be
> > optional and default to 0 in this case. See how
> > pg_create_logical_replication_slot gained IN parameters over the years, such
> > as twophase and failover in recent versions.
> 
> I updated/rewrote the patch to reflect these suggestions.
> It now has the same DEFAULT 0 style used in pg_create_logical_replication_slot.
> I also updated the documentation.

[after a long hiatus...]

I tested your patch again and it does what is advertised. I changed your patch
a bit. The main change was the documentation. You didn't explain what this new
parameter is for. I tried to explain it without adding too many details.
(There is a section that explains how parallel apply processes work behind the
scenes.) I also renamed it from acquired_by to pid to be more descriptive. I
fixed some whitespace issues too. I noticed that there are no tests. This
doesn't appear to be a shortcoming of this patch, but we need to cover some of
these replication functions with an additional test file in another patch.
Finally, I wrote a commit message, and it is RfC.

session 1:

postgres=# select * from pg_replication_origin;
 roident | roname 
---------+--------
(0 rows)

postgres=# SELECT pg_backend_pid();
 pg_backend_pid 
----------------
         260732
(1 row)

postgres=# SELECT pg_replication_origin_create('test');
 pg_replication_origin_create 
------------------------------
                            1
(1 row)

postgres=# SELECT pg_replication_origin_session_setup('test', 0);
 pg_replication_origin_session_setup 
-------------------------------------
 
(1 row)

postgres=# select * from pg_replication_origin;
 roident | roname 
---------+--------
       1 | test
(1 row)

session 2:

postgres=# SELECT pg_replication_origin_session_setup('test', 260732);
 pg_replication_origin_session_setup 
-------------------------------------
 
(1 row)

session 3:

postgres=# SELECT pg_replication_origin_session_setup('test', 12345);
ERROR:  could not find replication state slot for replication origin with OID 1 which was acquired by 12345


--
Euler Taveira
EDB   https://www.enterprisedb.com/
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 17 00:00:00 2001
From: Doruk <do...@mixrank.com>
Date: Thu, 15 Aug 2024 23:34:26 +0300
Subject: [PATCH v3] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() has accepted an extra parameter: pid. This
process ID indicates that multiple processes are sharing the same
replication origin to apply changes in parallel. replorigin_session_setup()
is exposed at the SQL level as pg_replication_origin_session_setup. This
commit adds an optional parameter that passes the process ID through to
replorigin_session_setup(), allowing multiple processes to share the same
replication origin when using the SQL replication origin functions.
---
 doc/src/sgml/func.sgml                   | 8 +++++++-
 src/backend/catalog/system_functions.sql | 9 ++++++++-
 src/backend/replication/logical/origin.c | 4 +++-
 src/include/catalog/pg_proc.dat          | 2 +-
 4 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 47370e581a..e50e689fb6 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29475,7 +29475,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -29483,6 +29483,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equal to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
        </para></entry>
       </row>
 
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 591157b1d1..26151e0f1c 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -668,6 +668,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE
 AS 'pg_set_attribute_stats';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -769,7 +776,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 1b586cb1cf..9cbe1eec45 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1355,12 +1355,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;
 
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
 
 	replorigin_session_origin = origin;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b37e8a6f88..ea118a0563 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12063,7 +12063,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 
 { oid => '6007', descr => 'teardown configured replication progress tracking',
-- 
2.39.5
