Hi Kuroda Hayato,

> ===
> 01. relation.c - GetCheapestReplicaIdentityFullPath
>
> ```
>          * The reason that the planner would not pick partial indexes and
> indexes
>          * with only expressions based on the way currently
> baserestrictinfos are
>          * formed (e.g., col_1 = $1 ... AND col_N = $2).
> ```
>
> Is "col_N = $2" a typo? I think it should be "col_N = $N" or "attr1 = $1
> ... AND attrN = $N".
>
>
Yes, it is a typo, fixed now.


> ===
> 02. 032_subscribe_use_index.pl
>
> If a table has a primary key on the subscriber, it will be used even if
> enable_indexscan is false(legacy behavior).
> Should we test it?
>
>
Yes, good idea. I added two tests: one verifying that regular indexes are
not used when index scan is disabled, and another verifying that the replica
identity index is still used when index scan is disabled. This is useful so
that anyone who changes the behavior can see the impact.


> ~~~
> 03. 032_subscribe_use_index.pl -  SUBSCRIPTION RE-CALCULATES INDEX AFTER
> CREATE/DROP INDEX
>
> I think this test seems to be not trivial, so could you write down the
> motivation?
>

Makes sense, done.


>
> ~~~
> 04. 032_subscribe_use_index.pl -  SUBSCRIPTION RE-CALCULATES INDEX AFTER
> CREATE/DROP INDEX
>
> ```
> # wait until the index is created
> $node_subscriber->poll_query_until(
>         'postgres', q{select count(*)=1 from pg_stat_all_indexes where
> indexrelname = 'test_replica_id_full_idx';}
> ) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0
> updates one row via index";
> ```
>
> CREATE INDEX is a synchronous behavior, right? If so we don't have to wait
> here.
> ...And the comment in case of die may be wrong.
> (There are some cases like this)
>

It is not about CREATE INDEX being async. It is about pg_stat_all_indexes
being async. If we do not wait, the tests become flaky, because sometimes
the update has not yet been reflected in the view.

This is explained in the PostgreSQL 14 documentation, section 28.2, "The
Statistics Collector"
<https://www.postgresql.org/docs/current/monitoring-stats.html>:

    When using the statistics to monitor collected data, it is important to
    realize that the information does not update instantaneously. Each
    individual server process transmits new statistical counts to the
    collector just before going idle; so a query or transaction still in
    progress does not affect the displayed totals. Also, the collector itself
    emits a new report at most once per PGSTAT_STAT_INTERVAL milliseconds
    (500 ms unless altered while building the server). So the displayed
    information lags behind actual activity. However, current-query
    information collected by track_activities is always up-to-date.
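
To make the reason for waiting concrete, here is a minimal sketch in C of
polling a counter that lags behind actual activity. It only models the idea.
LaggingStat, read_reported_scans() and poll_until_scans() are hypothetical
names invented for this illustration. They are not part of the patch, the
statistics collector, or the TAP framework.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for a statistics view whose counters lag behind. */
typedef struct LaggingStat
{
	int			actual_scans;			/* what really happened */
	int			reads_until_visible;	/* reads before the report catches up */
} LaggingStat;

/* Returns the reported scan count: 0 until the "report" has been emitted. */
int
read_reported_scans(LaggingStat *stat)
{
	if (stat->reads_until_visible > 0)
	{
		stat->reads_until_visible--;
		return 0;
	}
	return stat->actual_scans;
}

/*
 * Re-check the view up to max_attempts times. Returns true once the expected
 * count shows up and false on "timeout". A real test would sleep between
 * attempts instead of re-reading immediately.
 */
bool
poll_until_scans(LaggingStat *stat, int expected, int max_attempts)
{
	assert(max_attempts > 0);

	for (int i = 0; i < max_attempts; i++)
	{
		if (read_reported_scans(stat) == expected)
			return true;
	}
	return false;
}
```

A single immediate read can still see the stale value, which is what makes a
one-shot check flaky. Polling with a bounded number of attempts tolerates the
lag and still fails if the counter never reaches the expected value.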



>
> ~~~
> 05. 032_subscribe_use_index.pl - SUBSCRIPTION USES INDEX UPDATEs MULTIPLE
> ROWS
>
> ```
> # Testcase start: SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS
> #
> # Basic test where the subscriber uses index
> # and touches 50 rows with UPDATE
> ```
>
> "touches 50 rows with UPDATE" -> "updates 50 rows", per other tests.
>

Fixed.


> ~~~
> 06. 032_subscribe_use_index.pl - SUBSCRIPTION CAN UPDATE THE INDEX IT
> USES AFTER ANALYZE
>
> I think this test seems to be not trivial, so could you write down the
> motivation?
> (Same as Re-calculate)
>

Sure, done.


>
> ~~~
> 07. 032_subscribe_use_index.pl - SUBSCRIPTION CAN UPDATE THE INDEX IT
> USES AFTER ANALYZE
>
> ```
> # show that index_b is not used
> $node_subscriber->poll_query_until(
>         'postgres', q{select idx_scan=0 from pg_stat_all_indexes where
> indexrelname = 'index_b';}
> ) or die "Timed out while waiting for check subscriber tap_sub_rep_full
> updates two rows via index scan with index on high cardinality column-2";
> ```
>
> I think we don't have to wait here, is() should be used instead.
> poll_query_until() should be used only when idx_scan>0 is checked.
> (There are some cases like this)
>

Yes, makes sense


>
> ~~~
> 08. 032_subscribe_use_index.pl - SUBSCRIPTION USES INDEX ON PARTITIONED
> TABLES
>
> ```
> # make sure that the subscriber has the correct data
> $node_subscriber->poll_query_until(
>         'postgres', q{select sum(user_id+value_1+value_2)=550070 AND
> count(DISTINCT(user_id,value_1, value_2))=981 from users_table_part;}
> ) or die "ensure subscriber has the correct data at the end of the test";
> ```
>
>
Ah, for this case we already have is() checks for the same results; this
is just a leftover from earlier iterations.


> I think we can replace it to wait_for_catchup() and is()...
> Moreover, we don't have to wait here because in above line we wait until
> the index is used on the subscriber.
> (There are some cases like this)
>

Fixed a few more such cases.

Thanks for the review! Attached v16.

Onder KALACI
From 1ebc170f821f290283aadd10c8c04480ad203580 Mon Sep 17 00:00:00 2001
From: Onder Kalaci <onderkalaci@gmail.com>
Date: Tue, 17 May 2022 10:47:39 +0200
Subject: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full
 on the publisher

Using `REPLICA IDENTITY FULL` on the publication leads to a full table
scan per tuple change on the subscription. This makes `REPLICA
IDENTITY FULL` impractical, except perhaps for a small number of use
cases.

With this patch, I'm proposing the following change: If there is any
index on the subscriber, let the planner sub-modules compare the costs
of index versus sequential scan and choose the cheapest. The index
should be a btree index, not a partial index, and it should have at
least one column reference (e.g., cannot consist of only expressions).

The majority of the logic on the subscriber side already exists in
the code. The subscriber is already capable of doing (unique) index
scans.  With this patch, we are allowing the index to iterate over the
tuples fetched and only act when tuples are equal. Anyone familiar
with this part of the code might recognize that the sequential scan
code on the subscriber already implements the `tuples_equal()`
function. In short, the changes on the subscriber are mostly
combining parts of the (unique) index scan and sequential scan code.
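
As an illustration of the paragraph above, here is a minimal sketch in C of
"scan a possibly non-unique index, then only act when the tuples are equal".
It is a toy model under stated assumptions, not the code in this patch: Row,
rows_equal() and find_row() are hypothetical names, the "index scan" is
simulated by a linear walk that filters on one column, and rows_equal() only
plays the role that `tuples_equal()` plays in the real code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define NCOLS 3

/* Hypothetical fixed-width row, used for illustration only. */
typedef struct Row
{
	int			cols[NCOLS];
} Row;

/* Full-row comparison, standing in for tuples_equal(). */
bool
rows_equal(const Row *a, const Row *b)
{
	for (int i = 0; i < NCOLS; i++)
	{
		if (a->cols[i] != b->cols[i])
			return false;
	}
	return true;
}

/*
 * Returns the position of the first row matching 'search', or -1 if none.
 *
 * Only rows whose indexed column equals the search value are candidates.
 * With a unique index the first candidate is accepted as-is. Otherwise each
 * candidate must also pass the full-row comparison.
 */
int
find_row(const Row *table, size_t nrows, int indexed_col,
		 bool index_is_unique, const Row *search)
{
	assert(indexed_col >= 0 && indexed_col < NCOLS);

	for (size_t i = 0; i < nrows; i++)
	{
		/* "Index condition": stand-in for the btree equality scan key. */
		if (table[i].cols[indexed_col] != search->cols[indexed_col])
			continue;

		/* Avoid the more expensive equality check if the index is unique. */
		if (!index_is_unique && !rows_equal(&table[i], search))
			continue;

		return (int) i;
	}
	return -1;
}
```

With an index on a low-cardinality column, several rows pass the index
condition, and the full-row comparison is what selects the right one.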

The decision on whether to use an index (and which index) is mostly
derived from planner infrastructure. The idea is that on the
subscriber we have all the columns. So, construct all the
`Path`s with the restrictions on all columns, such as
`col_1 = $1 AND col_2 = $2 ... AND col_N = $N`. Then, let the
planner sub-module -- `make_one_rel()` -- give us the relevant
index `Path`s. On top of that, add the sequential scan `Path` as
well. Finally, pick the cheapest `Path` among them.
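
The selection described above can be sketched in C as follows. This is a
simplified model of the description in this message, not the planner code:
CandidateIndex and choose_usable_index() are hypothetical names, the costs
are plain numbers supplied by the caller, and 0 is used here as an assumed
"no index, use sequential scan" marker.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical summary of one candidate index path. */
typedef struct CandidateIndex
{
	unsigned	oid;				/* illustrative identifier, never 0 */
	double		scan_cost;			/* estimated cost of the index scan */
	bool		is_btree;
	bool		is_partial;
	bool		only_expressions;	/* no plain column reference at all */
} CandidateIndex;

/*
 * Returns the oid of the cheapest suitable index if it beats the sequential
 * scan, and 0 (meaning "sequential scan") otherwise.
 */
unsigned
choose_usable_index(const CandidateIndex *indexes, size_t n,
					double seqscan_cost)
{
	unsigned	best_oid = 0;
	double		best_cost = seqscan_cost;

	assert(seqscan_cost >= 0.0);

	for (size_t i = 0; i < n; i++)
	{
		const CandidateIndex *idx = &indexes[i];

		/* Only btree, non-partial indexes with a column reference qualify. */
		if (!idx->is_btree || idx->is_partial || idx->only_expressions)
			continue;

		if (idx->scan_cost < best_cost)
		{
			best_cost = idx->scan_cost;
			best_oid = idx->oid;
		}
	}
	return best_oid;
}
```

Starting from the sequential scan cost means there is always an answer, which
mirrors the guarantee that a path is always returned.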

From the performance point of view, there are a few things to note.
First, the patch aims not to change the behavior when PRIMARY KEY
or UNIQUE INDEX is used. Second, when REPLICA IDENTITY FULL is on
the publisher and an index is used on the subscriber, the
difference mostly comes down to `index scan` vs `sequential scan`.
That is why it is hard to claim a specific amount of improvement.
It mostly depends on the data size, the index, and the data distribution.

Still, below I try to showcase the potential improvements using an
index on `pgbench_accounts(bid)` on the subscriber. With the index,
all the changes are replicated within ~5 seconds. When the index
is dropped, the same operation takes ~300 seconds.

// init source db
pgbench -i -s 100 -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 5432 postgres
psql -c "CREATE INDEX i1 ON pgbench_accounts(aid);" -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts REPLICA IDENTITY FULL;" -p 5432 postgres
psql -c "CREATE PUBLICATION pub_test_1 FOR TABLE pgbench_accounts;" -p 5432 postgres

// init target db, drop existing primary key
pgbench -i -p 9700 postgres
psql -c "TRUNCATE pgbench_accounts;" -p 9700 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 9700 postgres
psql -c "CREATE SUBSCRIPTION sub_test_1 CONNECTION 'host=localhost port=5432 user=onderkalaci dbname=postgres' PUBLICATION pub_test_1;" -p 9700 postgres

// create one index, even on a low cardinality column
psql -c "CREATE INDEX i2 ON pgbench_accounts(bid);" -p 9700 postgres

// now, run some pgbench tests and observe replication
pgbench -t 500 -b tpcb-like -p 5432 postgres
---
 doc/src/sgml/logical-replication.sgml         |    8 +-
 src/backend/executor/execReplication.c        |  124 +-
 src/backend/replication/logical/relation.c    |  423 +++++-
 src/backend/replication/logical/worker.c      |   86 +-
 src/include/replication/logicalrelation.h     |   24 +-
 src/test/subscription/meson.build             |    1 +
 .../subscription/t/032_subscribe_use_index.pl | 1202 +++++++++++++++++
 7 files changed, 1777 insertions(+), 91 deletions(-)
 create mode 100644 src/test/subscription/t/032_subscribe_use_index.pl

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index e98538e540..68dbef3ec2 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -132,8 +132,12 @@
    certain additional requirements) can also be set to be the replica
    identity.  If the table does not have any suitable key, then it can be set
    to replica identity <quote>full</quote>, which means the entire row becomes
-   the key.  This, however, is very inefficient and should only be used as a
-   fallback if no other solution is possible.  If a replica identity other
+   the key. If replica identity <quote>full</quote> is used, indexes can be
+   used on the subscriber side for searching the rows. The index should be
+   btree, non-partial and have at least one column reference (i.e.,
+   should not consist of only expressions). If there are no suitable indexes,
+   the search on the subscriber side is very inefficient and should only be
+   used as a fallback if no other solution is possible.  If a replica identity other
    than <quote>full</quote> is set on the publisher side, a replica identity
    comprising the same or fewer columns must also be set on the subscriber
    side.  See <xref linkend="sql-altertable-replica-identity"/> for details on
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 6014f2e248..401ece45ce 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -25,6 +25,9 @@
 #include "nodes/nodeFuncs.h"
 #include "parser/parse_relation.h"
 #include "parser/parsetree.h"
+#ifdef USE_ASSERT_CHECKING
+#include "replication/logicalrelation.h"
+#endif
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
@@ -37,28 +40,29 @@
 #include "utils/typcache.h"
 
 
+static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
+						 TypeCacheEntry **eq);
+
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
  * is setup to match 'rel' (*NOT* idxrel!).
  *
- * Returns whether any column contains NULLs.
+ * Returns how many columns should be used for the index scan.
  *
- * This is not generic routine, it expects the idxrel to be replication
- * identity of a rel and meet all limitations associated with that.
+ * This is not a generic routine - it expects the idxrel to be an index
+ * that the planner would choose if the searchslot includes all the columns
+ * (e.g., REPLICA IDENTITY FULL on the source).
  */
-static bool
+static int
 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 						 TupleTableSlot *searchslot)
 {
-	int			attoff;
+	int			index_attoff;
+	int			skey_attoff = 0;
 	bool		isnull;
 	Datum		indclassDatum;
 	oidvector  *opclass;
 	int2vector *indkey = &idxrel->rd_index->indkey;
-	bool		hasnulls = false;
-
-	Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
-		   RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
 
 	indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
 									Anum_pg_index_indclass, &isnull);
@@ -66,20 +70,53 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 	opclass = (oidvector *) DatumGetPointer(indclassDatum);
 
 	/* Build scankey for every attribute in the index. */
-	for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
+	for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
+		 index_attoff++)
 	{
 		Oid			operator;
 		Oid			opfamily;
+		Oid			optype = get_opclass_input_type(opclass->values[index_attoff]);
 		RegProcedure regop;
-		int			pkattno = attoff + 1;
-		int			mainattno = indkey->values[attoff];
-		Oid			optype = get_opclass_input_type(opclass->values[attoff]);
+		int			table_attno = indkey->values[index_attoff];
+
+		if (!AttributeNumberIsValid(table_attno))
+		{
+			/*
+			 * This attribute is an expression, and
+			 * SuitablePathsForRepIdentFull() was called earlier when the
+			 * index for subscriber was selected. There, the indexes comprising
+			 * *only* expressions have already been eliminated.
+			 *
+			 * We sanity check this now.
+			 */
+#ifdef USE_ASSERT_CHECKING
+			IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
+			Assert(!IsIndexOnlyOnExpression(indexInfo));
+#endif
+
+			/*
+			 * Furthermore, because primary key and unique key indexes can't
+			 * include expressions we also sanity check the index is neither
+			 * of those kinds.
+			 */
+			Assert(RelationGetReplicaIndex(rel) != RelationGetRelid(idxrel) &&
+				   RelationGetPrimaryKeyIndex(rel) != RelationGetRelid(idxrel));
+
+			/*
+			 * XXX: For a non-primary/unique index with an additional
+			 * expression, we do not have to continue at this point. However,
+			 * the below code assumes the index scan is only done for simple
+			 * column references. If we can relax the assumption in the below
+			 * code-block, we can also remove the continue.
+			 */
+			continue;
+		}
 
 		/*
 		 * Load the operator info.  We need this to get the equality operator
 		 * function for the scan key.
 		 */
-		opfamily = get_opclass_family(opclass->values[attoff]);
+		opfamily = get_opclass_family(opclass->values[index_attoff]);
 
 		operator = get_opfamily_member(opfamily, optype,
 									   optype,
@@ -91,23 +128,25 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		regop = get_opcode(operator);
 
 		/* Initialize the scankey. */
-		ScanKeyInit(&skey[attoff],
-					pkattno,
+		ScanKeyInit(&skey[skey_attoff],
+					index_attoff + 1,
 					BTEqualStrategyNumber,
 					regop,
-					searchslot->tts_values[mainattno - 1]);
+					searchslot->tts_values[table_attno - 1]);
 
-		skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+		skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
 
 		/* Check for null value. */
-		if (searchslot->tts_isnull[mainattno - 1])
-		{
-			hasnulls = true;
-			skey[attoff].sk_flags |= SK_ISNULL;
-		}
+		if (searchslot->tts_isnull[table_attno - 1])
+			skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
+
+		skey_attoff++;
 	}
 
-	return hasnulls;
+	/* There should always be at least one attribute for the index scan. */
+	Assert(skey_attoff > 0);
+
+	return skey_attoff;
 }
 
 /*
@@ -123,33 +162,50 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 							 TupleTableSlot *outslot)
 {
 	ScanKeyData skey[INDEX_MAX_KEYS];
+	int			skey_attoff;
 	IndexScanDesc scan;
 	SnapshotData snap;
 	TransactionId xwait;
 	Relation	idxrel;
 	bool		found;
+	TypeCacheEntry **eq = NULL; /* only used when the index is not unique */
+	bool		indisunique;
 
 	/* Open the index. */
 	idxrel = index_open(idxoid, RowExclusiveLock);
+	indisunique = idxrel->rd_index->indisunique;
 
-	/* Start an index scan. */
 	InitDirtySnapshot(snap);
-	scan = index_beginscan(rel, idxrel, &snap,
-						   IndexRelationGetNumberOfKeyAttributes(idxrel),
-						   0);
 
 	/* Build scan key. */
-	build_replindex_scan_key(skey, rel, idxrel, searchslot);
+	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+	/* Start an index scan. */
+	scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
 
 retry:
 	found = false;
 
-	index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
+	index_rescan(scan, skey, skey_attoff, NULL, 0);
 
 	/* Try to find the tuple */
-	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+	while (index_getnext_slot(scan, ForwardScanDirection, outslot))
 	{
-		found = true;
+		/* Avoid expensive equality check if index is unique */
+		if (!indisunique)
+		{
+			/*
+			 * We only need to allocate once. This is allocated within per
+			 * tuple context -- ApplyMessageContext -- hence no need to
+			 * explicitly pfree().
+			 */
+			if (eq == NULL)
+				eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
+
+			if (!tuples_equal(outslot, searchslot, eq))
+				continue;
+		}
+
 		ExecMaterializeSlot(outslot);
 
 		xwait = TransactionIdIsValid(snap.xmin) ?
@@ -164,6 +220,10 @@ retry:
 			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
 			goto retry;
 		}
+
+		/* Found our tuple and it's not locked */
+		found = true;
+		break;
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e989047681..c3b9151ef4 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -17,38 +17,36 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/table.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_am_d.h"
 #include "catalog/pg_subscription_rel.h"
+#include "catalog/pg_operator.h"
+#include "commands/defrem.h"
 #include "executor/executor.h"
 #include "nodes/makefuncs.h"
 #include "replication/logicalrelation.h"
 #include "replication/worker_internal.h"
+#include "optimizer/cost.h"
+#include "optimizer/paramassign.h"
+#include "optimizer/pathnode.h"
+#include "optimizer/paths.h"
+#include "optimizer/plancat.h"
+#include "optimizer/restrictinfo.h"
 #include "utils/inval.h"
+#include "utils/typcache.h"
 
 
 static MemoryContext LogicalRepRelMapContext = NULL;
-
 static HTAB *LogicalRepRelMap = NULL;
-
-/*
- * Partition map (LogicalRepPartMap)
- *
- * When a partitioned table is used as replication target, replicated
- * operations are actually performed on its leaf partitions, which requires
- * the partitions to also be mapped to the remote relation.  Parent's entry
- * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
- * individual partitions may have different attribute numbers, which means
- * attribute mappings to remote relation's attributes must be maintained
- * separately for each partition.
- */
 static MemoryContext LogicalRepPartMapContext = NULL;
+
+/* For LogicalRepPartMap details see LogicalRepPartMapEntry in logicalrelation.h */
 static HTAB *LogicalRepPartMap = NULL;
-typedef struct LogicalRepPartMapEntry
-{
-	Oid			partoid;		/* LogicalRepPartMap's key */
-	LogicalRepRelMapEntry relmapentry;
-} LogicalRepPartMapEntry;
+
+static Oid	FindLogicalRepUsableIndex(Relation localrel,
+									  LogicalRepRelation *remoterel);
 
 /*
  * Relcache invalidation callback for our relation map cache.
@@ -438,6 +436,14 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		 */
 		logicalrep_rel_mark_updatable(entry);
 
+		/*
+		 * Finding a usable index is an infrequent task. It occurs when an
+		 * operation is first performed on the relation, or after invalidation
+		 * of the relation cache entry (e.g., by ANALYZE or CREATE/DROP
+		 * INDEX on the relation).
+		 */
+		entry->usableIndexOid = FindLogicalRepUsableIndex(entry->localrel, remoterel);
+
 		entry->localrelvalid = true;
 	}
 
@@ -581,7 +587,7 @@ logicalrep_partmap_init(void)
  * Note there's no logicalrep_partition_close, because the caller closes the
  * component relation.
  */
-LogicalRepRelMapEntry *
+LogicalRepPartMapEntry *
 logicalrep_partition_open(LogicalRepRelMapEntry *root,
 						  Relation partrel, AttrMap *map)
 {
@@ -615,7 +621,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	if (found && entry->localrelvalid)
 	{
 		entry->localrel = partrel;
-		return entry;
+		return part_entry;
 	}
 
 	/* Switch to longer-lived context. */
@@ -696,10 +702,385 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	/* Set if the table's replica identity is enough to apply update/delete. */
 	logicalrep_rel_mark_updatable(entry);
 
+	/*
+	 * Finding a usable index is an infrequent task. It occurs when an
+	 * operation is first performed on the relation, or after invalidation
+	 * of the relation cache entry (e.g., by ANALYZE or CREATE/DROP
+	 * INDEX on the relation).
+	 */
+	entry->usableIndexOid = FindLogicalRepUsableIndex(partrel, remoterel);
+
 	entry->localrelvalid = true;
 
 	/* state and statelsn are left set to 0. */
 	MemoryContextSwitchTo(oldctx);
 
-	return entry;
+	return part_entry;
+}
+
+/*
+ * Returns a valid index oid if the input path is an index path.
+ *
+ * Otherwise, returns InvalidOid.
+ */
+static Oid
+GetIndexOidFromPath(Path *path)
+{
+	if (path->pathtype == T_IndexScan || path->pathtype == T_IndexOnlyScan)
+	{
+		IndexPath  *index_sc = (IndexPath *) path;
+
+		return index_sc->indexinfo->indexoid;
+	}
+
+	return InvalidOid;
+}
+
+/*
+ * Returns true if the given index consists only of expressions such as:
+ * 	CREATE INDEX idx ON table(foo(col));
+ *
+ * Returns false even if there is one column reference:
+ * 	 CREATE INDEX idx ON table(foo(col), col_2);
+ */
+bool
+IsIndexOnlyOnExpression(IndexInfo *indexInfo)
+{
+	for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+	{
+		AttrNumber	attnum = indexInfo->ii_IndexAttrNumbers[i];
+
+		if (AttributeNumberIsValid(attnum))
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * Iterates over the input path list and returns another path list that
+ * includes index [only] scans where paths with non-btree indexes, partial
+ * indexes or indexes on only expressions have been removed.
+ */
+static List *
+SuitablePathsForRepIdentFull(List *pathlist)
+{
+	ListCell   *lc;
+	List	   *suitableIndexList = NIL;
+
+	foreach(lc, pathlist)
+	{
+		Path	   *path = (Path *) lfirst(lc);
+		Oid			idxoid = GetIndexOidFromPath(path);
+
+		if (!OidIsValid(idxoid))
+		{
+			/* Unrelated Path, skip */
+			continue;
+		}
+		else
+		{
+			Relation	indexRelation;
+			IndexInfo  *indexInfo;
+			bool		is_btree;
+			bool		is_partial;
+			bool		is_only_on_expression;
+
+			indexRelation = index_open(idxoid, AccessShareLock);
+			indexInfo = BuildIndexInfo(indexRelation);
+			is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
+			is_partial = (indexInfo->ii_Predicate != NIL);
+			is_only_on_expression = IsIndexOnlyOnExpression(indexInfo);
+			index_close(indexRelation, AccessShareLock);
+
+			/* Keep only the suitable index scan paths */
+			if (is_btree && !is_partial && !is_only_on_expression)
+				suitableIndexList = lappend(suitableIndexList, path);
+		}
+	}
+
+	return suitableIndexList;
+}
+
+/*
+ * This is not a generic function. It is a helper function for
+ * GetCheapestReplicaIdentityFullPath. The function creates a dummy PlannerInfo
+ * for the given relationId as if the relation is queried with SELECT command.
+ */
+static PlannerInfo *
+GenerateDummySelectPlannerInfoForRelation(Oid relationId)
+{
+	PlannerInfo *root;
+	Query	   *query;
+	PlannerGlobal *glob;
+	RangeTblEntry *rte;
+
+	/* Set up mostly-dummy planner state */
+	query = makeNode(Query);
+	query->commandType = CMD_SELECT;
+
+	glob = makeNode(PlannerGlobal);
+
+	root = makeNode(PlannerInfo);
+	root->parse = query;
+	root->glob = glob;
+	root->query_level = 1;
+	root->planner_cxt = CurrentMemoryContext;
+	root->wt_param_id = -1;
+
+	/* Build a minimal RTE for the rel */
+	rte = makeNode(RangeTblEntry);
+	rte->rtekind = RTE_RELATION;
+	rte->relid = relationId;
+	rte->relkind = RELKIND_RELATION;
+	rte->rellockmode = AccessShareLock;
+	rte->lateral = false;
+	rte->inh = false;
+	rte->inFromCl = true;
+	query->rtable = list_make1(rte);
+
+	/* Set up RTE/RelOptInfo arrays */
+	setup_simple_rel_arrays(root);
+
+	return root;
+}
+
+/*
+ * Generate all the possible paths for the given subscriber relation,
+ * for cases where the source relation is replicated via REPLICA
+ * IDENTITY FULL. The function returns the cheapest Path among the
+ * eligible paths, see SuitablePathsForRepIdentFull().
+ *
+ * The function emulates getting the cheapest path for a query in
+ * the form of:
+ * 	"SELECT FROM localrel
+ * 	 WHERE attr1 = $1 AND attr2 = $2 ... AND attrN = $N"
+ *
+ * The function guarantees to return a path, because it adds
+ * sequential scan path if needed.
+ *
+ * The function assumes that all the columns will be provided during
+ * the execution phase, given that REPLICA IDENTITY FULL guarantees
+ * that.
+ */
+static Path *
+GetCheapestReplicaIdentityFullPath(Relation localrel)
+{
+	PlannerInfo *root;
+	RelOptInfo *rel;
+	int			attno;
+	RangeTblRef *rt;
+	List	   *joinList;
+
+	/* Build PlannerInfo */
+	root = GenerateDummySelectPlannerInfoForRelation(localrel->rd_id);
+
+	/* Build RelOptInfo */
+	rel = build_simple_rel(root, 1, NULL);
+
+	/*
+	 * Generate restrictions for all columns in the form of attr1 = $1 AND
+	 * attr2 = $2 ... AND attrN = $N
+	 */
+	for (attno = 0; attno < RelationGetNumberOfAttributes(localrel); attno++)
+	{
+		Form_pg_attribute attr = TupleDescAttr(localrel->rd_att, attno);
+
+		if (!attr->attisdropped)
+		{
+			Expr	   *eq_op;
+			TypeCacheEntry *typentry;
+			RestrictInfo *restrict_info;
+			Var		   *leftarg;
+			Param	   *rightarg;
+			int			varno = 1;
+
+			typentry = lookup_type_cache(attr->atttypid, TYPECACHE_EQ_OPR_FINFO);
+
+			if (!OidIsValid(typentry->eq_opr))
+				continue;		/* no equality operator, skip this column */
+
+			leftarg =
+				makeVar(varno, attr->attnum, attr->atttypid, attr->atttypmod,
+						attr->attcollation, 0);
+
+			rightarg = makeNode(Param);
+			rightarg->paramkind = PARAM_EXTERN;
+			rightarg->paramid = list_length(rel->baserestrictinfo) + 1;
+			rightarg->paramtype = attr->atttypid;
+			rightarg->paramtypmod = attr->atttypmod;
+			rightarg->paramcollid = attr->attcollation;
+			rightarg->location = -1;
+
+			eq_op = make_opclause(typentry->eq_opr, BOOLOID, false,
+								  (Expr *) leftarg, (Expr *) rightarg,
+								  InvalidOid, attr->attcollation);
+
+			restrict_info = make_simple_restrictinfo(root, eq_op);
+
+			rel->baserestrictinfo = lappend(rel->baserestrictinfo, restrict_info);
+		}
+	}
+
+	/* Build joinList, which consists of a single relation */
+	rt = makeNode(RangeTblRef);
+	rt->rtindex = 1;
+	joinList = list_make1(rt);
+
+	/*
+	 * Make sure the planner generates the relevant paths, including all the
+	 * possible index scans as well as sequential scan.
+	 */
+	rel = make_one_rel(root, joinList);
+
+	/*
+	 * Currently it is not possible for the planner to pick a partial index or
+	 * indexes only on expressions. We still want to be explicit and eliminate
+	 * such paths proactively.
+	 *
+	 * The planner would not pick partial indexes or indexes with only
+	 * expressions because of the way baserestrictinfos are currently
+	 * formed (e.g., col_1 = $1 ... AND col_N = $N).
+	 *
+	 * For the partial indexes, check_index_predicates() (via
+	 * operator_predicate_proof()) checks whether the predicate of the index
+	 * is implied by the baserestrictinfos. The check always returns false
+	 * because index predicates are formed with CONSTs and baserestrictinfos
+	 * are formed with PARAMs. Hence, partial indexes are never picked.
+	 *
+	 * Indexes that consist of only expressions (i.e., no simple column
+	 * references on the index) are also eliminated with similar reasoning.
+	 * match_restriction_clauses_to_index() (via match_index_to_operand())
+	 * eliminates the use of the index if the restriction does not contain
+	 * an expression equal to the index expression.
+	 *
+	 * XXX: We also eliminate non-btree indexes, which could be relaxed if
+	 * needed. If we allow non-btree indexes, we should adjust
+	 * RelationFindReplTupleByIndex() to support such indexes.
+	 */
+	rel->pathlist =
+		SuitablePathsForRepIdentFull(rel->pathlist);
+
+	if (rel->pathlist == NIL)
+	{
+		/*
+		 * If there are no suitable indexes, we should always be able
+		 * to fallback to sequential scan.
+		 */
+		Path	   *seqScanPath = create_seqscan_path(root, rel, NULL, 0);
+
+		add_path(rel, seqScanPath);
+	}
+
+	set_cheapest(rel);
+
+	Assert(rel->cheapest_total_path != NULL);
+
+	return rel->cheapest_total_path;
+}
+
+/*
+ * Returns an index oid if the planner submodules pick index scans
+ * over sequential scan.
+ *
+ * Otherwise, returns InvalidOid.
+ *
+ * Note that this is not a generic function, it expects REPLICA
+ * IDENTITY FULL for the remote relation.
+ */
+static Oid
+FindUsableIndexForReplicaIdentityFull(Relation localrel)
+{
+	MemoryContext usableIndexContext;
+	MemoryContext oldctx;
+	Path	   *cheapest_total_path;
+	Oid			idxoid;
+
+	usableIndexContext = AllocSetContextCreate(CurrentMemoryContext,
+											   "usableIndexContext",
+											   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(usableIndexContext);
+
+	cheapest_total_path = GetCheapestReplicaIdentityFullPath(localrel);
+
+	idxoid = GetIndexOidFromPath(cheapest_total_path);
+
+	MemoryContextSwitchTo(oldctx);
+
+	MemoryContextDelete(usableIndexContext);
+
+	return idxoid;
+}
+
+/*
+ * Get replica identity index or if it is not defined a primary key.
+ *
+ * If neither is defined, returns InvalidOid
+ */
+static Oid
+GetRelationIdentityOrPK(Relation rel)
+{
+	Oid			idxoid;
+
+	idxoid = RelationGetReplicaIndex(rel);
+
+	if (!OidIsValid(idxoid))
+		idxoid = RelationGetPrimaryKeyIndex(rel);
+
+	return idxoid;
+}
+
+/*
+ * Returns an index oid if we can use an index for subscriber. If not,
+ * returns InvalidOid.
+ */
+static Oid
+FindLogicalRepUsableIndex(Relation localrel, LogicalRepRelation *remoterel)
+{
+	Oid			idxoid;
+
+	/*
+	 * We never need index oid for partitioned tables, always rely on leaf
+	 * partition's index.
+	 */
+	if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return InvalidOid;
+
+	/*
+	 * Simple case, we already have a primary key or a replica identity index.
+	 *
+	 * Note that we do not use index scans below when enable_indexscan is false.
+	 * Allowing primary key or replica identity even when index scan is
+	 * disabled is the legacy behaviour. So we hesitate to move the below
+	 * enable_indexscan check to be done earlier in this function.
+	 */
+	idxoid = GetRelationIdentityOrPK(localrel);
+	if (OidIsValid(idxoid))
+		return idxoid;
+
+	/* If index scans are disabled, use a sequential scan */
+	if (!enable_indexscan)
+		return InvalidOid;
+
+	if (remoterel->replident == REPLICA_IDENTITY_FULL &&
+		RelationGetIndexList(localrel) != NIL)
+	{
+		/*
+		 * If we had a primary key or replica identity with a unique index,
+		 * we would have already found and returned that oid. At this point,
+		 * the remote relation has replica identity full and we have at least
+		 * one local index defined.
+		 *
+		 * We are looking for one more opportunity for using an index. If
+		 * there are any indexes defined on the local relation, try to pick
+		 * the cheapest index.
+		 *
+		 * The index selection safely assumes that all the columns are going
+		 * to be available for the index scan given that remote relation has
+		 * replica identity full.
+		 */
+		return FindUsableIndexForReplicaIdentityFull(localrel);
+	}
+
+	return InvalidOid;
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5250ae7f54..cedcce3382 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -332,6 +332,8 @@ static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot);
+static Oid	get_usable_indexoid(ApplyExecutionData *edata,
+									 ResultRelInfo *relinfo);
 static void apply_handle_update_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot,
@@ -341,6 +343,7 @@ static void apply_handle_delete_internal(ApplyExecutionData *edata,
 										 TupleTableSlot *remoteslot);
 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
 									LogicalRepRelation *remoterel,
+									Oid localidxoid,
 									TupleTableSlot *remoteslot,
 									TupleTableSlot **localslot);
 static void apply_handle_tuple_routing(ApplyExecutionData *edata,
@@ -1611,24 +1614,6 @@ apply_handle_type(StringInfo s)
 	logicalrep_read_typ(s, &typ);
 }
 
-/*
- * Get replica identity index or if it is not defined a primary key.
- *
- * If neither is defined, returns InvalidOid
- */
-static Oid
-GetRelationIdentityOrPK(Relation rel)
-{
-	Oid			idxoid;
-
-	idxoid = RelationGetReplicaIndex(rel);
-
-	if (!OidIsValid(idxoid))
-		idxoid = RelationGetPrimaryKeyIndex(rel);
-
-	return idxoid;
-}
-
 /*
  * Check that we (the subscription owner) have sufficient privileges on the
  * target relation to perform the given operation.
@@ -1774,11 +1759,8 @@ check_relation_updatable(LogicalRepRelMapEntry *rel)
 	if (rel->updatable)
 		return;
 
-	/*
-	 * We are in error mode so it's fine this is somewhat slow. It's better to
-	 * give user correct error.
-	 */
-	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
+	/* Give user more precise error if possible. */
+	if (OidIsValid(rel->usableIndexOid))
 	{
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1921,12 +1903,14 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 	TupleTableSlot *localslot;
 	bool		found;
 	MemoryContext oldctx;
+	Oid			usableIndexOid = get_usable_indexoid(edata, relinfo);
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
 	found = FindReplTupleInLocalRel(estate, localrel,
 									&relmapentry->remoterel,
+									usableIndexOid,
 									remoteslot, &localslot);
 	ExecClearTuple(remoteslot);
 
@@ -2059,11 +2043,12 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EPQState	epqstate;
 	TupleTableSlot *localslot;
 	bool		found;
+	Oid			usableIndexOid = get_usable_indexoid(edata, relinfo);
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
-	found = FindReplTupleInLocalRel(estate, localrel, remoterel,
+	found = FindReplTupleInLocalRel(estate, localrel, remoterel, usableIndexOid,
 									remoteslot, &localslot);
 
 	/* If found delete it. */
@@ -2094,20 +2079,52 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EvalPlanQualEnd(&epqstate);
 }
 
+/*
+ * Decide whether we can pick an index for the relinfo (i.e., the relation)
+ * we're actually deleting/updating from. If it is a child partition of
+ * edata->targetRelInfo, find the index on the partition.
+ *
+ * Note that if the corresponding relmapentry has invalid usableIndexOid,
+ * the function returns InvalidOid.
+ */
+static Oid
+get_usable_indexoid(ApplyExecutionData *edata, ResultRelInfo *relinfo)
+{
+	ResultRelInfo *targetResultRelInfo = edata->targetRelInfo;
+	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+	char		targetrelkind = targetResultRelInfo->ri_RelationDesc->rd_rel->relkind;
+
+	if (targetrelkind == RELKIND_PARTITIONED_TABLE)
+	{
+		/* Target is a partitioned table, so find relmapentry of the partition */
+		TupleConversionMap *map = relinfo->ri_RootToPartitionMap;
+		AttrMap    *attrmap = map ? map->attrMap : NULL;
+
+		LogicalRepPartMapEntry *part_entry =
+		logicalrep_partition_open(relmapentry, relinfo->ri_RelationDesc,
+								  attrmap);
+
+		relmapentry = &part_entry->relmapentry;
+	}
+
+	return relmapentry->usableIndexOid;
+}
+
 /*
  * Try to find a tuple received from the publication side (in 'remoteslot') in
  * the corresponding local relation using either replica identity index,
- * primary key or if needed, sequential scan.
+ * primary key, another usable index or, if needed, sequential scan.
  *
  * Local tuple, if found, is returned in '*localslot'.
  */
 static bool
 FindReplTupleInLocalRel(EState *estate, Relation localrel,
 						LogicalRepRelation *remoterel,
+						Oid localidxoid,
 						TupleTableSlot *remoteslot,
 						TupleTableSlot **localslot)
 {
-	Oid			idxoid;
 	bool		found;
 
 	/*
@@ -2118,12 +2135,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
 
 	*localslot = table_slot_create(localrel, &estate->es_tupleTable);
 
-	idxoid = GetRelationIdentityOrPK(localrel);
-	Assert(OidIsValid(idxoid) ||
+	Assert(OidIsValid(localidxoid) ||
 		   (remoterel->replident == REPLICA_IDENTITY_FULL));
 
-	if (OidIsValid(idxoid))
-		found = RelationFindReplTupleByIndex(localrel, idxoid,
+	if (OidIsValid(localidxoid))
+		found = RelationFindReplTupleByIndex(localrel, localidxoid,
 											 LockTupleExclusive,
 											 remoteslot, *localslot);
 	else
@@ -2153,7 +2169,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	TupleTableSlot *remoteslot_part;
 	TupleConversionMap *map;
 	MemoryContext oldctx;
-	LogicalRepRelMapEntry *part_entry = NULL;
+	LogicalRepPartMapEntry *part_entry = NULL;
 	AttrMap    *attrmap = NULL;
 
 	/* ModifyTableState is needed for ExecFindPartition(). */
@@ -2203,7 +2219,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	{
 		part_entry = logicalrep_partition_open(relmapentry, partrel,
 											   attrmap);
-		check_relation_updatable(part_entry);
+		check_relation_updatable(&part_entry->relmapentry);
 	}
 
 	switch (operation)
@@ -2227,13 +2243,15 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 			 * suitable partition.
 			 */
 			{
+				LogicalRepRelMapEntry *partrelmapentry = &part_entry->relmapentry;
 				TupleTableSlot *localslot;
 				ResultRelInfo *partrelinfo_new;
 				bool		found;
 
 				/* Get the matching local tuple from the partition. */
 				found = FindReplTupleInLocalRel(estate, partrel,
-												&part_entry->remoterel,
+												&partrelmapentry->remoterel,
+												partrelmapentry->usableIndexOid,
 												remoteslot_part, &localslot);
 				if (!found)
 				{
@@ -2255,7 +2273,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				 * remoteslot_part.
 				 */
 				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-				slot_modify_data(remoteslot_part, localslot, part_entry,
+				slot_modify_data(remoteslot_part, localslot, partrelmapentry,
 								 newtup);
 				MemoryContextSwitchTo(oldctx);
 
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 78cd7e77f5..383e2ed721 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -13,6 +13,7 @@
 #define LOGICALRELATION_H
 
 #include "access/attmap.h"
+#include "catalog/index.h"
 #include "replication/logicalproto.h"
 
 typedef struct LogicalRepRelMapEntry
@@ -31,20 +32,39 @@ typedef struct LogicalRepRelMapEntry
 	Relation	localrel;		/* relcache entry (NULL when closed) */
 	AttrMap    *attrmap;		/* map of local attributes to remote ones */
 	bool		updatable;		/* Can apply updates/deletes? */
+	Oid			usableIndexOid; /* which index to use, or InvalidOid if none */
 
 	/* Sync state. */
 	char		state;
 	XLogRecPtr	statelsn;
 } LogicalRepRelMapEntry;
 
+/*
+ * Used for Partition mapping (see LogicalRepPartMap in logical/relation.c)
+ *
+ * When a partitioned table is used as replication target, replicated
+ * operations are actually performed on its leaf partitions, which requires
+ * the partitions to also be mapped to the remote relation.  Parent's entry
+ * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
+ * individual partitions may have different attribute numbers, which means
+ * attribute mappings to remote relation's attributes must be maintained
+ * separately for each partition.
+ */
+typedef struct LogicalRepPartMapEntry
+{
+	Oid			partoid;		/* LogicalRepPartMap's key */
+	LogicalRepRelMapEntry relmapentry;
+} LogicalRepPartMapEntry;
+
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
 extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
 
 extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
 												  LOCKMODE lockmode);
-extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
-														Relation partrel, AttrMap *map);
+extern LogicalRepPartMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
+														 Relation partrel, AttrMap *map);
 extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
+extern bool IsIndexOnlyOnExpression(IndexInfo *indexInfo);
 
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 85d1dd9295..0aa367ec69 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -36,6 +36,7 @@ tests += {
       't/029_on_error.pl',
       't/030_origin.pl',
       't/031_column_list.pl',
+      't/032_subscribe_use_index.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/032_subscribe_use_index.pl b/src/test/subscription/t/032_subscribe_use_index.pl
new file mode 100644
index 0000000000..71f5698af5
--- /dev/null
+++ b/src/test/subscription/t/032_subscribe_use_index.pl
@@ -0,0 +1,1202 @@
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test logical replication behavior when the subscriber uses an available index
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	"wal_retrieve_retry_interval = 1ms");
+
+# we don't want planner to pick bitmap scans instead of index scans
+# this is to make the tests consistent
+$node_subscriber->append_conf('postgresql.conf',
+       "enable_bitmapscan = off");
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname           = 'tap_sub';
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX
+#
+# Basic test where the subscriber uses an index,
+# updates only 1 row, and deletes
+# 1 other row
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update one row via index";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x = 20;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 2) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to delete one row via index";
+
+# make sure that the subscriber has the correct data
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(DISTINCT x) FROM test_replica_id_full");
+is($result, qq(20), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION RE-CALCULATES INDEX AFTER CREATE/DROP INDEX
+#
+# The subscription should react if an index is dropped or recreated.
+# This test ensures that after CREATE INDEX, the subscriber can automatically
+# use the newly created index (provided that it fulfills the requirements).
+# Similarly, after DROP INDEX, the subscriber can automatically switch to
+# sequential scan.
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL, y int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL, y int)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i, i FROM generate_series(0,2100)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# now, create index and see that the index is used
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# wait until the new index shows up in pg_stat_all_indexes (the view is updated asynchronously)
+$node_subscriber->poll_query_until(
+	'postgres', q{select count(*)=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for creating index test_replica_id_full_idx";
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update one row via index";
+
+
+# now, ingest more data so that column y has higher cardinality, then create
+# an index on column y so that future commands use the index on column y
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT 50, i FROM generate_series(0,3100)i;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idy ON test_replica_id_full(y)");
+
+# wait until the new index shows up in pg_stat_all_indexes (the view is updated asynchronously)
+$node_subscriber->poll_query_until(
+	'postgres', q{select count(*)=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idy';}
+) or die "Timed out while waiting for creating index test_replica_id_full_idy";
+
+# now, the update should use the test_replica_id_full_idy index
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET y = y + 1 WHERE y = 3000;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idy';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update one row via index test_replica_id_full_idy";
+
+# let's also test dropping test_replica_id_full_idy and show that
+# it triggers re-calculation of the index, hence use test_replica_id_full_idx
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX test_replica_id_full_idy;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 25;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 2) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update one row via index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full WHERE x = 15 OR x = 25 OR y = 3000;");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION RE-CALCULATES INDEX AFTER CREATE/DROP INDEX
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS
+#
+# Basic test where the subscriber uses index
+# and updates 50 rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data within the range 0-19
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i%20 FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 50 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 50) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update 50 rows via index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x = 15;");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS
+#
+# Basic test where the subscriber uses index
+# and deletes 200 rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert some initial data within the range 0-9
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# deletes 200 rows
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x IN (5, 6);");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to delete 200 rows via index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x in (5, 6);");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS
+#
+# Basic test where the subscriber uses index
+# and updates multiple rows with a table that has
+# dropped columns
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_1");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_2");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_3");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_1");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_2");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_3");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert some initial data within the range 0-9
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 200 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x IN (5, 6);");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update 200 rows via index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(x+y::int) from test_replica_id_full;");
+is($result, qq(9200), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_part_idx ON users_table_part(user_id, value_1)"
+);
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%100), (i%20), i FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates rows, moving some of them between partitions
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE user_id = 4;");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=10 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update the partitioned table";
+
+# deletes rows from both partitions
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 1 and value_1 = 1;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 12 and value_1 = 12;");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=30 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to delete rows from the partitioned table";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(user_id+value_1+value_2) from users_table_part;");
+is($result, qq(550070), 'ensure subscriber has the correct data at the end of the test');
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(DISTINCT(user_id,value_1, value_2)) from users_table_part;");
+is($result, qq(981), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# Testcase end: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE PARTIAL INDEX
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full_part_index (x int);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full_part_index REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full_part_index (x int);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_part_idx ON test_replica_id_full_part_index(x) WHERE (x = 5);");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full_part_index SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full_part_index");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows, one of them is indexed
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 5;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# make sure that the index is not used
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_part_idx'");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full updates one row via seq. scan with partial index');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full_part_index;");
+is($result, qq(22), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(DISTINCT x) FROM test_replica_id_full_part_index;");
+is($result, qq(20), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE PARTIAL INDEX
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people ((firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0,200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_2' AND lastname = 'last_name_2';");
+
+# make sure the index is not used on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'people_names'");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full updates two rows via seq. scan with index on expressions');
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_3';");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_4' AND lastname = 'last_name_4';");
+
+# make sure the index is not used on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'people_names'");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full deletes two rows via seq. scan with index on expressions');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people;");
+is($result, qq(199), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people (firstname, lastname, (firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0, 200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_3' AND lastname = 'last_name_3';");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates two rows via index scan with index on expressions and columns";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'Nan';");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=4 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes two rows via index scan with index on expressions and columns";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people;");
+is($result, qq(199), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people WHERE firstname = 'Nan';");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE
+#
+# The decision of whether the subscription uses an index or a
+# sequential scan can be re-calculated by an ANALYZE call on the
+# table on the subscriber. This is useful if a sequential scan is
+# picked at first, but then the data size increases and an index scan
+# becomes more efficient. In such cases, either an ANALYZE done by
+# autovacuum or an explicit user-initiated ANALYZE can trigger
+# re-calculation of the selection.
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test (column_a int, column_b int);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test (column_a int, column_b int);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_a ON test (column_a);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_b ON test (column_b);");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test SELECT i,0 FROM generate_series(0, 2000)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_subscriber->safe_psql('postgres', "ANALYZE test;");
+
+# update 1 row and delete 1 row using index_a
+$node_publisher->safe_psql('postgres',
+	"UPDATE test SET column_b = column_b + 1 WHERE column_a = 15;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test WHERE column_a = 20;");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'index_a';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates two rows via index scan with index on high cardinality column-1";
+
+# show that index_b is not used
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'index_b';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates two rows via index scan with index on high cardinality column-2";
+
+# insert data such that the cardinality of column_b becomes much higher
+# and index_b becomes the candidate for index
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test SELECT 0,i FROM generate_series(0, 20000)i;");
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres', "ANALYZE test;");
+
+# update 1 row and delete 1 row using index_b, so index_a still has 2 idx_scan
+$node_publisher->safe_psql('postgres',
+	"UPDATE test SET column_a = column_a + 1 WHERE column_b = 150;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test WHERE column_b = 200;");
+
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'index_a';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates two rows via index scan with index on high cardinality column-3";
+
+# now, show that index_b used 2 times
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'index_b';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates two rows via index scan with index on high cardinality column-4";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT sum(column_a+column_b) from test;");
+is($result, qq(202010782), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(DISTINCT(column_a,column_b)) from test;");
+is($result, qq(21999), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test");
+
+# Testcase end: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - PARTITIONED TABLE
+# Similar to "SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE", for partitioned tables
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (user_id);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (user_id);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_ind_on_value_1 ON users_table_part(value_1)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_ind_on_value_2 ON users_table_part(value_2)"
+);
+
+# insert some initial data where cardinality of value_1 is high, and cardinality of value_2 is very low
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%20), i, i%2 FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_subscriber->safe_psql('postgres',	"ANALYZE users_table_part");
+
+# updates rows and moves between partitions
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE value_1 = 30;");
+
+# show that index on value_1 is used for update
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=1 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%1%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates partitioned table";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE value_1 = 40");
+
+# show that index on value_1 is used for delete
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=2 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%1%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes from partitioned table";
+
+# now, load some more data where cardinality of value_2 is high, and cardinality of value_1 is very low
+$node_publisher->safe_psql('postgres', "TRUNCATE users_table_part");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%20), i%2, i FROM generate_series(0,10000)i;"
+);
+$node_publisher->wait_for_catchup($appname);
+
+# analyze updates the table statistics, so that index on value_2 can be used
+$node_subscriber->safe_psql('postgres',	"ANALYZE users_table_part");
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE value_2 = 3000;");
+
+# show that index on value_2 is used for update
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=1 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%2%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates partitioned table";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE value_2 = 4000");
+
+# show that index on value_2 is used for delete
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=2 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%2%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes from partitioned table";
+
+
+# finally, make sure that even if an index is defined only on a partition
+# (e.g., not inherited from the parent), it can still be used during replication
+
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX users_table_ind_on_value_1;"
+);
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX users_table_ind_on_value_2;"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_ind_on_value_2 ON users_table_part_0(value_2)"
+);
+
+# now, load some more data where cardinality of value_2 is high, and cardinality of value_1 is very low
+$node_publisher->safe_psql('postgres', "TRUNCATE users_table_part");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%20), i%2, i FROM generate_series(0,10000)i;"
+);
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres',	"ANALYZE users_table_part");
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE value_2 > 3000 AND user_id = 0;");
+
+# show that index defined on partition on value_2 is used for update
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=350 from pg_stat_all_indexes where indexrelname ilike 'users_table_ind_on_value_2';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates partitioned table with index on partition";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT sum(user_id+value_1+value_2) from users_table_part;");
+is($result, qq(50105000), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(DISTINCT(user_id,value_1,value_2)) from users_table_part;");
+is($result, qq(10001), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# Testcase end: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - PARTITIONED TABLE
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - INHERITED TABLE
+# Similar to "SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE", for inherited tables
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE parent (a int);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE child_1 (b int) inherits (parent);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE child_2 (b int) inherits (parent);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE parent REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE child_1 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE child_2 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE parent (a int);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE child_1 (b int) inherits (parent);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE child_2 (b int) inherits (parent);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_on_parent ON parent(a)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_on_child_1_a ON child_1(a)"
+);
+
+# create another index on the child on a column with higher cardinality
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_on_child_1_b ON child_1(b)"
+);
+
+# insert some initial data where cardinality of column a is high, and cardinality of column b is very low
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO parent SELECT i FROM generate_series(0,1000)i;");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO child_1 SELECT (i%500), 0 FROM generate_series(0,1000)i;");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO child_2 SELECT (i%500), 0 FROM generate_series(0,1000)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE parent");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_subscriber->safe_psql('postgres',	"ANALYZE parent");
+$node_subscriber->safe_psql('postgres',	"ANALYZE child_1");
+$node_subscriber->safe_psql('postgres',	"ANALYZE child_2");
+
+# updating the row will use the index on the parent for one tuple,
+# as well as the index on child_1 for two tuples
+$node_publisher->safe_psql('postgres',
+	"UPDATE parent SET a = 0 WHERE a = 10;");
+
+# show that index on the parent is used
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'index_on_parent';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates parent table";
+
+# delete 2 rows from the child_1 using index_on_child_1_a
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM child_1 WHERE a = 250");
+
+# 2 rows from first command, another 2 from the second command
+# overall index_on_child_1_a is used 4 times
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=4 from pg_stat_all_indexes where indexrelname = 'index_on_child_1_a';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates child_1 table";
+
+
+# insert some more data where cardinality of column b is high on child_1
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO child_1 SELECT 0, i FROM generate_series(0,10000)i;",
+);
+$node_publisher->wait_for_catchup($appname);
+
+# running ANALYZE on child_1 will switch the index used on child_1 to index_on_child_1_b
+$node_subscriber->safe_psql('postgres',	"ANALYZE child_1");
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM child_1 WHERE b = 41");
+
+# show that now index_on_child_1_b is used
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'index_on_child_1_b';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes from child_1 table";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(a) FROM parent;");
+is($result, qq(998950), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(DISTINCT(a)) FROM parent;");
+is($result, qq(1000), 'ensure subscriber has the correct data at the end of the test');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(a) FROM child_1;");
+is($result, qq(248980), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(DISTINCT(a)) FROM child_1;");
+is($result, qq(498), 'ensure subscriber has the correct data at the end of the test');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(a) FROM child_2;");
+is($result, qq(249480), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(DISTINCT(a)) FROM child_2;");
+is($result, qq(499), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE parent, child_1, child_2");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE parent, child_1, child_2");
+
+# Testcase end: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - INHERITED TABLE
+# ====================================================================
+
+# ====================================================================
+# Testcase start: Some NULL values
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y);"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full VALUES (1), (2), (3);");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 1;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 3;");
+
+# check if the index is used even when the index has NULL values
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates test_replica_id_full table";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(x) from test_replica_id_full WHERE y IS NULL;");
+is($result, qq(8), 'ensure subscriber has the correct data at the end of the test');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full WHERE y IS NULL;");
+is($result, qq(3), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: Some NULL values
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN
+#
+# Even if enable_indexscan = false, we still use primary keys; this
+# is the legacy behavior. However, we do not use indexes that are
+# neither a primary key nor a replica identity.
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SYSTEM SET enable_indexscan TO off;");
+$node_subscriber->safe_psql('postgres',
+	"SELECT pg_reload_conf();");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 10000 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# show that the regular (non-replica-identity) index is not used when enable_indexscan=false
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx'");
+is($result, qq(0), 'ensure subscriber has not used index with enable_indexscan=false');
+
+# we are done with this index, drop to simplify the tests
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX test_replica_id_full_idx");
+
+# now, create a unique index and set it as the replica identity
+$node_publisher->safe_psql('postgres',
+	"CREATE UNIQUE INDEX test_replica_id_full_unique ON test_replica_id_full(x);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE UNIQUE INDEX test_replica_id_full_unique ON test_replica_id_full(x);");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY USING INDEX test_replica_id_full_unique;");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY USING INDEX test_replica_id_full_unique;");
+
+# wait for the synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 10000 WHERE x = 14;");
+$node_publisher->wait_for_catchup($appname);
+
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan=1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_unique'}
+) or die "Timed out while waiting for subscriber to use unique index as replica identity even with enable_indexscan=false";
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full WHERE x IN (14,15)");
+is($result, qq(0), 'ensure the results are accurate even with enable_indexscan=false');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SYSTEM RESET enable_indexscan;");
+$node_subscriber->safe_psql('postgres',
+	"SELECT pg_reload_conf();");
+
+# Testcase end: SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN
+# ====================================================================
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
-- 
2.34.1
