Hi,

In logical replication, the walsender currently sends both the data
that is generated locally and the data that was replicated from other
instances. This results in infinite recursion in a circular logical
replication setup. Here the user is trying to set up 2-way replication,
with node1 publishing data to node2 and node2 publishing data to node1,
so that DML operations can be performed on either node and the pair can
act as a 2-way multi-master replication setup.

This problem can be reproduced with the following steps:

-- Instance 1
create table t1(c1 int);
create publication pub1 for table t1;

-- Instance 2
create table t1(c1 int);
create publication pub2 for table t1;
create subscription sub1 CONNECTION 'dbname=postgres port=5432' publication pub1;

-- Instance 1
create subscription sub2 CONNECTION 'dbname=postgres port=5433' publication pub2;
insert into t1 values(10);

In this scenario, the walsender in publisher pub1 sends the data to the
apply worker in subscriber sub1; the apply worker in sub1 maps the data
to local tables and applies the individual changes as they are
received. Then the walsender in publisher pub2 sends the data to the
apply worker in subscriber sub2; the apply worker in sub2 maps the data
to local tables and applies the individual changes as they are
received. This process repeats infinitely.

Currently we do not differentiate between locally generated data and
replicated data; we send both, which causes the infinite recursion.

The record count increases significantly within a short time:

select count(*) from t1;
    count
--------------
 4000000
(1 row)

If the table had a primary key constraint, the first insert would
succeed, and when the same insert is sent back, it fails with a
constraint error:

2022-02-23 09:28:43.592 IST [14743] ERROR: duplicate key value violates unique constraint "t1_pkey"
2022-02-23 09:28:43.592 IST [14743] DETAIL: Key (c1)=(10) already exists.
2022-02-23 09:28:43.592 IST [14743] CONTEXT: processing remote data during "INSERT" for replication target relation "public.t1" in transaction 727 at 2022-02-23 09:28:43.406738+05:30
2022-02-23 09:28:43.593 IST [14678] LOG: background worker "logical replication worker" (PID 14743) exited with exit code 1
2022-02-23 09:28:48.608 IST [14745] LOG: logical replication apply worker for subscription "sub2" has started
2022-02-23 09:28:48.624 IST [14745] ERROR: duplicate key value violates unique constraint "t1_pkey"
2022-02-23 09:28:48.624 IST [14745] DETAIL: Key (c1)=(10) already exists.
2022-02-23 09:28:48.624 IST [14745] CONTEXT: processing remote data during "INSERT" for replication target relation "public.t1" in transaction 727 at 2022-02-23 09:28:43.406738+05:30
2022-02-23 09:28:48.626 IST [14678] LOG: background worker "logical replication worker" (PID 14745) exited with exit code 1

The same problem can occur in any circular node setup, such as 3 nodes,
4 nodes, etc., for example:
a) node1 publishing to node2
b) node2 publishing to node3
c) node3 publishing back to node1

Here there are two problems for the user:
a) Incremental synchronization of the table by the walsender sends both
   local data and replicated data
b) Table synchronization using the copy command sends both local data
   and replicated data

The first problem, "Incremental synchronization of table by Walsender",
can be solved as follows: Currently, locally generated data does not
have a replication origin associated with it, whereas data that
originated from another instance does have one. We could use this
information to differentiate locally generated data from replicated
data and send only the locally generated data.
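
To illustrate the origin-based filtering idea, here is a toy,
self-contained C simulation of two nodes replicating to each other. It
is only a sketch under stated assumptions: Change, Node, replicate() and
rows_on_node1_after() are made-up names for illustration and are not
PostgreSQL code, and the boolean "replicated" flag stands in for "this
change carries a valid replication origin".

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_CHANGES 1024

/* One logged change; "replicated" stands in for a valid replication origin. */
typedef struct Change
{
	int			value;
	bool		replicated;
} Change;

/* A toy node: its change log doubles as the table contents. */
typedef struct Node
{
	Change		log[MAX_CHANGES];
	size_t		nchanges;		/* changes logged so far (== rows in table) */
	size_t		next_send;		/* first change the sender has not examined */
} Node;

/* Append one change to a node's log. */
static void
node_log(Node *node, int value, bool replicated)
{
	assert(node->nchanges < MAX_CHANGES);
	node->log[node->nchanges].value = value;
	node->log[node->nchanges].replicated = replicated;
	node->nchanges++;
}

/*
 * Send all pending changes from src to dst.  With only_local set, changes
 * that were themselves replicated are skipped.  Returns the number applied.
 */
static size_t
replicate(Node *src, Node *dst, bool only_local)
{
	size_t		applied = 0;
	size_t		end = src->nchanges;

	for (; src->next_send < end; src->next_send++)
	{
		const Change *c = &src->log[src->next_send];

		if (only_local && c->replicated)
			continue;			/* hypothetical filter: not locally generated */

		node_log(dst, c->value, true);	/* applied rows are marked replicated */
		applied++;
	}
	return applied;
}

/* Insert one row on node1, run some round trips, report node1's row count. */
size_t
rows_on_node1_after(bool only_local, int rounds)
{
	Node		node1 = {0};
	Node		node2 = {0};

	node_log(&node1, 10, false);	/* insert into t1 values(10) on node1 */

	for (int i = 0; i < rounds; i++)
	{
		replicate(&node1, &node2, only_local);
		replicate(&node2, &node1, only_local);
	}
	return node1.nchanges;
}
```

In this toy model, without the filter node1 gains one extra row per
round trip (rows_on_node1_after(false, 5) returns 6), while with the
filter the echoed change is dropped at the sender and node1 stays at a
single row.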
This "only_local" behavior could be provided as an option when the
subscription is created, e.g.:
CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=5433' PUBLICATION pub1 with (only_local = on);

I have attached a basic patch for this. If the idea is accepted, I will
work further to test more scenarios, add documentation, and then test
and post an updated patch.

The second problem is table synchronization using the copy command,
which includes both local data and replicated data. Let us consider the
following scenario:
a) node1 publishing to node2
b) node2 publishing to node1

In this case node1 will have replicated data from node2 and vice versa.
Now suppose the user wants to add node3, which subscribes to data from
node2. The user will have to create a subscription on node3 to get the
data from node2. During table synchronization we send the complete
table data from node2 to node3. Node2 will have its own local data and
also the replicated data from node1. Currently we have no way to
differentiate between locally generated data and replicated data in the
heap, which will cause infinite recursion as described above.

To handle this, if the user has specified the only_local option, we
could throw a warning or error out while creating the subscription. We
could have a column srreplicateddata in pg_subscription_rel which
indicates whether the table has any replicated data or not:

postgres=# select * from pg_subscription_rel;
 srsubid | srrelid | srsubstate | srsublsn  | srreplicateddata
---------+---------+------------+-----------+------------------
   16389 |   16384 | r          | 0/14A4640 | t
   16389 |   16385 | r          | 0/14A4690 | f
(2 rows)

In the above example, srreplicateddata being true indicates that table
t1, whose relid is 16384, has replicated data; the other row, with
srreplicateddata as false, indicates that table t2, whose relid is
16385, does not have replicated data.
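
As a rough sketch of how such a column might be consulted when a
subscription is created, the following self-contained C fragment checks
a set of rows shaped like the example output above. Everything here is
hypothetical: SubRelRow, SyncCheck and check_initial_sync() are
illustrative names only, and srreplicateddata is the proposed column,
not an existing one.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical mirror of a pg_subscription_rel row with the proposed column. */
typedef struct SubRelRow
{
	unsigned int srrelid;
	bool		srreplicateddata;
} SubRelRow;

typedef enum SyncCheck
{
	SYNC_OK,					/* initial copy is safe */
	SYNC_WARN_REPLICATED_DATA	/* table holds replicated data: warn or error */
} SyncCheck;

/*
 * Decide whether the initial table copy for relid should raise a warning
 * (or error).  Only subscriptions created with only_local care about this.
 */
SyncCheck
check_initial_sync(const SubRelRow *rows, size_t nrows,
				   unsigned int relid, bool only_local)
{
	if (!only_local)
		return SYNC_OK;

	for (size_t i = 0; i < nrows; i++)
	{
		if (rows[i].srrelid == relid && rows[i].srreplicateddata)
			return SYNC_WARN_REPLICATED_DATA;
	}
	return SYNC_OK;
}
```

With the two example rows above, relid 16384 would be flagged for an
only_local subscription while relid 16385 would not.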

When creating a new subscription, the subscriber will connect to the
publisher and check whether the relation has replicated data by
checking srreplicateddata in the pg_subscription_rel table. If the
table has any replicated data, a warning or error is logged.

Also, we could document the steps for handling the initial sync, like:
a) Complete the ongoing transactions on this table on the nodes in the
   replication setup, i.e. node1 and node2 in the above case, so that
   the table data is consistent.
b) Once there are no ongoing transactions, copy the table data using
   the copy command from any one of the nodes.
c) Create the subscription with the copy_data option set to off.
d) Perform further transactions on the table.
e) All further transactions will be handled by the walsender, which
   will take care of skipping replicated data and sending only the
   local data, i.e. node2 will send only its locally generated data to
   node3.

I'm not sure if there is a better way to handle this. If there is, we
could handle it accordingly.
Thoughts?

Regards,
Vignesh

From f7bc51504919a5f34265a0f02720f1b0b34fc480 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Wed, 23 Feb 2022 11:37:30 +0530
Subject: [PATCH v1] Skip replication of non local data.

Add an option only_local which will subscribe only to the locally
generated data in the publisher node. If subscriber is created with this
option, publisher will skip publishing the data that was subscribed
from other nodes. It can be created using following syntax:
ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=9999' PUBLICATION pub1 with (only_local = on);
---
 contrib/test_decoding/test_decoding.c         | 13 +++++++
 src/backend/catalog/pg_subscription.c         |  1 +
 src/backend/catalog/system_views.sql          |  3 +-
 src/backend/commands/subscriptioncmds.c       | 20 +++++++++--
 .../libpqwalreceiver/libpqwalreceiver.c       | 18 ++++++++--
 src/backend/replication/logical/decode.c      | 36 ++++++++++++++-----
 src/backend/replication/logical/logical.c     | 35 ++++++++++++++++++
 src/backend/replication/logical/tablesync.c   |  2 +-
 src/backend/replication/logical/worker.c      |  1 +
 src/backend/replication/pgoutput/pgoutput.c   | 25 +++++++++++++
 src/backend/replication/slot.c                |  4 ++-
 src/backend/replication/slotfuncs.c           | 18 +++++++---
 src/backend/replication/walreceiver.c         |  2 +-
 src/backend/replication/walsender.c           | 21 ++++++++---
 src/bin/psql/tab-complete.c                   |  2 +-
 src/include/catalog/pg_proc.dat               |  6 ++--
 src/include/catalog/pg_subscription.h         |  3 ++
 src/include/replication/logical.h             |  4 +++
 src/include/replication/output_plugin.h       |  7 ++++
 src/include/replication/pgoutput.h            |  1 +
 src/include/replication/slot.h                |  5 ++-
 src/include/replication/walreceiver.h         |  8 +++--
 src/test/regress/expected/rules.out           |  5 +--
 src/tools/pgindent/typedefs.list              |  1 +
 24 files changed, 205 insertions(+), 36 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ea22649e41..58bc5dbc1c 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -73,6 +73,8 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx,
 							   ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id);
+static bool pg_decode_filter_remotedata(LogicalDecodingContext *ctx,
+							 RepOriginId origin_id);
 static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							  bool transactional, const char *prefix,
@@ -148,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
+	cb->filter_remotedata_cb = pg_decode_filter_remotedata;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
 	cb->sequence_cb = pg_decode_sequence;
@@ -484,6 +487,16 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+static bool
+pg_decode_filter_remotedata(LogicalDecodingContext *ctx,
+							RepOriginId origin_id)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->only_local && origin_id != InvalidRepOriginId)
+		return true;
+	return false;
+}
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca65a8bd20..94e096e5fb 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
+	sub->onlylocaldata = subform->subonlylocaldata;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3cb69b1f87..931c549f7c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -958,7 +958,8 @@ CREATE VIEW pg_replication_slots AS
             L.confirmed_flush_lsn,
             L.wal_status,
             L.safe_wal_size,
-            L.two_phase
+            L.two_phase,
+            L.only_local
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d24..3f5cbe2c20 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_ONLYLOCAL_DATA		0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	bool		onlylocal_data;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->streaming = false;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
 		opts->twophase = false;
+	if (IsSet(supported_opts, SUBOPT_ONLYLOCAL_DATA))
+		opts->onlylocal_data = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -228,6 +232,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_STREAMING;
 			opts->streaming = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_ONLYLOCAL_DATA) &&
+				 strcmp(defel->defname, "only_local") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_ONLYLOCAL_DATA))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_ONLYLOCAL_DATA;
+			opts->onlylocal_data = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "two_phase") == 0)
 		{
 			/*
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+					  SUBOPT_ONLYLOCAL_DATA);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -460,6 +474,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
+	values[Anum_pg_subscription_subonlylocaldata - 1] = BoolGetDatum(opts.onlylocal_data);
 	values[Anum_pg_subscription_subtwophasestate - 1] =
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -565,7 +580,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				twophase_enabled = true;
 
 			walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
-							   CRS_NOEXPORT_SNAPSHOT, NULL);
+							   CRS_NOEXPORT_SNAPSHOT, NULL,
+							   opts.onlylocal_data);
 
 			if (twophase_enabled)
 				UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 0d89db4e6a..326f60414e 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -75,7 +75,8 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  bool temporary,
 								  bool two_phase,
 								  CRSSnapshotAction snapshot_action,
-								  XLogRecPtr *lsn);
+								  XLogRecPtr *lsn,
+								  bool onlylocal_data);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const char *query,
@@ -453,6 +454,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 150000)
 			appendStringInfoString(&cmd, ", two_phase 'on'");
 
+		if (options->proto.logical.onlylocal_data &&
+			PQserverVersion(conn->streamConn) >= 150000)
+			appendStringInfoString(&cmd, ", only_local 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
@@ -869,7 +874,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
 static char *
 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 					 bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
-					 XLogRecPtr *lsn)
+					 XLogRecPtr *lsn, bool onlylocal_data)
 {
 	PGresult   *res;
 	StringInfoData cmd;
@@ -899,6 +904,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 				appendStringInfoChar(&cmd, ' ');
 		}
 
+		if (onlylocal_data)
+		{
+			appendStringInfoString(&cmd, "ONLY_LOCAL");
+			if (use_new_options_syntax)
+				appendStringInfoString(&cmd, ", ");
+			else
+				appendStringInfoChar(&cmd, ' ');
+		}
+
 		if (use_new_options_syntax)
 		{
 			switch (snapshot_action)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 18cf931822..6305b93fc7 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -555,6 +555,15 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return filter_by_origin_cb_wrapper(ctx, origin_id);
 }
 
+static inline bool
+FilterRemoteOriginData(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+	if (ctx->callbacks.filter_remotedata_cb == NULL)
+		return false;
+
+	return filter_remotedata_cb_wrapper(ctx, origin_id);
+}
+
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
@@ -585,7 +594,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	message = (xl_logical_message *) XLogRecGetData(r);
 
 	if (message->dbId != ctx->slot->data.database ||
-		FilterByOrigin(ctx, origin_id))
+		FilterByOrigin(ctx, origin_id) ||
+		FilterRemoteOriginData(ctx, origin_id))
 		return;
 
 	if (message->transactional &&
@@ -864,7 +874,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -914,7 +925,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -980,7 +992,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -1032,7 +1045,8 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -1082,7 +1096,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	/*
@@ -1175,7 +1190,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -1250,7 +1266,8 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 {
 	return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
 			(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
-			ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+			ctx->fast_forward || FilterByOrigin(ctx, origin_id) ||
+			FilterRemoteOriginData(ctx, origin_id));
 }
 
 /*
@@ -1335,7 +1352,8 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	tupledata = XLogRecGetData(r);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d..19584eaea7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -246,6 +246,8 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_sequence_cb != NULL) ||
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
+	ctx->onlylocal_data = ctx->callbacks.filter_remotedata_cb != NULL;
+
 	/*
 	 * streaming callbacks
 	 *
@@ -451,6 +453,8 @@ CreateInitDecodingContext(const char *plugin,
 	 */
 	ctx->twophase &= slot->data.two_phase;
 
+	ctx->onlylocal_data &= slot->data.onlylocal_data;
+
 	ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
 	return ctx;
@@ -1178,6 +1182,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+bool
+filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx,
+							 RepOriginId origin_id)
+{
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_remoteorigin";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_remotedata_cb(ctx, origin_id);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1659964571..f5093ce8c9 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1224,7 +1224,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	HOLD_INTERRUPTS();
 	walrcv_create_slot(LogRepWorkerWalRcvConn,
 					   slotname, false /* permanent */ , false /* two_phase */ ,
-					   CRS_USE_SNAPSHOT, origin_startpos);
+					   CRS_USE_SNAPSHOT, origin_startpos, false /* only_local */);
 	RESUME_INTERRUPTS();
 
 	/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5d9acc6173..15385fb614 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3575,6 +3575,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.binary = MySubscription->binary;
 	options.proto.logical.streaming = MySubscription->stream;
 	options.proto.logical.twophase = false;
+	options.proto.logical.onlylocal_data = MySubscription->onlylocaldata;
 
 	if (!am_tablesync_worker())
 	{
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a0477f..0c9b60bd65 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -55,6 +55,8 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static bool pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx,
+								   RepOriginId origin_id);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -215,6 +217,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_remotedata_cb = pgoutput_remoteorigin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -239,11 +242,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		messages_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
+	bool		onlylocal_data_given = false;
 
 	data->binary = false;
 	data->streaming = false;
 	data->messages = false;
 	data->two_phase = false;
+	data->onlylocal_data = false;
 
 	foreach(lc, options)
 	{
@@ -332,6 +337,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->two_phase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "only_local") == 0)
+		{
+			if (onlylocal_data_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			onlylocal_data_given = true;
+
+			data->onlylocal_data = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1450,6 +1465,16 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+static bool
+pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx,
+							 RepOriginId origin_id)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+	if (data->onlylocal_data && origin_id != InvalidRepOriginId)
+		return true;
+	return false;
+}
 /*
  * Shutdown the output plugin.
  *
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 3d39fddaae..429bc1328c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -253,7 +253,8 @@ ReplicationSlotValidateName(const char *name, int elevel)
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
-					  ReplicationSlotPersistency persistency, bool two_phase)
+					  ReplicationSlotPersistency persistency, bool two_phase,
+					  bool onlylocal_data)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -313,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.persistency = persistency;
 	slot->data.two_phase = two_phase;
 	slot->data.two_phase_at = InvalidXLogRecPtr;
+	slot->data.onlylocal_data = onlylocal_data;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 886899afd2..0e0bc1e940 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
 
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(name, false,
-						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
+						  temporary ? RS_TEMPORARY : RS_PERSISTENT,
+						  false, false);
 
 	if (immediately_reserve)
 	{
@@ -118,7 +119,8 @@ static void
 create_logical_replication_slot(char *name, char *plugin,
 								bool temporary, bool two_phase,
 								XLogRecPtr restart_lsn,
-								bool find_startpoint)
+								bool find_startpoint,
+								bool onlylocal_data)
 {
 	LogicalDecodingContext *ctx = NULL;
 
@@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin,
 	 * error as well.
 	 */
 	ReplicationSlotCreate(name, true,
-						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
+						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
+						  onlylocal_data);
 
 	/*
 	 * Create logical decoding context to find start point or, if we don't
@@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	Name		plugin = PG_GETARG_NAME(1);
 	bool		temporary = PG_GETARG_BOOL(2);
 	bool		two_phase = PG_GETARG_BOOL(3);
+	bool		onlylocal_data = PG_GETARG_BOOL(4);
 	Datum		result;
 	TupleDesc	tupdesc;
 	HeapTuple	tuple;
@@ -189,7 +193,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 									temporary,
 									two_phase,
 									InvalidXLogRecPtr,
-									true);
+									true,
+									onlylocal_data);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -231,7 +236,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -429,6 +434,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+		values[i++] = BoolGetDatum(slot_contents.data.onlylocal_data);
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -794,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 										temporary,
 										false,
 										src_restart_lsn,
+										false,
 										false);
 	}
 	else
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ceaff097b9..cfdefb1f22 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -374,7 +374,7 @@ WalReceiverMain(void)
 					 "pg_walreceiver_%lld",
 					 (long long int) walrcv_get_backend_pid(wrconn));
 
-			walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
+			walrcv_create_slot(wrconn, slotname, true, false, 0, NULL, false);
 
 			SpinLockAcquire(&walrcv->mutex);
 			strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5a718b1fe9..b826326b98 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -963,12 +963,14 @@ static void
 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 						   bool *reserve_wal,
 						   CRSSnapshotAction *snapshot_action,
-						   bool *two_phase)
+						   bool *two_phase,
+						   bool *onlylocal_data)
 {
 	ListCell   *lc;
 	bool		snapshot_action_given = false;
 	bool		reserve_wal_given = false;
 	bool		two_phase_given = false;
+	bool		onlylocal_data_given = false;
 
 	/* Parse options */
 	foreach(lc, cmd->options)
@@ -1019,6 +1021,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 			two_phase_given = true;
 			*two_phase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "only_local") == 0)
+		{
+			if (onlylocal_data_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			onlylocal_data_given = true;
+			*onlylocal_data = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized option: %s", defel->defname);
 	}
@@ -1035,6 +1046,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	char	   *slot_name;
 	bool		reserve_wal = false;
 	bool		two_phase = false;
+	bool		onlylocal_data = false;
 	CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
 	DestReceiver *dest;
 	TupOutputState *tstate;
@@ -1044,13 +1056,14 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
+	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
+							   &onlylocal_data);
 
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-							  false);
+							  false, false);
 	}
 	else
 	{
@@ -1065,7 +1078,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  two_phase);
+							  two_phase, onlylocal_data);
 	}
 
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 6957567264..d7a4e24167 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3104,7 +3104,7 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "enabled", "slot_name", "streaming",
-					  "synchronous_commit", "two_phase");
+					  "synchronous_commit", "two_phase", "only_local");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7f1ee97f55..e002563a2a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10782,9 +10782,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,only_local}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 18c291289f..6e3119247c 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -65,6 +65,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		substream;		/* Stream in-progress transactions. */
 
+	bool		subonlylocaldata;	/* skip copying of remote origin data */
+
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
@@ -102,6 +104,7 @@ typedef struct Subscription
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
+	bool		onlylocaldata;	/* Skip copying of remote origin data */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799..82014fe252 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -99,6 +99,8 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase_opt_given;
 
+	bool		onlylocal_data;
+
 	/*
 	 * State for writing output.
 	 */
@@ -138,6 +140,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern bool filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx,
+										 RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76c..52b5de3eb8 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,12 @@ typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
 typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
 											   RepOriginId origin_id);
 
+/*
+ * Filter remote origin changes.
+ */
+typedef bool (*LogicalDecodeFilterRemoteOriginCB) (struct LogicalDecodingContext *ctx,
+												   RepOriginId origin_id);
+
 /*
  * Called to shutdown an output plugin.
  */
@@ -246,6 +252,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeSequenceCB sequence_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterRemoteOriginCB filter_remotedata_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index eafedd610a..e8fac6b3f8 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -29,6 +29,7 @@ typedef struct PGOutputData
 	bool		streaming;
 	bool		messages;
 	bool		two_phase;
+	bool		onlylocal_data;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 24b30210c3..833d380b0f 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -94,6 +94,8 @@ typedef struct ReplicationSlotPersistentData
 	 */
 	bool		two_phase;
 
+	bool		onlylocal_data;
+
 	/* plugin name */
 	NameData	plugin;
 } ReplicationSlotPersistentData;
@@ -195,7 +197,8 @@ extern void ReplicationSlotsShmemInit(void);
 
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
-								  ReplicationSlotPersistency p, bool two_phase);
+								  ReplicationSlotPersistency p, bool two_phase,
+								  bool onlylocal_data);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 92f73a55b8..e62dca9b45 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -183,6 +183,7 @@ typedef struct
 			bool		streaming;	/* Streaming of large transactions */
 			bool		twophase;	/* Streaming of two-phase transactions at
 									 * prepare time */
+			bool		onlylocal_data;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
@@ -351,7 +352,8 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
 										bool temporary,
 										bool two_phase,
 										CRSSnapshotAction snapshot_action,
-										XLogRecPtr *lsn);
+										XLogRecPtr *lsn,
+										bool onlylocal_data);
 
 /*
  * walrcv_get_backend_pid_fn
@@ -423,8 +425,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
 #define walrcv_send(conn, buffer, nbytes) \
 	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
-	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn, onlylocal_data) \
+	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn, onlylocal_data)
 #define walrcv_get_backend_pid(conn) \
 	WalReceiverFunctions->walrcv_get_backend_pid(conn)
 #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 1420288d67..dc677e5c67 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1456,8 +1456,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.confirmed_flush_lsn,
     l.wal_status,
     l.safe_wal_size,
-    l.two_phase
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+    l.two_phase,
+    l.only_local
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, only_local)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c6b302c7b2..0bf093858b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1370,6 +1370,7 @@ LogicalDecodeCommitCB
 LogicalDecodeCommitPreparedCB
 LogicalDecodeFilterByOriginCB
 LogicalDecodeFilterPrepareCB
+LogicalDecodeFilterRemoteOriginCB
 LogicalDecodeMessageCB
 LogicalDecodePrepareCB
 LogicalDecodeRollbackPreparedCB
-- 
2.30.2