>From dcfb9952ca3d26fe7aaf3cadaa81f2fa1d60c9d7 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 24 May 2017 21:07:15 +0200
Subject: [PATCH 3/3] Receive invalidation messages correctly in tablesync
 worker

---
 src/backend/replication/logical/worker.c | 23 +++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 971f76b..87e8470 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -118,7 +118,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
 static void store_flush_position(XLogRecPtr remote_lsn);
 
-static void reread_subscription(void);
+static void maybe_reread_subscription(void);
 
 /* Flags set by signal handlers */
 static volatile sig_atomic_t got_SIGHUP = false;
@@ -165,8 +165,7 @@ ensure_transaction(void)
 
 	StartTransactionCommand();
 
-	if (!MySubscriptionValid)
-		reread_subscription();
+	maybe_reread_subscription();
 
 	MemoryContextSwitchTo(ApplyMessageContext);
 	return true;
@@ -463,6 +462,12 @@ apply_handle_commit(StringInfo s)
 
 		store_flush_position(commit_data.end_lsn);
 	}
+	else
+	{
+		/* Process any invalidation messages that might have accumulated. */
+		AcceptInvalidationMessages();
+		maybe_reread_subscription();
+	}
 
 	in_remote_transaction = false;
 
@@ -1119,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * now.
 			 */
 			AcceptInvalidationMessages();
-			if (!MySubscriptionValid)
-				reread_subscription();
+			maybe_reread_subscription();
 
 			/* Process any table synchronization changes. */
 			process_syncing_tables(last_received);
@@ -1302,17 +1306,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 		last_flushpos = flushpos;
 }
 
-
 /*
- * Reread subscription info and exit on change.
+ * Reread subscription info if needed. Most changes will be exit.
  */
 static void
-reread_subscription(void)
+maybe_reread_subscription(void)
 {
 	MemoryContext oldctx;
 	Subscription *newsub;
 	bool		started_tx = false;
 
+	/* When cache state is valid there is nothing to do here. */
+	if (MySubscriptionValid)
+		return;
+
 	/* This function might be called inside or outside of transaction. */
 	if (!IsTransactionState())
 	{
-- 
2.7.4

