Hi,

As far as I can see, the following is the answer to the only remaining open discussion in this thread. Let me know if anything is missed.

> >> (b) it appears to me that the patch decides
> >> which index to use the first time it opens the rel (or if the rel gets
> >> invalidated) on subscriber and then for all consecutive operations it
> >> uses the same index. It is quite possible that after some more
> >> operations on the table, using the same index will actually be
> >> costlier than a sequence scan or some other index scan
> >
> > Regarding (b), yes that is a concern I share. And, I was actually
> > considering sending another patch regarding this.
> >
> > Currently, I can see two options and happy to hear your take on these
> > (or maybe another idea?)
> >
> > - Add a new class of invalidation callbacks: Today, if we do ALTER TABLE
> > or CREATE INDEX on a table, the CacheRegisterRelcacheCallback helps us to
> > re-create the cache entries. In this case, as far as I can see, we need a
> > callback that is called when table "ANALYZE"d, because that is when the
> > statistics change. That is the time picking a new index makes sense.
> > However, that seems like adding another dimension to this patch, which I
> > can try but also see that committing becomes even harder.
>
> This idea sounds worth investigating. I see that this will require
> more work but OTOH, we can't allow the existing system to regress
> especially because depending on workload it might regress badly. We
> can create a patch for this atop the base patch for easier review/test
> but I feel we need some way to address this point.

It turns out that we already invalidate the relevant entries in
LogicalRepRelMap/LogicalRepPartMap when "ANALYZE" (or VACUUM) updates any
of the statistics in pg_class.

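To make the invalidate-then-recompute behavior concrete, here is a toy
model in plain C. It is only a sketch and not the PostgreSQL code: every
name in it (ToyRelMapEntry, ToyStats, toy_choose_index, toy_invalidate,
toy_rel_open) is hypothetical, and the "pick the column with more distinct
values" rule is a stand-in for the planner's real costing.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for a LogicalRepRelMap entry. */
typedef struct ToyRelMapEntry
{
	bool		valid;				/* cleared by the invalidation callback */
	int			usable_index;		/* cached choice, -1 means "no index" */
	int			recompute_count;	/* how many times the choice was made */
} ToyRelMapEntry;

/* Hypothetical statistics: number of distinct values per indexed column. */
typedef struct ToyStats
{
	double		ndistinct[2];
} ToyStats;

/* Toy cost model: prefer the index on the column with more distinct values. */
int
toy_choose_index(const ToyStats *stats)
{
	return (stats->ndistinct[1] > stats->ndistinct[0]) ? 1 : 0;
}

/* Plays the role of the relcache invalidation callback. */
void
toy_invalidate(ToyRelMapEntry *entry)
{
	entry->valid = false;
}

/*
 * Plays the role of opening the relation on the apply side: the index
 * choice is recomputed only when the cached entry has been invalidated.
 */
int
toy_rel_open(ToyRelMapEntry *entry, const ToyStats *stats)
{
	if (!entry->valid)
	{
		entry->usable_index = toy_choose_index(stats);
		entry->recompute_count++;
		entry->valid = true;
	}

	return entry->usable_index;
}
```

In this model, changing the statistics alone does not change the answer of
toy_rel_open(); the new index is picked only after toy_invalidate() has
run, which mirrors the role ANALYZE plays in the real system.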
The call-stack for analyze is roughly:
do_analyze_rel()
  -> vac_update_relstats()
    -> heap_inplace_update()
      -> if any of the statistics actually needs to change
        -> CacheInvalidateHeapTuple()

And, we register for those invalidations already:
logicalrep_relmap_init() / logicalrep_partmap_init()
  -> CacheRegisterRelcacheCallback()

I added a test which triggers this behavior. The test is as follows:
  - Create two indexes on the target, on column_a and column_b
  - Initially load data such that column_a has a high cardinality
  - Show that we use the index on column_a
  - Load more data such that column_b has a higher cardinality
  - ANALYZE the target table
  - Show that we use the index on column_b afterwards

Thanks,
Onder KALACI

From 890fd74d57979eb75a976b2131ee619581dc225f Mon Sep 17 00:00:00 2001
From: Onder Kalaci <onderkalaci@gmail.com>
Date: Tue, 17 May 2022 10:47:39 +0200
Subject: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full
 on the publisher

It is often not feasible to use `REPLICA IDENTITY FULL` on the publication,
because it leads to a full table scan per tuple change on the subscription.
This makes `REPLICA IDENTITY FULL` impracticable -- probably other than
some small number of use cases.

With this patch, I'm proposing the following change: If there is an index
on the subscriber, use the index as long as the planner sub-modules pick
any index over sequential scan.

The majority of the logic on the subscriber side already exists in the
code. The subscriber is already capable of doing (unique) index scans.
With this patch, we are allowing the index to iterate over the tuples
fetched and only act when tuples are equal. Those familiar with this part
of the code will recognize that the sequential scan code on the subscriber
already implements the `tuples_equal()` function. In short, the changes on
the subscriber are mostly combining parts of the (unique) index scan and
sequential scan code.

The decision on whether to use an index (or which index) is mostly derived
from planner infrastructure. The idea is that on the subscriber we have all
the columns. So, construct all the `Path`s with the restrictions on all
columns, such as `col_1 = $1 AND col_2 = $2 ... AND col_n = $N`. Then, let
the planner sub-module -- `create_index_paths()` -- give us the relevant
index `Path`s. On top of that, add the sequential scan `Path` as well.
Finally, pick the cheapest `Path` among them.

From the performance point of view, there are a few things to note. First,
the patch aims not to change the behavior when PRIMARY KEY or UNIQUE INDEX
is used.
Second, when REPLICA IDENTITY is FULL on the publisher and an index is used
on the subscriber, the difference mostly comes down to `index scan` vs
`sequential scan`. That's why it is hard to claim a specific amount of
improvement. It mostly depends on the data size, the index and the data
distribution.

Still, below I try to showcase the potential improvements using an index on
the subscriber `pgbench_accounts(bid)`. With the index, the replication
catches up in ~5 seconds. When the index is dropped, the replication takes
~300 seconds.

// init source db
pgbench -i -s 100 -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 5432 postgres
psql -c "CREATE INDEX i1 ON pgbench_accounts(aid);" -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts REPLICA IDENTITY FULL;" -p 5432 postgres
psql -c "CREATE PUBLICATION pub_test_1 FOR TABLE pgbench_accounts;" -p 5432 postgres

// init target db, drop existing primary key
pgbench -i -p 9700 postgres
psql -c "truncate pgbench_accounts;" -p 9700 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 9700 postgres
psql -c "CREATE SUBSCRIPTION sub_test_1 CONNECTION 'host=localhost port=5432 user=onderkalaci dbname=postgres' PUBLICATION pub_test_1;" -p 9700 postgres

// create one index, even on a low cardinality column
psql -c "CREATE INDEX i2 ON pgbench_accounts(bid);" -p 9700 postgres

// now, run some pgbench tests and observe replication
pgbench -t 500 -b tpcb-like -p 5432 postgres
---
 src/backend/executor/execReplication.c        |  94 ++-
 src/backend/replication/logical/relation.c    | 314 +++++++-
 src/backend/replication/logical/worker.c      |  98 ++-
 src/include/replication/logicalrelation.h     |  14 +-
 .../subscription/t/032_subscribe_use_index.pl | 712 ++++++++++++++++++
 5 files changed, 1165 insertions(+), 67 deletions(-)
 create mode 100644 src/test/subscription/t/032_subscribe_use_index.pl

diff --git a/src/backend/executor/execReplication.c
b/src/backend/executor/execReplication.c index b000645d48..59480406f5 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -37,49 +37,72 @@ #include "utils/typcache.h" +static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, + TypeCacheEntry **eq); + /* * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that * is setup to match 'rel' (*NOT* idxrel!). * - * Returns whether any column contains NULLs. + * Returns how many columns should be used for the index scan. * - * This is not generic routine, it expects the idxrel to be replication - * identity of a rel and meet all limitations associated with that. + * This is not generic routine, it expects the idxrel to be an index + * that planner would choose if the searchslot includes all the columns + * (e.g., REPLICA IDENTITY FULL on the source). */ -static bool +static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot) { - int attoff; + int index_attoff; + int scankey_attoff; bool isnull; Datum indclassDatum; oidvector *opclass; int2vector *indkey = &idxrel->rd_index->indkey; - bool hasnulls = false; - - Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) || - RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel)); indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, Anum_pg_index_indclass, &isnull); Assert(!isnull); opclass = (oidvector *) DatumGetPointer(indclassDatum); + scankey_attoff = 0; /* Build scankey for every attribute in the index. 
*/ - for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++) + for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); + index_attoff++) { Oid operator; Oid opfamily; RegProcedure regop; - int pkattno = attoff + 1; - int mainattno = indkey->values[attoff]; - Oid optype = get_opclass_input_type(opclass->values[attoff]); + int pkattno = index_attoff + 1; + int mainattno = indkey->values[index_attoff]; + Oid optype = get_opclass_input_type(opclass->values[index_attoff]); + + if (!AttributeNumberIsValid(mainattno)) + { + /* + * There are two cases to consider. First, if the index is a primary or + * unique key, we cannot have any indexes with expressions. So, at this + * point we are sure that the index we deal is not these. + */ + Assert(RelationGetReplicaIndex(rel) != RelationGetRelid(idxrel) && + RelationGetPrimaryKeyIndex(rel) != RelationGetRelid(idxrel)); + + /* + * For a non-primary/unique index with an expression, we are sure that + * the expression cannot be used for replication index search. The + * reason is that we create relevant index paths by providing column + * equalities. And, the planner does not pick expression indexes via + * column equality restrictions in the query. + */ + continue; + } /* * Load the operator info. We need this to get the equality operator * function for the scan key. */ - opfamily = get_opclass_family(opclass->values[attoff]); + opfamily = get_opclass_family(opclass->values[index_attoff]); operator = get_opfamily_member(opfamily, optype, optype, @@ -91,23 +114,28 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, regop = get_opcode(operator); /* Initialize the scankey. 
*/ - ScanKeyInit(&skey[attoff], + ScanKeyInit(&skey[scankey_attoff], pkattno, BTEqualStrategyNumber, regop, searchslot->tts_values[mainattno - 1]); - skey[attoff].sk_collation = idxrel->rd_indcollation[attoff]; + skey[scankey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff]; /* Check for null value. */ if (searchslot->tts_isnull[mainattno - 1]) { - hasnulls = true; - skey[attoff].sk_flags |= SK_ISNULL; + skey[scankey_attoff].sk_flags |= SK_ISNULL; } + + scankey_attoff++; + } - return hasnulls; + /* we should always use at least one attribute for the index scan */ + Assert (scankey_attoff > 0); + + return scankey_attoff; } /* @@ -128,28 +156,38 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid, TransactionId xwait; Relation idxrel; bool found; + TypeCacheEntry **eq; + bool indisunique; + int scankey_attoff; /* Open the index. */ idxrel = index_open(idxoid, RowExclusiveLock); + indisunique = idxrel->rd_index->indisunique; + if (!indisunique) + eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); /* Start an index scan. */ InitDirtySnapshot(snap); - scan = index_beginscan(rel, idxrel, &snap, - IndexRelationGetNumberOfKeyAttributes(idxrel), - 0); /* Build scan key. */ - build_replindex_scan_key(skey, rel, idxrel, searchslot); + scankey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot); + scan = index_beginscan(rel, idxrel, &snap, + scankey_attoff, 0); retry: found = false; - index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0); + index_rescan(scan, skey, scankey_attoff, NULL, 0); /* Try to find the tuple */ - if (index_getnext_slot(scan, ForwardScanDirection, outslot)) + while (index_getnext_slot(scan, ForwardScanDirection, outslot)) { - found = true; + /* avoid expensive equality check if index is unique */ + if (!indisunique && !tuples_equal(outslot, searchslot, eq)) + { + continue; + } + ExecMaterializeSlot(outslot); xwait = TransactionIdIsValid(snap.xmin) ? 
@@ -164,6 +202,10 @@ retry: XactLockTableWait(xwait, NULL, NULL, XLTW_None); goto retry; } + + /* Found our tuple and it's not locked */ + found = true; + break; } /* Found tuple, try to lock it in the lockmode. */ diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index e989047681..f3294c365c 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -17,14 +17,30 @@ #include "postgres.h" +#ifdef USE_ASSERT_CHECKING +#include "access/genam.h" +#endif #include "access/table.h" #include "catalog/namespace.h" +#ifdef USE_ASSERT_CHECKING +#include "catalog/index.h" +#endif +#include "catalog/pg_am_d.h" #include "catalog/pg_subscription_rel.h" +#include "catalog/pg_operator.h" +#include "commands/defrem.h" #include "executor/executor.h" #include "nodes/makefuncs.h" #include "replication/logicalrelation.h" #include "replication/worker_internal.h" +#include "optimizer/cost.h" +#include "optimizer/paramassign.h" +#include "optimizer/pathnode.h" +#include "optimizer/paths.h" +#include "optimizer/plancat.h" +#include "optimizer/restrictinfo.h" #include "utils/inval.h" +#include "utils/typcache.h" static MemoryContext LogicalRepRelMapContext = NULL; @@ -44,11 +60,9 @@ static HTAB *LogicalRepRelMap = NULL; */ static MemoryContext LogicalRepPartMapContext = NULL; static HTAB *LogicalRepPartMap = NULL; -typedef struct LogicalRepPartMapEntry -{ - Oid partoid; /* LogicalRepPartMap's key */ - LogicalRepRelMapEntry relmapentry; -} LogicalRepPartMapEntry; + +static Oid LogicalRepUsableIndex(Relation localrel, + LogicalRepRelation *remoterel); /* * Relcache invalidation callback for our relation map cache. 
@@ -438,6 +452,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) */ logicalrep_rel_mark_updatable(entry); + entry->usableIndexOid = LogicalRepUsableIndex(entry->localrel, remoterel); entry->localrelvalid = true; } @@ -581,13 +596,14 @@ logicalrep_partmap_init(void) * Note there's no logicalrep_partition_close, because the caller closes the * component relation. */ -LogicalRepRelMapEntry * +LogicalRepPartMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map) { LogicalRepRelMapEntry *entry; LogicalRepPartMapEntry *part_entry; LogicalRepRelation *remoterel = &root->remoterel; + Oid partOid = RelationGetRelid(partrel); AttrMap *attrmap = root->attrmap; bool found; @@ -615,7 +631,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, if (found && entry->localrelvalid) { entry->localrel = partrel; - return entry; + return part_entry; } /* Switch to longer-lived context. */ @@ -696,10 +712,292 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, /* Set if the table's replica identity is enough to apply update/delete. */ logicalrep_rel_mark_updatable(entry); + part_entry->usableIndexOid = LogicalRepUsableIndex(partrel, remoterel); + entry->localrelvalid = true; /* state and statelsn are left set to 0. */ MemoryContextSwitchTo(oldctx); - return entry; + return part_entry; +} + +/* + * GetIndexOidFromPath returns a valid index oid if + * the input path is an index path. + * Otherwise, return invalid oid. 
+ */ +static +Oid +GetIndexOidFromPath(Path *cheapest_total_path) +{ + Oid indexOid; + + switch (cheapest_total_path->pathtype) + { + case T_IndexScan: + case T_IndexOnlyScan: + case T_BitmapIndexScan: + { + IndexPath *index_sc = (IndexPath *) cheapest_total_path; + indexOid = index_sc->indexinfo->indexoid; + + break; + } + + case T_BitmapHeapScan: + { + BitmapHeapPath *index_sc = (BitmapHeapPath *) cheapest_total_path; + + /* + * Accept only a simple bitmap scan with a single IndexPath, + * not AND/OR cases with multiple indexes. + */ + Path *bmqual = ((BitmapHeapPath *) index_sc)->bitmapqual; + + if (IsA(bmqual, IndexPath)) + { + IndexPath *index_sc = (IndexPath *) bmqual; + + indexOid = index_sc->indexinfo->indexoid; + } + else + { + indexOid = InvalidOid; + } + break; + } + + default: + indexOid = InvalidOid; + } + +#ifdef USE_ASSERT_CHECKING + + /* we never pick partial indexes */ + if(OidIsValid(indexOid)) + { + Relation indexRelation = index_open(indexOid, RowExclusiveLock); + IndexInfo *indexInfo = BuildIndexInfo(indexRelation); + Assert(indexInfo->ii_Predicate == NIL); + index_close(indexRelation, NoLock); + } +#endif + + return indexOid; +} + + +/* + * GetCheapestReplicaIdentityFullPath generates all the possible paths + * for the given subscriber relation, assuming that the source relation + * is replicated via REPLICA IDENTITY FULL. + * + * The function assumes that all the columns will be provided during + * the execution phase, given that REPLICA IDENTITY FULL gurantees + * that. 
+ */ +static Path * +GetCheapestReplicaIdentityFullPath(Relation localrel) +{ + PlannerInfo *root; + Query *query; + PlannerGlobal *glob; + RangeTblEntry *rte; + RelOptInfo *rel; + int attno; + RangeTblRef *rt; + List *joinList; + + /* Set up mostly-dummy planner state */ + query = makeNode(Query); + query->commandType = CMD_SELECT; + + glob = makeNode(PlannerGlobal); + + root = makeNode(PlannerInfo); + root->parse = query; + root->glob = glob; + root->query_level = 1; + root->planner_cxt = CurrentMemoryContext; + root->wt_param_id = -1; + + /* Build a minimal RTE for the rel */ + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = localrel->rd_id; + rte->relkind = RELKIND_RELATION; + rte->rellockmode = AccessShareLock; + rte->lateral = false; + rte->inh = false; + rte->inFromCl = true; + query->rtable = list_make1(rte); + + rt = makeNode(RangeTblRef); + rt->rtindex = 1; + joinList = list_make1(rt); + + /* Set up RTE/RelOptInfo arrays */ + setup_simple_rel_arrays(root); + + /* Build RelOptInfo */ + rel = build_simple_rel(root, 1, NULL); + + /* + * Generate restrictions for all columns in the form of col_1 = $1 + * AND col_2 = $2 ... 
+ */ + for (attno = 0; attno < RelationGetNumberOfAttributes(localrel); attno++) + { + Form_pg_attribute attr = TupleDescAttr(localrel->rd_att, attno); + + if (attr->attisdropped) + { + continue; + } + else + { + Expr *eq_op; + TypeCacheEntry *typentry; + RestrictInfo *restrict_info; + Var *leftarg; + Param *rightarg; + int varno = 1; + + typentry = lookup_type_cache(attr->atttypid, + TYPECACHE_EQ_OPR_FINFO); + + if (!OidIsValid(typentry->eq_opr)) + continue; /* no equality operator skip this column */ + + leftarg = + makeVar(varno, attr->attnum, attr->atttypid, attr->atttypmod, + attr->attcollation, 0); + + rightarg = makeNode(Param); + rightarg->paramkind = PARAM_EXTERN; + rightarg->paramid = list_length(rel->baserestrictinfo) + 1; + rightarg->paramtype = attr->atttypid; + rightarg->paramtypmod = attr->atttypmod; + rightarg->paramcollid = attr->attcollation; + rightarg->location = -1; + + eq_op = make_opclause(typentry->eq_opr, BOOLOID, false, + (Expr *) leftarg, (Expr *) rightarg, + InvalidOid, attr->attcollation); + + restrict_info = make_simple_restrictinfo(root, eq_op); + + rel->baserestrictinfo = lappend(rel->baserestrictinfo, restrict_info); + } + } + + /* + * Make sure the planner generates the relevant paths, including + * all the possible index scans as well as sequential scan. + */ + rel = make_one_rel(root, joinList); + set_cheapest(rel); + + Assert(rel->cheapest_total_path != NULL); + + return rel->cheapest_total_path; +} + + +/* + * FindUsableIndexForReplicaIdentityFull returns an index oid if + * the planner submodules picks index scans over sequential scan. + * + * Note that this is not a generic function, it expects REPLICA IDENTITY + * FULL for the remote relation. 
+ */ +static Oid +FindUsableIndexForReplicaIdentityFull(Relation localrel) +{ + MemoryContext usableIndexContext; + MemoryContext oldctx; + Path *cheapest_total_path; + Oid indexOid; + + usableIndexContext = AllocSetContextCreate(CurrentMemoryContext, + "usableIndexContext", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(usableIndexContext); + + cheapest_total_path = GetCheapestReplicaIdentityFullPath(localrel); + + indexOid = GetIndexOidFromPath(cheapest_total_path); + + MemoryContextSwitchTo(oldctx); + + MemoryContextDelete(usableIndexContext); + + return indexOid; +} + +/* + * Get replica identity index or if it is not defined a primary key. + * + * If neither is defined, returns InvalidOid + */ +static Oid +GetRelationIdentityOrPK(Relation rel) +{ + Oid idxoid; + + idxoid = RelationGetReplicaIndex(rel); + + if (!OidIsValid(idxoid)) + idxoid = RelationGetPrimaryKeyIndex(rel); + + return idxoid; +} + +/* + * LogicalRepUsableIndex returns an index oid if we can use an index + * for the apply side. + */ +static Oid +LogicalRepUsableIndex(Relation localrel, LogicalRepRelation *remoterel) +{ + Oid idxoid; + + /* + * We never need index oid for partitioned tables, always rely on leaf + * partition's index. + */ + if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return InvalidOid; + + /* simple case, we already have an identity or pkey */ + idxoid = GetRelationIdentityOrPK(localrel); + if (OidIsValid(idxoid)) + return idxoid; + + /* indexscans are disabled, use seq. scan */ + if (!enable_indexscan) + return InvalidOid; + + if (remoterel->replident == REPLICA_IDENTITY_FULL && + RelationGetIndexList(localrel) != NIL) + { + /* + * If we had a primary key or relation identity with a unique index, + * we would have already found a valid oid. At this point, the remote + * relation has replica identity full and we have at least one local + * index defined. + * + * We are looking for one more opportunity for using an index. 
If + * there are any indexes defined on the local relation, try to pick + * the cheapest index. + * + * The index selection safely assumes that all the columns are going + * to be available for the index scan given that remote relation has + * replica identity full. + */ + return FindUsableIndexForReplicaIdentityFull(localrel); + } + + return InvalidOid; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5f8c541763..79fb34d639 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -331,6 +331,8 @@ static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot); +static Oid usable_indexoid_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo); static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, @@ -339,6 +341,7 @@ static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot); static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, + Oid localidxoid, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot); @@ -1586,24 +1589,6 @@ apply_handle_type(StringInfo s) logicalrep_read_typ(s, &typ); } -/* - * Get replica identity index or if it is not defined a primary key. - * - * If neither is defined, returns InvalidOid - */ -static Oid -GetRelationIdentityOrPK(Relation rel) -{ - Oid idxoid; - - idxoid = RelationGetReplicaIndex(rel); - - if (!OidIsValid(idxoid)) - idxoid = RelationGetPrimaryKeyIndex(rel); - - return idxoid; -} - /* * Check that we (the subscription owner) have sufficient privileges on the * target relation to perform the given operation. 
@@ -1753,7 +1738,7 @@ check_relation_updatable(LogicalRepRelMapEntry *rel) * We are in error mode so it's fine this is somewhat slow. It's better to * give user correct error. */ - if (OidIsValid(GetRelationIdentityOrPK(rel->localrel))) + if (OidIsValid(rel->usableIndexOid)) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1896,11 +1881,13 @@ apply_handle_update_internal(ApplyExecutionData *edata, TupleTableSlot *localslot; bool found; MemoryContext oldctx; + Oid usableIndexOid = usable_indexoid_internal(edata, relinfo); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); ExecOpenIndices(relinfo, false); found = FindReplTupleInLocalRel(estate, localrel, + usableIndexOid, &relmapentry->remoterel, remoteslot, &localslot); ExecClearTuple(remoteslot); @@ -2030,16 +2017,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata, { EState *estate = edata->estate; Relation localrel = relinfo->ri_RelationDesc; - LogicalRepRelation *remoterel = &edata->targetRel->remoterel; + LogicalRepRelMapEntry *targetRel = edata->targetRel; + LogicalRepRelation *remoterel = &targetRel->remoterel; EPQState epqstate; TupleTableSlot *localslot; bool found; + Oid usableIndexOid = usable_indexoid_internal(edata, relinfo); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); ExecOpenIndices(relinfo, false); - found = FindReplTupleInLocalRel(estate, localrel, remoterel, - remoteslot, &localslot); + + found = FindReplTupleInLocalRel(estate, localrel, usableIndexOid, + remoterel, remoteslot, &localslot); /* If found delete it. */ if (found) @@ -2069,6 +2059,49 @@ apply_handle_delete_internal(ApplyExecutionData *edata, EvalPlanQualEnd(&epqstate); } + +/* + * Decide whether we can pick an index for the relinfo (e.g., the relation) + * we're actually deleting/updating from. If it is a child partition of + * edata->targetRelInfo, find the index on the partition. 
+ */ +static Oid +usable_indexoid_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo) +{ + Oid usableIndexOid; + ResultRelInfo *targetResultRelInfo = edata->targetRelInfo; + LogicalRepRelMapEntry *relmapentry = edata->targetRel; + + Oid targetrelid = targetResultRelInfo->ri_RelationDesc->rd_rel->oid; + Oid localrelid = relinfo->ri_RelationDesc->rd_id; + + if (targetrelid == localrelid) + { + /* target is a regular table */ + usableIndexOid = relmapentry->usableIndexOid; + } + else + { + /* + * Target is a partitioned table, get the index oid the partition. + */ + TupleConversionMap *map = relinfo->ri_RootToPartitionMap; + AttrMap *attrmap = map ? map->attrMap : NULL; + + LogicalRepPartMapEntry *part_entry = + logicalrep_partition_open(relmapentry, relinfo->ri_RelationDesc, + attrmap); + + Assert(targetResultRelInfo->ri_RelationDesc->rd_rel->relkind == + RELKIND_PARTITIONED_TABLE); + + usableIndexOid = part_entry->usableIndexOid; + } + + return usableIndexOid; +} + + /* * Try to find a tuple received from the publication side (in 'remoteslot') in * the corresponding local relation using either replica identity index, @@ -2078,11 +2111,11 @@ apply_handle_delete_internal(ApplyExecutionData *edata, */ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, + Oid localidxoid, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot) { - Oid idxoid; bool found; /* @@ -2093,12 +2126,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel, *localslot = table_slot_create(localrel, &estate->es_tupleTable); - idxoid = GetRelationIdentityOrPK(localrel); - Assert(OidIsValid(idxoid) || + Assert(OidIsValid(localidxoid) || (remoterel->replident == REPLICA_IDENTITY_FULL)); - if (OidIsValid(idxoid)) - found = RelationFindReplTupleByIndex(localrel, idxoid, + if (OidIsValid(localidxoid)) + found = RelationFindReplTupleByIndex(localrel, localidxoid, LockTupleExclusive, remoteslot, *localslot); else @@ -2128,7 +2160,7 @@ 
apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot_part; TupleConversionMap *map; MemoryContext oldctx; - LogicalRepRelMapEntry *part_entry = NULL; + LogicalRepPartMapEntry *part_entry = NULL; AttrMap *attrmap = NULL; /* ModifyTableState is needed for ExecFindPartition(). */ @@ -2178,7 +2210,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, { part_entry = logicalrep_partition_open(relmapentry, partrel, attrmap); - check_relation_updatable(part_entry); + check_relation_updatable(&part_entry->relmapentry); } switch (operation) @@ -2202,13 +2234,17 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * suitable partition. */ { + LogicalRepRelMapEntry *entry; TupleTableSlot *localslot; ResultRelInfo *partrelinfo_new; bool found; + entry = &part_entry->relmapentry; + /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(estate, partrel, - &part_entry->remoterel, + part_entry->usableIndexOid, + &entry->remoterel, remoteslot_part, &localslot); if (!found) { @@ -2230,7 +2266,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * remoteslot_part. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_data(remoteslot_part, localslot, part_entry, + slot_modify_data(remoteslot_part, localslot, entry, newtup); MemoryContextSwitchTo(oldctx); diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 78cd7e77f5..2f8d9bffd0 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -31,19 +31,29 @@ typedef struct LogicalRepRelMapEntry Relation localrel; /* relcache entry (NULL when closed) */ AttrMap *attrmap; /* map of local attributes to remote ones */ bool updatable; /* Can apply updates/deletes? */ + Oid usableIndexOid; /* which index to use? (Invalid when no index + * used) */ /* Sync state. 
*/ char state; XLogRecPtr statelsn; } LogicalRepRelMapEntry; +typedef struct LogicalRepPartMapEntry +{ + Oid partoid; /* LogicalRepPartMap's key */ + LogicalRepRelMapEntry relmapentry; + Oid usableIndexOid; /* which index to use? (Invalid when no index + * used) */ +} LogicalRepPartMapEntry; + extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel); extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode); -extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root, - Relation partrel, AttrMap *map); +extern LogicalRepPartMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root, + Relation partrel, AttrMap *map); extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode); diff --git a/src/test/subscription/t/032_subscribe_use_index.pl b/src/test/subscription/t/032_subscribe_use_index.pl new file mode 100644 index 0000000000..15c4897403 --- /dev/null +++ b/src/test/subscription/t/032_subscribe_use_index.pl @@ -0,0 +1,712 @@ +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +# Test logical replication behavior with subscriber uses available index +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# create publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + "wal_retrieve_retry_interval = 1ms"); +$node_subscriber->start; + +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; + +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres';
+my $appname           = 'tap_sub';
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX
+#
+# Basic test where the subscriber uses index
+# and only touches 1 row
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# we check if the index is used or not
+my $result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';"
+);
+is($result, qq(1),
+	'check subscriber tap_sub_rep_full_0 updates one row via index');
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x = 20;");
+$node_publisher->wait_for_catchup($appname);
+
+# TODO: Somehow pg_stat_all_indexes gets updated a little late after DELETE
+# figure out why
+sleep(1);
+
+# we
check if the index is used or not +my $result_del = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';" +); +is($result_del, qq(2), + 'check subscriber tap_sub_rep_full_0 deletes one row via index'); + +# cleanup pub +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); +$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full"); +# cleanup sub +$node_subscriber->safe_psql('postgres', + "DROP SUBSCRIPTION tap_sub_rep_full_0"); +$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full"); + +# Testcase end: SUBSCRIPTION USES INDEX +# ==================================================================== + +# ==================================================================== +# Testcase start: SUBSCRIPTION USES INDEX MODIFIES MULTIPLE ROWS +# +# Basic test where the subscriber uses index +# and touches multiple rows +# + +# create tables pub and sub +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_replica_id_full (x int)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_replica_id_full (x int)"); +$node_subscriber->safe_psql('postgres', + "CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)"); + +# insert 1001 rows with values in the range 0-19 +$node_publisher->safe_psql('postgres', + "INSERT INTO test_replica_id_full SELECT i%20 FROM generate_series(0,1000)i;" +); + +# create pub/sub +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_rep_full_2 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" +); + +# wait for initial table synchronization to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + 
or die "Timed out while waiting for subscriber to synchronize data"; + +# updates 50 rows +$node_publisher->safe_psql('postgres', + "UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;"); +$node_publisher->wait_for_catchup($appname); + + +my $idx_scan_2 = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';" +); +is($idx_scan_2, qq(50), + 'check subscriber tap_sub_rep_full_2 updates 50 rows via index'); + +my $count_from_table = + $node_subscriber->safe_psql('postgres', + "select count(*) from test_replica_id_full where x = 15;"); +is($count_from_table, qq(0), + 'check subscriber tap_sub_rep_full_2 for no rows remaining with x=15'); + +# cleanup pub +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); +$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full"); +# cleanup sub +$node_subscriber->safe_psql('postgres', + "DROP SUBSCRIPTION tap_sub_rep_full_2"); +$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full"); + +# Testcase end: SUBSCRIPTION USES INDEX MODIFIES MULTIPLE ROWS +# ==================================================================== + + +# ==================================================================== +# Testcase start: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS +# +# Basic test where the subscriber uses index +# and touches multiple rows +# + +# create tables pub and sub +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_replica_id_full (x int, y text)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_replica_id_full (x int, y text)"); +$node_subscriber->safe_psql('postgres', + "CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)"); + +# insert 1001 rows with values in the range 0-9 +$node_publisher->safe_psql('postgres', + "INSERT INTO 
test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;" +); + +# create pub/sub +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_rep_full_3 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" +); + +# wait for initial table synchronization to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# updates 200 rows +$node_publisher->safe_psql('postgres', + "UPDATE test_replica_id_full SET x = x + 1 WHERE x IN (5, 6);"); +$node_publisher->wait_for_catchup($appname); + + +my $idx_scan_3 = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';" +); +is($idx_scan_3, qq(200), + 'check subscriber tap_sub_rep_full_3 updates 200 rows via index'); + +my $sum_from_table = + $node_subscriber->safe_psql('postgres', + "select sum(x+y::int) from test_replica_id_full;"); +is($sum_from_table, qq(9200), + 'check subscriber tap_sub_rep_full_3 for query result'); + +# cleanup pub +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); +$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full"); +# cleanup sub +$node_subscriber->safe_psql('postgres', + "DROP SUBSCRIPTION tap_sub_rep_full_3"); +$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full"); + +# Testcase end: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS +# ==================================================================== + +# ==================================================================== +# Testcase start: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS +# +# Basic test where the subscriber uses index +# and touches multiple rows +# + +# create tables pub and sub 
+$node_publisher->safe_psql('postgres', + "CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)" +); +$node_publisher->safe_psql('postgres', + "ALTER TABLE test_replica_id_full DROP COLUMN drop_1"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE test_replica_id_full DROP COLUMN drop_2"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE test_replica_id_full DROP COLUMN drop_3"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)" +); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_replica_id_full DROP COLUMN drop_1"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_replica_id_full DROP COLUMN drop_2"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_replica_id_full DROP COLUMN drop_3"); + +$node_subscriber->safe_psql('postgres', + "CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)"); + +# insert 1001 rows with values in the range 0-9 +$node_publisher->safe_psql('postgres', + "INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;" +); + +# create pub/sub +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_rep_full_4 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" +); + +# wait for initial table synchronization to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# updates 200 rows +$node_publisher->safe_psql('postgres', + "UPDATE test_replica_id_full SET x = x + 1 WHERE x IN (5, 6);"); +$node_publisher->wait_for_catchup($appname); + + +my $idx_scan_4 = 
$node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';" +); +is($idx_scan_4, qq(200), + 'check subscriber tap_sub_rep_full_4 updates 200 rows via index'); + +my $sum_from_table_2 = + $node_subscriber->safe_psql('postgres', + "select sum(x+y::int) from test_replica_id_full;"); +is($sum_from_table_2, qq(9200), + 'check subscriber tap_sub_rep_full_4 for query result'); + +# cleanup pub +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); +$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full"); +# cleanup sub +$node_subscriber->safe_psql('postgres', + "DROP SUBSCRIPTION tap_sub_rep_full_4"); +$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full"); + +# Testcase end: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS +# ==================================================================== + +# ==================================================================== +# Testcase start: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES + +# create tables pub and sub +$node_publisher->safe_psql('postgres', + "CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);" +); +$node_publisher->safe_psql('postgres', + "CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);" +); +$node_publisher->safe_psql('postgres', + "CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);" +); + +$node_publisher->safe_psql('postgres', + "ALTER TABLE users_table_part REPLICA IDENTITY FULL;"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL;"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL;"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);" +); 
+$node_subscriber->safe_psql('postgres', + "CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);" +); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);" +); +$node_subscriber->safe_psql('postgres', + "CREATE INDEX users_table_part_idx ON users_table_part(user_id, value_1)" +); + +# insert some initial data within the range 0-1000 +$node_publisher->safe_psql('postgres', + "INSERT INTO users_table_part SELECT (i%100), (i%20), i FROM generate_series(0,1000)i;" +); + +# create pub/sub +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_rep_full_5 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" +); + +# wait for initial table synchronization to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# updates 10 rows; value_1 goes from 4 to 0, so they stay in users_table_part_0 +$node_publisher->safe_psql('postgres', + "UPDATE users_table_part SET value_1 = 0 WHERE user_id = 4;"); +$node_publisher->wait_for_catchup($appname); + +my $idx_scan_5 = $node_subscriber->safe_psql('postgres', + "select sum(idx_scan) from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';" +); +is($idx_scan_5, qq(10), + 'check subscriber tap_sub_rep_full_5 updates partitioned table'); + +# deletes 10 rows from each of the two partitions +$node_publisher->safe_psql('postgres', + "DELETE FROM users_table_part WHERE user_id = 1 and value_1 = 1;"); +$node_publisher->safe_psql('postgres', + "DELETE FROM users_table_part WHERE user_id = 12 and value_1 = 12;"); + +$node_publisher->wait_for_catchup($appname); + +# TODO: Somehow pg_stat_all_indexes gets updated a little late after DELETE +# figure out why +sleep(1); + +my $idx_scan_6 = 
$node_subscriber->safe_psql('postgres', + "select sum(idx_scan) from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';" +); +is($idx_scan_6, qq(30), + 'check subscriber tap_sub_rep_full_5 deletes partitioned table'); + +# cleanup pub +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); +$node_publisher->safe_psql('postgres', "DROP TABLE users_table_part"); + +# cleanup sub +$node_subscriber->safe_psql('postgres', + "DROP SUBSCRIPTION tap_sub_rep_full_5"); +$node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part"); + +# Testcase end: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES +# ==================================================================== + +# ==================================================================== +# Testcase start: SUBSCRIPTION DOES NOT USE PARTIAL INDEX + +# create tables pub and sub +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_replica_id_full_part_index (x int);"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE test_replica_id_full_part_index REPLICA IDENTITY FULL;"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_replica_id_full_part_index (x int);"); +$node_subscriber->safe_psql('postgres', + "CREATE INDEX test_replica_id_full_part_idx ON test_replica_id_full_part_index(x) WHERE (x = 5);"); + +# insert some initial data +$node_publisher->safe_psql('postgres', + "INSERT INTO test_replica_id_full_part_index SELECT i FROM generate_series(0,21)i;"); + +# create pub/sub +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full_part_index"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" +); + +# wait for initial table synchronization to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + 
+# update 2 rows, one of them is indexed +$node_publisher->safe_psql('postgres', + "UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 5;"); +$node_publisher->safe_psql('postgres', + "UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 15;"); +$node_publisher->wait_for_catchup($appname); + +# we check if the index is used or not +my $result_part_index = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_part_idx';" +); +is($result_part_index, qq(0), + 'check subscriber tap_sub_rep_full_0 updates two rows via seq. scan with partial index'); + +$node_publisher->safe_psql('postgres', + "DELETE FROM test_replica_id_full_part_index WHERE x = 5;"); +$node_publisher->wait_for_catchup($appname); + +# TODO: Somehow pg_stat_all_indexes gets updated a little late after DELETE +# figure out why +sleep(1); + +# we check if the index is used or not +my $result_del_part_index = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_part_idx';" +); +is($result_del_part_index, qq(0), + 'check subscriber tap_sub_rep_full_0 deletes one row via seq. 
scan with partial index'); + +# cleanup pub +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); +$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index"); +# cleanup sub +$node_subscriber->safe_psql('postgres', + "DROP SUBSCRIPTION tap_sub_rep_full_0"); +$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index"); + +# Testcase end: SUBSCRIPTION DOES NOT USE PARTIAL INDEX +# ==================================================================== + + +# ==================================================================== +# Testcase start: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS + +# create tables pub and sub +$node_publisher->safe_psql('postgres', + "CREATE TABLE people (firstname text, lastname text);"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE people REPLICA IDENTITY FULL;"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE people (firstname text, lastname text);"); +$node_subscriber->safe_psql('postgres', + "CREATE INDEX people_names ON people ((firstname || ' ' || lastname));"); + +# insert some initial data +$node_publisher->safe_psql('postgres', + "INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0,200)i;"); + +# create pub/sub +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_rep_full FOR TABLE people"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" +); + +# wait for initial table synchronization to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# update 2 rows +$node_publisher->safe_psql('postgres', + "UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';"); +$node_publisher->safe_psql('postgres', + "UPDATE people SET firstname = 'Nan' 
WHERE firstname = 'first_name_2' AND lastname = 'last_name_2';"); +$node_publisher->wait_for_catchup($appname); + +# we check if the index is used or not +my $result_expr_index = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'people_names';" +); +is($result_expr_index, qq(0), + 'check subscriber tap_sub_rep_full_0 updates two rows via seq. scan with index on expressions'); + +$node_publisher->safe_psql('postgres', + "DELETE FROM people WHERE firstname = 'first_name_3';"); +$node_publisher->safe_psql('postgres', + "DELETE FROM people WHERE firstname = 'first_name_4' AND lastname = 'last_name_4';"); + +$node_publisher->wait_for_catchup($appname); + +# TODO: Somehow pg_stat_all_indexes gets updated a little late after DELETE +# figure out why +sleep(1); + +# we check if the index is used or not +my $result_del_expr_index = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'people_names';" +); +is($result_del_expr_index, qq(0), + 'check subscriber tap_sub_rep_full_0 deletes two rows via seq. 
scan with index on expressions'); + +# cleanup pub +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); +$node_publisher->safe_psql('postgres', "DROP TABLE people"); +# cleanup sub +$node_subscriber->safe_psql('postgres', + "DROP SUBSCRIPTION tap_sub_rep_full_0"); +$node_subscriber->safe_psql('postgres', "DROP TABLE people"); + +# Testcase end: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS +# ==================================================================== + +# ==================================================================== +# Testcase start: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS + +# create tables pub and sub +$node_publisher->safe_psql('postgres', + "CREATE TABLE people (firstname text, lastname text);"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE people REPLICA IDENTITY FULL;"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE people (firstname text, lastname text);"); +$node_subscriber->safe_psql('postgres', + "CREATE INDEX people_names ON people (firstname, lastname, (firstname || ' ' || lastname));"); + +# insert some initial data +$node_publisher->safe_psql('postgres', + "INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0, 200)i;"); + +# create pub/sub +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_rep_full FOR TABLE people"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" +); + +# wait for initial table synchronization to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# update 2 rows +$node_publisher->safe_psql('postgres', + "UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';"); +$node_publisher->safe_psql('postgres', + "UPDATE people SET firstname = 'Nan' WHERE 
firstname = 'first_name_3' AND lastname = 'last_name_3';"); + +$node_publisher->wait_for_catchup($appname); + +# TODO: Somehow pg_stat_all_indexes gets updated a little late after +# figure out why +sleep(1); + +# we check if the index is used or not +my $result_expr_index_2 = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'people_names';" +); +is($result_expr_index_2, qq(2), + 'check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on expressions and columns'); + +$node_publisher->safe_psql('postgres', + "DELETE FROM people WHERE firstname = 'Nan';"); + +$node_publisher->wait_for_catchup($appname); + +# TODO: Somehow pg_stat_all_indexes gets updated a little late after DELETE +# figure out why +sleep(1); + +# we check if the index is used or not +my $result_expr_index_3 = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'people_names';" +); +is($result_expr_index_3, qq(4), + 'check subscriber tap_sub_rep_full_0 deletes two rows via index scan with index on expressions and columns'); + +# cleanup pub +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); +$node_publisher->safe_psql('postgres', "DROP TABLE people"); +# cleanup sub +$node_subscriber->safe_psql('postgres', + "DROP SUBSCRIPTION tap_sub_rep_full_0"); +$node_subscriber->safe_psql('postgres', "DROP TABLE people"); + +# Testcase end: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS +# ==================================================================== + + +# ==================================================================== +# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE + +# create tables pub and sub +$node_publisher->safe_psql('postgres', + "CREATE TABLE test (column_a int, column_b int);"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE test REPLICA IDENTITY FULL;"); 
+$node_subscriber->safe_psql('postgres', + "CREATE TABLE test (column_a int, column_b int);"); +$node_subscriber->safe_psql('postgres', + "CREATE INDEX index_a ON test (column_a);"); +$node_subscriber->safe_psql('postgres', + "CREATE INDEX index_b ON test (column_b);"); + +# insert some initial data +$node_publisher->safe_psql('postgres', + "INSERT INTO test SELECT i,0 FROM generate_series(0, 2000)i;"); + +# create pub/sub +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_rep_full FOR TABLE test"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" +); + +# wait for initial table synchronization to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_subscriber->safe_psql('postgres', "ANALYZE test;"); + +# update 2 rows +$node_publisher->safe_psql('postgres', + "UPDATE test SET column_b = column_b + 1 WHERE column_a = 15;"); +$node_publisher->safe_psql('postgres', + "DELETE FROM test WHERE column_a = 20;"); + +$node_publisher->wait_for_catchup($appname); + +# TODO: Somehow pg_stat_all_indexes gets updated a little late after +# figure out why +sleep(1); + +# we check if the index is used or not +my $result_change_index_1 = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'index_a';" +); +is($result_change_index_1, qq(2), + 'check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-1'); + +# do not use index_b +my $result_change_index_2 = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'index_b';" +); +is($result_change_index_2, qq(0), + 'check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-2'); + + +# insert data such that 
the cardinality of column_b becomes much higher +$node_publisher->safe_psql('postgres', + "INSERT INTO test SELECT 0,i FROM generate_series(0, 20000)i;"); + +$node_publisher->wait_for_catchup($appname); + +$node_subscriber->safe_psql('postgres', "ANALYZE test;"); + +# update 2 rows +$node_publisher->safe_psql('postgres', + "UPDATE test SET column_a = column_a + 1 WHERE column_b = 150;"); +$node_publisher->safe_psql('postgres', + "DELETE FROM test WHERE column_b = 200;"); + +# TODO: Somehow pg_stat_all_indexes gets updated a little late after +# figure out why +sleep(1); + +# do not use index_a anymore +my $result_change_index_3 = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'index_a';" +); +is($result_change_index_3, qq(2), + 'check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-3'); + +# now, use index_b as well +my $result_change_index_4 = $node_subscriber->safe_psql('postgres', + "select idx_scan from pg_stat_all_indexes where indexrelname = 'index_b';" +); +is($result_change_index_4, qq(2), + 'check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-4'); + +# cleanup pub +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); +$node_publisher->safe_psql('postgres', "DROP TABLE test"); +# cleanup sub +$node_subscriber->safe_psql('postgres', + "DROP SUBSCRIPTION tap_sub_rep_full_0"); +$node_subscriber->safe_psql('postgres', "DROP TABLE test"); + +# Testcase end: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE +# ==================================================================== + + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); + +done_testing(); -- 2.34.1
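
The expected idx_scan values in the test (50, 200, 10, 30) and the checksum 9200 follow from the generate_series data. Below is a minimal standalone sketch that replays the same arithmetic in plain Perl, with no PostgreSQL involved. It assumes, as the test assertions themselves do, that each replicated row change costs exactly one index scan on the subscriber; all variable names are illustrative and not part of the patch.

```perl
use strict;
use warnings;

# "SELECT i%20 FROM generate_series(0,1000) i" produces 1001 rows.
my @single = map { $_ % 20 } 0 .. 1000;
# UPDATE ... WHERE x = 15 touches every row whose value is 15.
my $rows_x15 = grep { $_ == 15 } @single;

# "SELECT (i%10), (i%10)::text FROM generate_series(0,1000) i"
my @pairs = map { [ $_ % 10, $_ % 10 ] } 0 .. 1000;
# UPDATE ... SET x = x + 1 WHERE x IN (5, 6): a single statement
# updates each matching row exactly once.
my $rows_56 = 0;
for my $row (@pairs) {
    if ($row->[0] == 5 || $row->[0] == 6) {
        $row->[0]++;
        $rows_56++;
    }
}
# Mirrors "select sum(x+y::int)" after the update.
my $sum = 0;
$sum += $_->[0] + $_->[1] for @pairs;

# Partitioned case: (user_id, value_1) = (i%100, i%20).
my @part = map { [ $_ % 100, $_ % 20 ] } 0 .. 1000;
my $part_upd = grep { $_->[0] == 4 } @part;
my $part_del = grep {
    ($_->[0] == 1 && $_->[1] == 1) || ($_->[0] == 12 && $_->[1] == 12)
} @part;
# idx_scan is cumulative, so the second check sees update + delete scans.
my $part_total = $part_upd + $part_del;

print "rows with x = 15:         $rows_x15\n";
print "rows with x IN (5, 6):    $rows_56\n";
print "sum(x + y) after update:  $sum\n";
print "partitioned update rows:  $part_upd\n";
print "partitioned scans so far: $part_total\n";
```

Running it prints 50, 200, 9200, 10 and 30, which matches the values the test compares against.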