On 07/03/17 06:23, Petr Jelinek wrote: > Hi, > > there has been discussion at the logical replication initial copy thread > [1] about making apply work with sync commit off by default for > performance reasons and adding option to change that per subscription. > > Here I propose patch to implement this - it adds boolean column > subssynccommit to pg_subscription catalog which decides if > synchronous_commit should be off or local for apply. And it adds > SYNCHRONOUS_COMMIT = boolean to the list of WITH options for CREATE and > ALTER SUBSCRIPTION. When nothing is specified it will set it to false. > > The patch is built on top of copy patch currently as there are conflicts > between the two and this helps a bit with testing of copy patch. > > [1] > https://www.postgresql.org/message-id/CA+TgmoY7Lk2YKArcp4O=Qu=xoor8j71mad1oteojawmuje3...@mail.gmail.com >
I rebased this patch against recent changes and the latest version of copy patch. -- Petr Jelinek http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
>From 0876a23ddc385a6c4dc8dc26abcd1e82c9ab2482 Mon Sep 17 00:00:00 2001 From: Petr Jelinek <pjmo...@pjmodos.net> Date: Mon, 6 Mar 2017 13:07:45 +0100 Subject: [PATCH] Add option to modify sync commit per subscription This also changes default behaviour of subscription workers to synchronous_commit = off --- doc/src/sgml/catalogs.sgml | 11 ++++++ doc/src/sgml/ref/alter_subscription.sgml | 1 + doc/src/sgml/ref/create_subscription.sgml | 15 ++++++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/commands/subscriptioncmds.c | 59 +++++++++++++++++++++++++----- src/backend/replication/logical/launcher.c | 2 +- src/backend/replication/logical/worker.c | 28 +++++++++++++- src/bin/pg_dump/pg_dump.c | 12 +++++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 5 ++- src/include/catalog/pg_subscription.h | 11 ++++-- src/test/regress/expected/subscription.out | 27 +++++++------- src/test/regress/sql/subscription.sql | 3 +- 13 files changed, 143 insertions(+), 33 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 228ec78..f71d9c9 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6395,6 +6395,17 @@ </row> <row> + <entry><structfield>subsynccommit</structfield></entry> + <entry><type>bool</type></entry> + <entry></entry> + <entry> + If true, the apply for the subscription will run with + <literal>synchronous_commit</literal> set to <literal>local</literal>. + Otherwise it will have it set to <literal>false</literal>. + </entry> + </row> + + <row> <entry><structfield>subconninfo</structfield></entry> <entry><type>text</type></entry> <entry></entry> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 6335e17..712de98 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -26,6 +26,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> WITH ( <rep <phrase>where <replaceable class="PARAMETER">suboption</replaceable> can be:</phrase> SLOT NAME = slot_name + | SYNCHRONOUS_COMMIT = boolean ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> SET PUBLICATION publication_name [, ...] { REFRESH WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) | NOREFRESH } ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> REFRESH PUBLICATION WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 6468470..6baff2f 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -29,6 +29,7 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl | CREATE SLOT | NOCREATE SLOT | SLOT NAME = slot_name | COPY DATA | NOCOPY DATA + | SYNCHRONOUS_COMMIT = boolean | NOCONNECT </synopsis> </refsynopsisdiv> @@ -145,6 +146,20 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl </varlistentry> <varlistentry> + <term><literal>SYNCHRONOUS_COMMIT = <replaceable class="parameter">boolean</replaceable></literal></term> + <listitem> + <para> + Modifies the <literal>synchronous_commit</literal> setting of the + subscription workers. When set to <literal>true</literal>, the + <literal>synchronous_commit</literal> for the worker will be set to + <literal>local</literal> otherwise to <literal>false</literal>. The + default value is <literal>false</literal> independently of the default + <literal>synchronous_commit</literal> setting for the instance. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term>NOCONNECT</term> <listitem> <para> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9b74892..26921aa 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -68,6 +68,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; + sub->synccommit = subform->subsynccommit; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index cba2d5c..402f682 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,12 +60,13 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, char **slot_name, - bool *copy_data) + bool *copy_data, bool *synchronous_commit) { ListCell *lc; bool connect_given = false; bool create_slot_given = false; bool copy_data_given = false; + bool synchronous_commit_given = false; if (connect) *connect = true; @@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *slot_name = NULL; if (copy_data) *copy_data = true; + if (synchronous_commit) + *synchronous_commit = false; /* Parse options */ foreach (lc, options) @@ -165,6 +168,26 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, copy_data_given = true; *copy_data = !defGetBoolean(defel); } + else if (strcmp(defel->defname, "synchronous_commit") == 0 && synchronous_commit) + { + if (synchronous_commit_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + synchronous_commit_given = true; + *synchronous_commit = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "nosynchronous_commit") == 0 && synchronous_commit) + { + if (synchronous_commit_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + synchronous_commit_given = true; + *synchronous_commit = !defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -269,6 +292,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool enabled_given; bool enabled; bool copy_data; + bool synchronous_commit; char *conninfo; char *slotname; char originname[NAMEDATALEN]; @@ -280,7 +304,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname, ©_data); + &enabled, &create_slot, &slotname, ©_data, + &synchronous_commit); /* * Since creating a replication slot is not transactional, rolling back @@ -330,6 +355,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subsynccommit - 1] = + BoolGetDatum(synchronous_commit); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); values[Anum_pg_subscription_subslotname - 1] = @@ -581,14 +608,26 @@ AlterSubscription(AlterSubscriptionStmt *stmt) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slot_name; + char *slot_name; + bool synchronous_commit; + Form_pg_subscription form; + + form = (Form_pg_subscription) GETSTRUCT(tup); + synchronous_commit = form->subsynccommit; parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slot_name, NULL); + NULL, &slot_name, NULL, + &synchronous_commit); - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); - replaces[Anum_pg_subscription_subslotname - 1] = true; + if (slot_name) + { + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + replaces[Anum_pg_subscription_subslotname - 1] = true; + } + values[Anum_pg_subscription_subsynccommit - 1] = + BoolGetDatum(synchronous_commit); + replaces[Anum_pg_subscription_subsynccommit - 1] = true; update_tuple = true; break; @@ -601,7 +640,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL); + NULL, NULL, NULL); Assert(enabled_given); values[Anum_pg_subscription_subenabled - 1] = @@ -626,7 +665,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data); + NULL, NULL, ©_data, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -652,7 +691,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data); + NULL, NULL, ©_data, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 255b225..0d3ec27 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -126,7 +126,7 @@ get_subscription_list(void) */ oldcxt = MemoryContextSwitchTo(resultcxt); - sub = (Subscription *) palloc(sizeof(Subscription)); + sub = (Subscription *) palloc0(sizeof(Subscription)); sub->oid = HeapTupleGetOid(tup); sub->dbid = subform->subdbid; sub->owner = subform->subowner; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fbae40a..5e4b9ba 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1353,6 +1353,21 @@ reread_subscription(void) } /* + * We need to make new connection to new slot if slot name has changed + * so exit here as well if that's the case. + */ + if (strcmp(newsub->slotname, MySubscription->slotname) != 0) + { + ereport(LOG, + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because the replication slot name was changed", + MySubscription->name))); + + walrcv_disconnect(wrconn); + proc_exit(0); + } + + /* * Exit if publication list was changed. The launcher will start * new worker. */ @@ -1384,8 +1399,7 @@ reread_subscription(void) } /* Check for other changes that should never happen too. */ - if (newsub->dbid != MySubscription->dbid || - strcmp(newsub->slotname, MySubscription->slotname) != 0) + if (newsub->dbid != MySubscription->dbid) { elog(ERROR, "subscription %u changed unexpectedly", MyLogicalRepWorker->subid); @@ -1397,6 +1411,11 @@ reread_subscription(void) MemoryContextSwitchTo(oldctx); + /* Change synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", + MySubscription->synccommit ? "local" : "off", + PGC_BACKEND, PGC_S_OVERRIDE); + if (started_tx) CommitTransactionCommand(); @@ -1464,6 +1483,11 @@ ApplyWorkerMain(Datum main_arg) MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); + /* Setup synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", + MySubscription->synccommit ? "local" : "off", + PGC_BACKEND, PGC_S_OVERRIDE); + if (!MySubscription->enabled) { ereport(LOG, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index e67171d..1a9ebb3 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3625,6 +3625,7 @@ getSubscriptions(Archive *fout) int i_subname; int i_rolname; int i_subenabled; + int i_subsynccommit; int i_subconninfo; int i_subslotname; int i_subpublications; @@ -3642,7 +3643,8 @@ getSubscriptions(Archive *fout) appendPQExpBuffer(query, "SELECT s.tableoid, s.oid, s.subname," "(%s s.subowner) AS rolname, s.subenabled, " - " s.subconninfo, s.subslotname, s.subpublications " + " s.subsynccommit, s.subconninfo, s.subslotname, " + " s.subpublications " "FROM pg_catalog.pg_subscription s " "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database" " WHERE datname = current_database())", @@ -3656,6 +3658,7 @@ getSubscriptions(Archive *fout) i_subname = PQfnumber(res, "subname"); i_rolname = PQfnumber(res, "rolname"); i_subenabled = PQfnumber(res, "subenabled"); + i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subpublications = PQfnumber(res, "subpublications"); @@ -3673,6 +3676,8 @@ getSubscriptions(Archive *fout) subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname)); subinfo[i].subenabled = (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0); + subinfo[i].subsynccommit = + (strcmp(PQgetvalue(res, i, i_subsynccommit), "t") == 0); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname)); subinfo[i].subpublications = @@ -3742,6 +3747,11 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) else appendPQExpBufferStr(query, "DISABLED"); + if (subinfo->subsynccommit) + appendPQExpBufferStr(query, "SYNCHRONOUS_COMMIT = true"); + else + appendPQExpBufferStr(query, "SYNCHRONOUS_COMMIT = false"); + appendPQExpBufferStr(query, ", SLOT NAME = "); appendStringLiteralAH(query, subinfo->subslotname, fout); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index a466527..5934eb0 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -604,6 +604,7 @@ typedef struct _SubscriptionInfo DumpableObject dobj; char *rolname; bool subenabled; + bool subsynccommit; char *subconninfo; char *subslotname; char *subpublications; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 61a3e2a..118a037 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5115,7 +5115,8 @@ describeSubscriptions(const char *pattern, bool verbose) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, + false, false}; if (pset.sversion < 100000) { @@ -5141,7 +5142,9 @@ describeSubscriptions(const char *pattern, bool verbose) if (verbose) { appendPQExpBuffer(&buf, + ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", + gettext_noop("Synchronous Commit"), gettext_noop("Conninfo")); } diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0811880..62845e9 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -39,6 +39,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE bool subenabled; /* True if the subscription is enabled * (the worker should be running) */ + bool subsynccommit; /* Should apply use synchronous commit? */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ text subconninfo; /* Connection string to the publisher */ @@ -54,14 +55,15 @@ typedef FormData_pg_subscription *Form_pg_subscription; * compiler constants for pg_subscription * ---------------- */ -#define Natts_pg_subscription 7 +#define Natts_pg_subscription 8 #define Anum_pg_subscription_subdbid 1 #define Anum_pg_subscription_subname 2 #define Anum_pg_subscription_subowner 3 #define Anum_pg_subscription_subenabled 4 -#define Anum_pg_subscription_subconninfo 5 -#define Anum_pg_subscription_subslotname 6 -#define Anum_pg_subscription_subpublications 7 +#define Anum_pg_subscription_subsynccommit 5 +#define Anum_pg_subscription_subconninfo 6 +#define Anum_pg_subscription_subslotname 7 +#define Anum_pg_subscription_subpublications 8 typedef struct Subscription @@ -71,6 +73,7 @@ typedef struct Subscription char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ + bool synccommit; /* Indicates if apply should use synchronous commit */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ List *publications; /* List of publication names to subscribe to */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index d8dc55a..42a4a3c 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -27,19 +27,19 @@ reset client_min_messages; CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (NOCONNECT); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+-------------+--------------------- - testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +---------+---------------------------+---------+-------------+--------------------+--------------------- + testsub | regress_subscription_user | f | {testpub} | f | dbname=doesnotexist (1 row) ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3 NOREFRESH; ALTER SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist2'; \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+---------------------+---------------------- - testsub | regress_subscription_user | f | {testpub2,testpub3} | dbname=doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +---------+---------------------------+---------+---------------------+--------------------+---------------------- + testsub | regress_subscription_user | f | {testpub2,testpub3} | f | dbname=doesnotexist2 (1 row) BEGIN; @@ -66,11 +66,12 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy; ERROR: must be owner of subscription testsub RESET ROLE; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; -\dRs - List of subscriptions - Name | Owner | Enabled | Publication --------------+---------------------------+---------+--------------------- - testsub_foo | regress_subscription_user | f | {testpub2,testpub3} +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = true); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +-------------+---------------------------+---------+---------------------+--------------------+---------------------- + testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | t | dbname=doesnotexist2 (1 row) -- rename back to keep the rest simple diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 62c99d8..8617654 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -47,8 +47,9 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy; RESET ROLE; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = true); -\dRs +\dRs+ -- rename back to keep the rest simple ALTER SUBSCRIPTION testsub_foo RENAME TO testsub; -- 2.7.4
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers