On Sat, 20 Jul 2024 at 20:48, vignesh C <vignes...@gmail.com> wrote:
>
> On Fri, 12 Jul 2024 at 08:22, Peter Smith <smithpb2...@gmail.com> wrote:
> >
> > Hi Vignesh. Here are the rest of my comments for patch v20240705-0003.
> > ======
> >
> > 8. logicalrep_sequence_sync_worker_find
> >
> > +/*
> > + * Walks the workers array and searches for one that matches given
> > + * subscription id.
> > + *
> > + * We are only interested in the sequence sync worker.
> > + */
> > +LogicalRepWorker *
> > +logicalrep_sequence_sync_worker_find(Oid subid, bool only_running)
> >
> > There are other similar functions for walking the workers array to
> > search for a worker. Instead of having different functions for
> > different cases, wouldn't it be cleaner to combine these into a single
> > function, where you pass a parameter (e.g. a mask of worker types that
> > you are interested in finding)?

This is fixed in the v20240730_2 version attached at [1].

> > 17.
> > Also, where does the number 100 come from? Why not 1000? Why not 10?
> > Why have batching at all? Maybe there should be some comment to
> > describe the reason and the chosen value.

I ran some tests with 10, 100, and 1000 sequences per batch, synchronizing
10000 sequences in total. The results:
10 per batch   - 4.94 seconds
100 per batch  - 4.87 seconds
1000 per batch - 4.53 seconds

There is not much difference in time between them. It is currently
set to 100, which seems fine since it does not generate a lot of
transactions. Additionally, the locks on the sequences are released
periodically, at each transaction commit.

I used the test from the attached patch, changing
max_sequences_sync_per_batch to 10/100/1000 in 035_sequences.pl, to
verify this.

[1] - https://www.postgresql.org/message-id/CALDaNm3%2BXzHAbgyn8gmbBLK5goyv_uyGgHEsTQxRZ8bVk6nAEg%40mail.gmail.com

Regards,
Vignesh
              | 1000 seq/batch | 100 seq/batch | 10 seq/batch
--------------|----------------|---------------|-------------
Exec time (s) | 4.604323149    | 4.719184017   | 4.945001841
Exec time (s) | 4.518861055    | 4.738708973   | 4.958534002
Exec time (s) | 4.522281885    | 4.873774052   | 4.957324982
Exec time (s) | 4.524350882    | 4.878502131   | 4.943160057
Exec time (s) | 4.531511068    | 4.874341965   | 4.850522995
Median (s)    | 4.524350882    | 4.873774052   | 4.945001841
From d060bd7903d812904816f0ada004420e2d9f0c21 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Tue, 30 Jul 2024 11:34:55 +0530
Subject: [PATCH] Performance testing changes.

Performance testing changes.
---
 src/backend/replication/logical/launcher.c     |   1 +
 src/backend/replication/logical/sequencesync.c |   6 +-
 src/backend/utils/misc/guc_tables.c            |  12 +++
 src/backend/utils/misc/postgresql.conf.sample  |   1 +
 src/include/replication/logicallauncher.h      |   2 +
 src/test/subscription/t/035_sequences.pl       | 142 +++++++++++++++++++++++++
 6 files changed, 162 insertions(+), 2 deletions(-)
 create mode 100644 src/test/subscription/t/035_sequences.pl

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 04d76e7..2ece21f 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
 int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 int			max_parallel_apply_workers_per_subscription = 2;
+int			max_sequences_sync_per_batch = 10;
 
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index fc36bf9..6bcfbdf 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -20,6 +20,7 @@
 #include "catalog/pg_subscription_rel.h"
 #include "commands/sequence.h"
 #include "pgstat.h"
+#include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/worker_internal.h"
 #include "utils/acl.h"
@@ -29,6 +30,7 @@
 #include "utils/rls.h"
 #include "utils/usercontext.h"
 
+
 /*
  * fetch_remote_sequence_data
  *
@@ -287,11 +289,11 @@ LogicalRepSyncSequences(void)
 		 * Have we reached the end of the current batch of sequences,
 		 * or last remaining sequences to synchronize?
 		 */
