From 06bbdf5b71248288f1b094a044941635589a4c68 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Mon, 28 Feb 2022 23:35:42 -0500
Subject: [PATCH v21] Skip empty transactions for logical replication.

The current logical replication behavior is to send every transaction to
the subscriber even if the transaction is empty (because it does not
contain changes from the selected publications). It is a waste of CPU
cycles and network bandwidth to build/transmit these empty transactions.

This patch addresses the above problem by postponing the BEGIN message
until the first change is sent. While processing a COMMIT message, if no
change was sent for that transaction, the COMMIT message is not sent
either. This means that pgoutput will skip the BEGIN/COMMIT messages for
transactions that are empty. This patch also postpones the STREAM START
message, while streaming large in-progress transactions, until the first
change of the stream. While processing the STREAM STOP message, if no
change was sent in that stream, the STREAM STOP message is not sent. The
patch also makes sure that in synchronous replication mode, when skipping
empty transactions, keepalive messages are sent to keep the LSN locations
updated on the standby.

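This patch does not skip empty transactions that are "two-phase": PREPARE
is always sent. The one exception is a streamed transaction for which no
stream was sent, which is also skipped at STREAM PREPARE.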

Discussion:
https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com
---
 src/backend/replication/logical/logical.c   |   4 +-
 src/backend/replication/pgoutput/pgoutput.c | 248 ++++++++++++++++++++++++++--
 src/backend/replication/syncrep.c           |  12 +-
 src/backend/replication/walsender.c         |  31 ++--
 src/include/replication/logical.h           |   3 +-
 src/include/replication/output_plugin.h     |   2 +-
 src/include/replication/syncrep.h           |   1 +
 src/test/subscription/t/020_messages.pl     |   5 +-
 src/tools/pgindent/typedefs.list            |   1 +
 9 files changed, 276 insertions(+), 31 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13..99b2775 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -683,12 +683,12 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
  * Update progress tracking (if supported).
  */
 void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keepalive)
 {
 	if (!ctx->update_progress)
 		return;
 
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, send_keepalive);
 }
 
 /*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a04..aafe805 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -67,6 +67,8 @@ static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 										   TimestampTz prepare_time);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn);
+static void pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
+									   ReorderBufferTXN *txn);
 static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
 								 ReorderBufferTXN *txn);
 static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
@@ -166,6 +168,20 @@ typedef struct RelationSyncEntry
 	AttrMap    *attrmap;
 } RelationSyncEntry;
 
+/*
+ * Per-transaction state, kept in ReorderBufferTXN's output_plugin_private.
+ * BEGIN is only sent when the first change in a transaction is processed.
+ * Similarly, while streaming, STREAM START is only sent with the first
+ * change of a stream.  Tracking what was sent makes it possible to skip
+ * transactions (and streams) that are empty.
+ */
+typedef struct PGOutputTxnData
+{
+	bool		sent_begin_txn;		/* BEGIN (or a STREAM START) was sent */
+	bool		sent_stream_start;	/* STREAM START sent for current stream */
+	bool		sent_first_stream;	/* at least one stream was sent */
+} PGOutputTxnData;
+
 /* Map used to remember which relation schemas we sent. */
 static HTAB *RelationSyncCache = NULL;
 
@@ -452,15 +468,42 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 }
 
 /*
- * BEGIN callback
+ * BEGIN callback.
+ *
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
  */
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+	PGOutputTxnData *txndata;
+
+	txndata = MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
+	txn->output_plugin_private = txndata;
+}
+
+/*
+ * Send BEGIN.
+ *
+ * This is where the BEGIN is actually sent. This is called while processing
+ * the first change of the transaction.
+ */
+static void
+pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData	*txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+	Assert(txndata);
+	Assert(!txndata->sent_begin_txn);
 
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin(ctx->out, txn);
+	txndata->sent_begin_txn = true;
 
 	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
 					 send_replication_origin);
@@ -475,7 +518,25 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	PGOutputTxnData	*txndata = (PGOutputTxnData *) txn->output_plugin_private;
+	bool            sent_begin_txn;
+
+	Assert(txndata);
+
+	/*
+	 * If a BEGIN message was not yet sent, then it means there were no relevant
+	 * changes encountered, so we can skip the COMMIT message too.
+	 */
+	sent_begin_txn = txndata->sent_begin_txn;
+	txn->output_plugin_private = NULL;
+	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
+
+	pfree(txndata);
+	if (!sent_begin_txn)
+	{
+		elog(DEBUG1, "Skipping replication of an empty transaction in commit");
+		return;
+	}
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -506,7 +567,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -520,7 +581,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -536,7 +597,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -556,6 +617,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 	bool		schema_sent;
 	TransactionId xid = InvalidTransactionId;
 	TransactionId topxid = InvalidTransactionId;
+	PGOutputTxnData *txndata;
+	ReorderBufferTXN *toptxn;
 
 	/*
 	 * Remember XID of the (sub)transaction for the change. We don't care if
@@ -569,9 +632,15 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		xid = change->txn->xid;
 
 	if (change->txn->toptxn)
+	{
 		topxid = change->txn->toptxn->xid;
+		toptxn = change->txn->toptxn;
+	}
 	else
+	{
 		topxid = xid;
+		toptxn = change->txn;
+	}
 
 	/*
 	 * Do we need to send the schema? We do track streamed transactions
@@ -594,6 +663,22 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 	if (schema_sent)
 		return;
 
+	/* The skip-tracking flags live on the top-level transaction */
+	txndata = toptxn->output_plugin_private;
+
+	if (in_streaming)
+	{
+		/* If streaming, send STREAM START if we haven't yet */
+		if (txndata && !txndata->sent_stream_start)
+			pgoutput_send_stream_start(ctx, toptxn);
+	}
+	else
+	{
+		/* If not streaming, send BEGIN if we haven't yet */
+		if (txndata && !txndata->sent_begin_txn)
+			pgoutput_send_begin(ctx, toptxn);
+	}
+
 	/*
 	 * Send the schema.  If the changes will be published using an ancestor's
 	 * schema, not the relation's own, send that ancestor's schema before
@@ -1141,6 +1226,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				Relation relation, ReorderBufferChange *change)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 	TransactionId xid = InvalidTransactionId;
@@ -1183,6 +1269,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/* Send the postponed STREAM START or BEGIN ahead of the first change */
+	if (in_streaming)
+	{
+		/* If streaming, send STREAM START if we haven't yet */
+		if (txndata && !txndata->sent_stream_start)
+			pgoutput_send_stream_start(ctx, txn);
+	}
+	else
+	{
+		/* If not streaming, send BEGIN if we haven't yet */
+		if (txndata && !txndata->sent_begin_txn)
+			pgoutput_send_begin(ctx, txn);
+	}
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -1354,6 +1454,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				  int nrelations, Relation relations[], ReorderBufferChange *change)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 	int			i;
@@ -1397,6 +1498,21 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+		if (in_streaming)
+		{
+			/* If streaming, send STREAM START if we haven't yet */
+			if (txndata && !txndata->sent_stream_start)
+				pgoutput_send_stream_start(ctx, txn);
+		}
+		else
+		{
+			/* If not streaming, send BEGIN if we haven't yet */
+			if (txndata && !txndata->sent_begin_txn)
+				pgoutput_send_begin(ctx, txn);
+		}
+
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  xid,
@@ -1429,6 +1545,28 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = txn->xid;
 
+	/*
+	 * Send the postponed STREAM START or BEGIN if we haven't yet, but not
+	 * for non-transactional messages.
+	 */
+	if (in_streaming || transactional)
+	{
+		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+		if (in_streaming)
+		{
+			/* If streaming, send STREAM START if we haven't yet */
+			if (txndata && !txndata->sent_stream_start)
+				pgoutput_send_stream_start(ctx, txn);
+		}
+		else
+		{
+			/* If not streaming, send BEGIN if we haven't yet */
+			if (txndata && !txndata->sent_begin_txn)
+				pgoutput_send_begin(ctx, txn);
+		}
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
 							 xid,
@@ -1511,28 +1649,60 @@ static void
 pgoutput_stream_start(struct LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn)
 {
-	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData *txndata = txn->output_plugin_private;
 
 	/* we can't nest streaming of transactions */
 	Assert(!in_streaming);
 
 	/*
+	 * Don't actually send stream start here, instead set a flag that indicates
+	 * that stream start hasn't been sent and wait for the first actual change
+	 * for this stream to be sent and then send stream start. This is done
+	 * to avoid sending empty streams without any changes.
+	 */
+	if (txndata == NULL)
+	{
+		txndata =
+			MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
+		txn->output_plugin_private = txndata;
+	}
+
+	txndata->sent_stream_start = false;
+	in_streaming = true;
+}
+
+/*
+ * Send the postponed STREAM START; called for the first change of a stream.
+ */
+static void
+pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
+						   ReorderBufferTXN *txn)
+{
+	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+	Assert(txndata);
+
+	/*
 	 * If we already sent the first stream for this transaction then don't
 	 * send the origin id in the subsequent streams.
 	 */
-	if (rbtxn_is_streamed(txn))
+	if (txndata->sent_first_stream)
 		send_replication_origin = false;
 
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
-	logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
+	logicalrep_write_stream_start(ctx->out, txn->xid, !txndata->sent_first_stream);
 
 	send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
 					 send_replication_origin);
 
 	OutputPluginWrite(ctx, true);
 
-	/* we're streaming a chunk of transaction now */
-	in_streaming = true;
+	/*
+	 * Record that this stream, and hence the transaction, has sent output.
+	 */
+	txndata->sent_begin_txn = txndata->sent_stream_start = true;
+	txndata->sent_first_stream = true;
 }
 
 /*
@@ -1542,9 +1712,18 @@ static void
 pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
 					 ReorderBufferTXN *txn)
 {
+	PGOutputTxnData *data = txn->output_plugin_private;
+
 	/* we should be streaming a trasanction */
 	Assert(in_streaming);
 
+	if (!data->sent_stream_start)
+	{
+		in_streaming = false;
+		elog(DEBUG1, "Skipping replication of an empty transaction in stream stop");
+		return;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_stop(ctx->out);
 	OutputPluginWrite(ctx, true);
@@ -1563,6 +1742,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 					  XLogRecPtr abort_lsn)
 {
 	ReorderBufferTXN *toptxn;
+	PGOutputTxnData  *txndata;
+	bool sent_first_stream;
 
 	/*
 	 * The abort should happen outside streaming block, even for streamed
@@ -1572,6 +1753,20 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 
 	/* determine the toplevel transaction */
 	toptxn = (txn->toptxn) ? txn->toptxn : txn;
+	txndata = toptxn->output_plugin_private;
+	sent_first_stream = txndata->sent_first_stream;
+
+	if (txn->toptxn == NULL)
+	{
+		pfree(txndata);
+		txn->output_plugin_private = NULL;
+	}
+
+	if (!sent_first_stream)
+	{
+		elog(DEBUG1, "Skipping replication of an empty transaction in stream abort");
+		return;
+	}
 
 	Assert(rbtxn_is_streamed(toptxn));
 
@@ -1591,6 +1786,9 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn,
 					   XLogRecPtr commit_lsn)
 {
+	PGOutputTxnData *txndata = txn->output_plugin_private;
+	bool			sent_first_stream = txndata->sent_first_stream;
+
 	/*
 	 * The commit should happen outside streaming block, even for streamed
 	 * transactions. The transaction has to be marked as streamed, though.
@@ -1598,7 +1796,21 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
+
+	/*
+	 * If no changes were part of this transaction then drop the commit
+	 * but send the update progress.
+	 */
+	if (!sent_first_stream)
+	{
+		elog(DEBUG1, "Skipping replication of an empty transaction in stream commit");
+		OutputPluginUpdateProgress(ctx, true);
+		return;
+	}
+
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1617,9 +1829,21 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 							ReorderBufferTXN *txn,
 							XLogRecPtr prepare_lsn)
 {
+	PGOutputTxnData *txndata = txn->output_plugin_private;
+	bool			sent_begin_txn = txndata->sent_begin_txn;
+
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
+
+	/* Report progress even when skipping, as pgoutput_stream_commit does */
+	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
+	if (!sent_begin_txn)
+	{
+		elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare");
+		return;
+	}
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ce163b9..11f7358 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -171,8 +171,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	 * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
 	 * it's false, the lock is not necessary because we don't touch the queue.
 	 */
-	if (!SyncRepRequested() ||
-		!((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+	if (!SyncRepEnabled())
 		return;
 
 	/* Cap the level for anything other than commit to remote flush only. */
@@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
 }
 
 /*
+ * Report whether sync replication is requested and sync standbys are defined.
+ */
+bool
+SyncRepEnabled(void)
+{
+	return SyncRepRequested() && ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined;
+}
+
+/*
  * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
  * Return false if the number of sync standbys is less than
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5a718b1..a492610 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -242,14 +242,15 @@ static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
-static void WalSndKeepalive(bool requestReply);
+static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
 static void WalSndKeepaliveIfNecessary(void);
 static void WalSndCheckTimeOut(void);
 static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+								 bool send_keepalive);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1449,12 +1450,20 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
  * Write the current position to the lag tracker (see XLogSendPhysical).
  */
 static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+					 bool send_keepalive)
 {
 	static TimestampTz sendTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
 
 	/*
+	 * When skipping empty transactions in synchronous replication, we need
+	 * to send a keepalive to keep the MyWalSnd locations updated.
+	 */
+	if (send_keepalive && SyncRepEnabled())
+		WalSndKeepalive(true, ctx->write_location);
+
+	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
 	 * avoid flooding the lag tracker when we commit frequently.
 	 */
@@ -1550,7 +1559,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (MyWalSnd->flush < sentPtr &&
 			MyWalSnd->write < sentPtr &&
 			!waiting_for_ping_response)
-			WalSndKeepalive(false);
+			WalSndKeepalive(false, InvalidXLogRecPtr);
 
 		/* check whether we're done */
 		if (loc <= RecentFlushPtr)
@@ -2068,7 +2077,7 @@ ProcessStandbyReplyMessage(void)
 
 	/* Send a reply if the standby requested one. */
 	if (replyRequested)
-		WalSndKeepalive(false);
+		WalSndKeepalive(false, InvalidXLogRecPtr);
 
 	/*
 	 * Update shared state for this WalSender process based on reply data from
@@ -3074,7 +3083,7 @@ WalSndDone(WalSndSendDataCallback send_data)
 		proc_exit(0);
 	}
 	if (!waiting_for_ping_response)
-		WalSndKeepalive(true);
+		WalSndKeepalive(true, InvalidXLogRecPtr);
 }
 
 /*
@@ -3588,18 +3597,20 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  *
  * If requestReply is set, the message requests the other party to send
  * a message back to us, for heartbeat purposes.  We also set a flag to
- * let nearby code that we're waiting for that response, to avoid
+ * let nearby code know that we're waiting for that response, to avoid
  * repeated requests.
+ *
+ * If writePtr is valid, report that as the LSN processed, else use sentPtr.
  */
 static void
-WalSndKeepalive(bool requestReply)
+WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
 {
 	elog(DEBUG2, "sending replication keepalive");
 
 	/* construct the message... */
 	resetStringInfo(&output_message);
 	pq_sendbyte(&output_message, 'k');
-	pq_sendint64(&output_message, sentPtr);
+	pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
 	pq_sendint64(&output_message, GetCurrentTimestamp());
 	pq_sendbyte(&output_message, requestReply ? 1 : 0);
 
@@ -3638,7 +3649,7 @@ WalSndKeepaliveIfNecessary(void)
 											wal_sender_timeout / 2);
 	if (last_processing >= ping_time)
 	{
-		WalSndKeepalive(true);
+		WalSndKeepalive(true, InvalidXLogRecPtr);
 
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9..9f59855 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
 														 XLogRecPtr Ptr,
-														 TransactionId xid
+														 TransactionId xid,
+														 bool send_keepalive
 );
 
 typedef struct LogicalDecodingContext
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf..eb91d17 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -270,6 +270,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keepalive);
 
 #endif							/* OUTPUT_PLUGIN_H */
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 27be230..7086532 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -90,6 +90,7 @@ extern void SyncRepCleanupAtProcExit(void);
 /* called by wal sender */
 extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);
+extern bool SyncRepEnabled(void);
 
 /* called by wal sender and user backend */
 extern int	SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
index b5045ff..d21d929 100644
--- a/src/test/subscription/t/020_messages.pl
+++ b/src/test/subscription/t/020_messages.pl
@@ -87,9 +87,8 @@ $result = $node_publisher->safe_psql(
 			'publication_names', 'tap_pub')
 ));
 
-# 66 67 == B C == BEGIN COMMIT
-is( $result, qq(66
-67),
+# no message and no BEGIN and COMMIT because of empty transaction optimization
+is($result, qq(),
 	'option messages defaults to false so message (M) is not available on slot'
 );
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d9b83f7..77f33b2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1608,6 +1608,7 @@ PGMessageField
 PGModuleMagicFunction
 PGNoticeHooks
 PGOutputData
+PGOutputTxnData
 PGPROC
 PGP_CFB
 PGP_Context
-- 
1.8.3.1

