On Thu, Aug 15, 2024, at 5:53 PM, Doruk Yilmaz wrote:
> Hello again,
>
> On Tue, Aug 13, 2024 at 12:48 AM Euler Taveira <eu...@eulerto.com> wrote:
> > I'm curious about your use case. Is it just because the internal function has a
> > different signature or your tool is capable of apply logical replication changes
> > in parallel using the SQL API?
>
> The latter is correct, it applies logical replication changes in parallel.
> Since multiple connections may commit, we need all of them to be able
> to advance the replication origin.
>
> > * no documentation changes. Since the function you are changing has a new
> > signature, this change should be reflected in the documentation.
> > * no need for a new internal function. The second parameter (PID) can be
> > optional and defaults to 0 in this case. See how we changed the
> > pg_create_logical_replication_slot along the years add some IN parameters like
> > twophase and failover in the recent versions.
>
> I updated/rewrote the patch to reflect these suggestions.
> It now has the same DEFAULT 0 style used in
> pg_create_logical_replication_slot.
> I also updated the documentation.
[after a long hiatus...]

I tested your patch again and it does what is advertised. I changed your patch
a bit. The main change was the documentation. You didn't explain what this new
parameter is for. I tried to explain it without adding lots of details. (There
is a section that explains how parallel apply processes work behind the
scenes.) I also renamed it from acquired_by to pid to be more descriptive. I
fixed some whitespace issues too.

I noticed that there are no tests. This doesn't appear to be a shortcoming of
this patch, but we need to cover some of these replication functions with an
additional test file in another patch.

Finally, I wrote a commit message and it is RfC.

session 1:

postgres=# select * from pg_replication_origin;
 roident | roname
---------+--------
(0 rows)

postgres=# SELECT pg_backend_pid();
 pg_backend_pid
----------------
         260732
(1 row)

postgres=# SELECT pg_replication_origin_create('test');
 pg_replication_origin_create
------------------------------
                            1
(1 row)

postgres=# SELECT pg_replication_origin_session_setup('test', 0);
 pg_replication_origin_session_setup
-------------------------------------
 
(1 row)

postgres=# select * from pg_replication_origin;
 roident | roname
---------+--------
       1 | test
(1 row)

session 2:

postgres=# SELECT pg_replication_origin_session_setup('test', 260732);
 pg_replication_origin_session_setup
-------------------------------------
 
(1 row)

session 3:

postgres=# SELECT pg_replication_origin_session_setup('test', 12345);
ERROR:  could not find replication state slot for replication origin with OID 1 which was acquired by 12345

--
Euler Taveira
EDB   https://www.enterprisedb.com/
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 17 00:00:00 2001
From: Doruk <do...@mixrank.com>
Date: Thu, 15 Aug 2024 23:34:26 +0300
Subject: [PATCH v3] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() accepts an extra parameter: pid. This process
ID indicates that multiple processes are sharing the same replication
origin to apply changes in parallel. The replorigin_session_setup
function has a SQL user interface: pg_replication_origin_session_setup.
This commit adds an optional parameter that passes the process ID to the
internal function replorigin_session_setup. It allows multiple processes
to use the same replication origin if you are using the replication
functions.
---
 doc/src/sgml/func.sgml                   | 8 +++++++-
 src/backend/catalog/system_functions.sql | 9 ++++++++-
 src/backend/replication/logical/origin.c | 4 +++-
 src/include/catalog/pg_proc.dat          | 2 +-
 4 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 47370e581a..e50e689fb6 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29475,7 +29475,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -29483,6 +29483,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equal to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
        </para></entry>
       </row>
 
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 591157b1d1..26151e0f1c 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -668,6 +668,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE
 AS 'pg_set_attribute_stats';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -769,7 +776,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 1b586cb1cf..9cbe1eec45 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1355,12 +1355,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;
 
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
 
 	replorigin_session_origin = origin;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b37e8a6f88..ea118a0563 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12063,7 +12063,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 { oid => '6007',
   descr => 'teardown configured replication progress tracking',
-- 
2.39.5
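
For readers who want the sharing rule in isolation, here is a rough, standalone C model of the behaviour exercised by the three psql sessions in the message. It is only a sketch and not the code in origin.c: ToyOriginState, ToySetupResult and toy_session_setup are hypothetical names, the shared-memory slot lookup and locking are left out, and the "already active" branch for a second caller passing pid = 0 is an assumption about the existing backend behaviour that the message itself does not show.

```c
#include <assert.h>

/* Hypothetical, simplified stand-in for one shared replication state slot. */
typedef struct ToyOriginState
{
	int			roident;		/* origin ID this slot tracks */
	int			acquired_by;	/* PID of the first process, 0 if none */
} ToyOriginState;

typedef enum ToySetupResult
{
	TOY_SETUP_OK,
	TOY_SETUP_ALREADY_ACTIVE,	/* pid = 0, but another process owns it */
	TOY_SETUP_PID_MISMATCH		/* pid != 0 and is not the recorded owner */
} ToySetupResult;

/*
 * Model of session setup for one origin.
 *
 * caller_pid is the PID of the calling backend; pid is the value of the
 * optional SQL parameter (0 for the first process, otherwise the PID of
 * the first process).
 */
ToySetupResult
toy_session_setup(ToyOriginState *state, int caller_pid, int pid)
{
	assert(caller_pid > 0);

	if (pid == 0)
	{
		/* First process: the origin must not be owned by anyone yet. */
		if (state->acquired_by != 0)
			return TOY_SETUP_ALREADY_ACTIVE;	/* assumed behaviour */
		state->acquired_by = caller_pid;
		return TOY_SETUP_OK;
	}

	/* Sharing process: pid must name the process that acquired the origin. */
	if (state->acquired_by != pid)
		return TOY_SETUP_PID_MISMATCH;

	return TOY_SETUP_OK;
}
```

In this model, session 1 corresponds to toy_session_setup(&st, 260732, 0), session 2 to a call from another backend with pid = 260732, and session 3 to a call with pid = 12345, which yields TOY_SETUP_PID_MISMATCH. Only 260732 and 12345 come from the message; any other caller PID is made up for illustration.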