-		if (((curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) ||
+		if (((curr_seq % max_sequences_sync_per_batch) == 0) ||
 			curr_seq == seq_count)
 		{
 			/* Obtain the starting index of the current batch. */
-			int			i = (curr_seq - 1) - ((curr_seq - 1) % MAX_SEQUENCES_SYNC_PER_BATCH);
+			int			i = (curr_seq - 1) - ((curr_seq - 1) % max_sequences_sync_per_batch);
 
 			/* LOG all the sequences synchronized during current batch. */
 			for (; i < curr_seq; i++)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 6a623f5..017fb59 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3185,6 +3185,18 @@ struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"max_sequences_sync_per_batch",
+			PGC_SIGHUP,
+			REPLICATION_SUBSCRIBERS,
+			gettext_noop("Maximum number of sequences to be synchronized in one batch."),
+			NULL,
+		},
+		&max_sequences_sync_per_batch,
+		10, 1, 10000,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"max_parallel_apply_workers_per_subscription",
 			PGC_SIGHUP,
 			REPLICATION_SUBSCRIBERS,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 9ec9f97..82a0713 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -383,6 +383,7 @@
 					# (change requires restart)
 #max_sync_workers_per_subscription = 2	# taken from max_logical_replication_workers
 #max_parallel_apply_workers_per_subscription = 2	# taken from max_logical_replication_workers
+#max_sequences_sync_per_batch = 10
 
 
 #------------------------------------------------------------------------------
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index ff0438b..673f114 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,8 @@
 extern PGDLLIMPORT int max_logical_replication_workers;
 extern PGDLLIMPORT int max_sync_workers_per_subscription;
 extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_sequences_sync_per_batch;
+
 
 extern void ApplyLauncherRegister(void);
 extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/test/subscription/t/035_sequences.pl b/src/test/subscription/t/035_sequences.pl
new file mode 100644
index 0000000..e5ac670
--- /dev/null
+++ b/src/test/subscription/t/035_sequences.pl
@@ -0,0 +1,142 @@
+
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# This tests that sequences are synced correctly to the subscriber
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $sequence_count = 10000;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+
+# Avoid checkpoint during the test, otherwise, extra values will be fetched for
+# the sequences which will cause the test to fail randomly.
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'checkpoint_timeout = 1h
+shared_buffers = 40GB
+max_worker_processes = 32
+max_parallel_maintenance_workers = 24
+max_parallel_workers = 32
+synchronous_commit = off
+checkpoint_timeout = 1d
+max_wal_size = 24GB
+min_wal_size = 15GB
+max_locks_per_transaction = 11000
+autovacuum = off'
+);
+
+$node_publisher->start;
+
+# Initialize subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', 'max_sequences_sync_per_batch = 100
+shared_buffers = 40GB
+max_worker_processes = 32
+max_parallel_maintenance_workers = 24
+max_parallel_workers = 32
+synchronous_commit = off
+checkpoint_timeout = 1d
+max_wal_size = 24GB
+min_wal_size = 15GB
+max_locks_per_transaction = 11000
+autovacuum = off');
+
+$node_subscriber->start;
+
+
+for (my $i = 0; $i < $sequence_count; $i++)
+{
+	$node_publisher->safe_psql('postgres', "CREATE SEQUENCE regress_s$i;");
+	$node_publisher->safe_psql('postgres', "SELECT nextval('regress_s$i') FROM generate_series(1,100);");
+	$node_subscriber->safe_psql('postgres', "CREATE SEQUENCE regress_s$i;");
+}
+
+my $result = 1;
+is($result, '1', "test started");
+use Time::HiRes qw( time );
+my $start = time();
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION regress_seq_pub FOR ALL SEQUENCES");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub"
+);
+
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+
+my $end = time();
+my $runtime = sprintf("%.16s", $end - $start);
+print "test execution time $runtime\n";
+
+$start = time();
+
+$node_subscriber->safe_psql('postgres',
+        "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES"
+);
+
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+
+$end = time();
+$runtime = sprintf("%.16s", $end - $start);
+print "test execution time $runtime\n";
+
+$start = time();
+
+$node_subscriber->safe_psql('postgres',
+        "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES"
+);
+
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+
+$end = time();
+$runtime = sprintf("%.16s", $end - $start);
+print "test execution time $runtime\n";
+
+$start = time();
+
+$node_subscriber->safe_psql('postgres',
+        "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES"
+);
+
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$end = time();
+$runtime = sprintf("%.16s", $end - $start);
+print "test execution time $runtime\n";
+
+$start = time();
+$node_subscriber->safe_psql('postgres',
+        "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES"
+);
+
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$end = time();
+$runtime = sprintf("%.16s", $end - $start);
+print "test execution time $runtime\n";
+
+done_testing();
-- 
1.8.3.1
