From dd5fa9688850a17cbbb523721a69dde5193ca80f Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Wed, 18 Jan 2023 13:45:32 +0800
Subject: [PATCH v13] Fix the logical replication timeout during large DDLs.

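DDLs such as REFRESH MATERIALIZED VIEW, which generate a lot of temporary
data due to rewrite rules, may not be processed by output plugins (for
example, pgoutput). As a result, no keepalive messages are sent for a long
time while such commands are being processed, which can cause the
subscriber side to time out. A similar case for large transactions, where
the output plugin filters out all or most of the changes, was fixed in
commit f95d53eded, but that fix did not handle DDLs.

Fix this by moving the keepalive logic out of pgoutput and into the reorder
buffer: ReorderBufferProcessTXN now invokes a new update_progress_txn
callback after every 100 changes it processes, and pgoutput calls
OutputPluginUpdateProgress directly at transaction end.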

We decided not to backpatch this because it requires a new callback and,
moreover, users can increase wal_sender_timeout and wal_receiver_timeout
to avoid this problem.

Author: Wang wei, Hou Zhijie
Reviewed-by: Peter Smith, Ashutosh Bapat, Shi yu, Amit Kapila
Discussion: https://postgr.es/m/OS3PR01MB6275478E5D29E4A563302D3D9E2B9@OS3PR01MB6275.jpnprd01.prod.outlook.com
Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
---
 src/backend/replication/logical/logical.c       | 50 +++++++++++++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 20 +++++++++
 src/backend/replication/pgoutput/pgoutput.c     | 54 +++----------------------
 src/include/replication/reorderbuffer.h         | 12 ++++++
 src/tools/pgindent/typedefs.list                |  1 +
 5 files changed, 89 insertions(+), 48 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1a58dd7..c3ec97a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
+/* callback to update txn's progress */
+static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
+										   ReorderBufferTXN *txn,
+										   XLogRecPtr lsn);
+
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
 
 /*
@@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
 	ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
 
+	/*
+	 * Callback used to update progress while the data of a transaction
+	 * (and its subtransactions) is being sent to the output plugin.
+	 */
+	ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -1584,6 +1595,45 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							   XLogRecPtr lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "update_progress_txn";
+	state.report_location = lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->write_xid = txn->xid;
+
+	/*
+	 * Report this change's lsn so replies from clients can give an up-to-date
+	 * answer. This won't ever be enough (and shouldn't be!) to confirm
+	 * receipt of this transaction, but it might allow another transaction's
+	 * commit to be confirmed with one message.
+	 */
+	ctx->write_location = lsn;
+
+	ctx->end_xact = false;
+
+	OutputPluginUpdateProgress(ctx, false);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index efe057b..0446bee 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2106,6 +2106,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	PG_TRY();
 	{
 		ReorderBufferChange *change;
+		int			changes_count = 0;	/* used to accumulate the number of
+										 * changes */
 
 		if (using_subtxn)
 			BeginInternalSubTransaction(streaming ? "stream" : "replay");
@@ -2446,6 +2448,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
 			}
+
+			/*
+			 * It is possible that no data is sent downstream for a long time,
+			 * either because the output plugin filtered it out or because a
+			 * DDL generated a lot of data that is not processed by the
+			 * plugin. In such cases, the downstream can time out. To avoid
+			 * that, we try to send a keepalive message if required. Trying
+			 * to send a keepalive message after every change has some
+			 * overhead, but testing showed there is no noticeable overhead if
+			 * we do it after every ~100 changes.
+			 */
+#define CHANGES_THRESHOLD 100
+
+			if (++changes_count >= CHANGES_THRESHOLD)
+			{
+				rb->update_progress_txn(rb, txn, change->lsn);
+				changes_count = 0;
+			}
 		}
 
 		/* speculative insertion record must be freed by now */
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1a80d67..0ba6b6b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
-static void update_replication_progress(LogicalDecodingContext *ctx,
-										bool skipped_xact);
 
 /*
  * Only 3 publication actions are used for row filtering ("insert", "update",
@@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * from this transaction has been sent to the downstream.
 	 */
 	sent_begin_txn = txndata->sent_begin_txn;
-	update_replication_progress(ctx, !sent_begin_txn);
+	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
 	pfree(txndata);
 	txn->output_plugin_private = NULL;
 
@@ -625,7 +623,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -639,7 +637,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
-	update_replication_progress(ctx, false);
-
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
 
-	update_replication_progress(ctx, false);
-
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (in_streaming)
 		xid = change->txn->xid;
@@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
 
-	update_replication_progress(ctx, false);
-
 	if (!data->messages)
 		return;
 
@@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
@@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
 		}
 	}
 }
-
-/*
- * Try to update progress and send a keepalive message if too many changes were
- * processed.
- *
- * For a large transaction, if we don't send any change to the downstream for a
- * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
- * This can happen when all or most of the changes are either not published or
- * got filtered out.
- */
-static void
-update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
-{
-	static int	changes_count = 0;
-
-	/*
-	 * We don't want to try sending a keepalive message after processing each
-	 * change as that can have overhead. Tests revealed that there is no
-	 * noticeable overhead in doing it after continuously processing 100 or so
-	 * changes.
-	 */
-#define CHANGES_THRESHOLD 100
-
-	/*
-	 * If we are at the end of transaction LSN, update progress tracking.
-	 * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
-	 * try to send a keepalive message if required.
-	 */
-	if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
-	{
-		OutputPluginUpdateProgress(ctx, skipped_xact);
-		changes_count = 0;
-	}
-}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index e5db041..215d149 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -526,6 +526,12 @@ typedef void (*ReorderBufferStreamTruncateCB) (
 											   Relation relations[],
 											   ReorderBufferChange *change);
 
+/* update progress txn callback signature */
+typedef void (*ReorderBufferUpdateProgressTxnCB) (
+												  ReorderBuffer *rb,
+												  ReorderBufferTXN *txn,
+												  XLogRecPtr lsn);
+
 struct ReorderBuffer
 {
 	/*
@@ -590,6 +596,12 @@ struct ReorderBuffer
 	ReorderBufferStreamTruncateCB stream_truncate;
 
 	/*
+	 * Callback invoked to update progress while the data of a transaction
+	 * (and its subtransactions) is being sent to the output plugin.
+	 */
+	ReorderBufferUpdateProgressTxnCB update_progress_txn;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 07fbb7c..d3224df 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2311,6 +2311,7 @@ ReorderBufferToastEnt
 ReorderBufferTupleBuf
 ReorderBufferTupleCidEnt
 ReorderBufferTupleCidKey
+ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
 ReparameterizeForeignPathByChild_function
-- 
1.8.3.1

