I wrote:
> Michael Paquier <mich...@paquier.xyz> writes:
>> On Wed, May 19, 2021 at 04:23:55PM -0400, Tom Lane wrote:
>>> * Replace the edata->resultRelInfo field with two fields, one for
>>> the original parent and one for the actual/current target. Perhaps
>>> this is worth doing, not sure.
>> This one sounds more natural to me, though.

> OK, I'll give that a try tomorrow.

Here's a version that does it like that. I'm not entirely convinced
whether this is better or not.

regards, tom lane

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 1432554d5a..d19269ebce 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -178,6 +178,18 @@ static HTAB *xidhash = NULL; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +typedef struct ApplyExecutionData +{ + EState *estate; /* executor state, used to track resources */ + ResultRelInfo *origRelInfo; /* the originally-named target relation */ + ResultRelInfo *activeRelInfo; /* the actual target relation */ + /* activeRelInfo can equal origRelInfo, or be for a partition of it */ + + /* These fields are used when the target relation is partitioned: */ + ModifyTableState *mtstate; /* dummy ModifyTable state */ + PartitionTupleRouting *proute; /* partition routing info */ +} ApplyExecutionData; + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -226,21 +238,20 @@ static void apply_dispatch(StringInfo s); static void apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data); -static void apply_handle_insert_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot); -static void apply_handle_update_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot, +static void apply_handle_insert_internal(ApplyExecutionData *edata, + TupleTableSlot *remoteslot); +static void apply_handle_update_internal(ApplyExecutionData *edata, + TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry); -static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, +static void apply_handle_delete_internal(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel); static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot); -static void 
apply_handle_tuple_routing(ResultRelInfo *relinfo, - EState *estate, +static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, @@ -336,20 +347,21 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) /* * Executor state preparation for evaluation of constraint expressions, - * indexes and triggers. + * indexes and triggers for the specified relation. * - * resultRelInfo is a ResultRelInfo for the relation to be passed to the - * executor routines. The caller must open and close any indexes to be - * updated independently of the relation registered here. + * Note that the caller must open and close any indexes to be updated. */ -static EState * -create_estate_for_relation(LogicalRepRelMapEntry *rel, - ResultRelInfo **resultRelInfo) +static ApplyExecutionData * +create_edata_for_relation(LogicalRepRelMapEntry *rel) { + ApplyExecutionData *edata; EState *estate; RangeTblEntry *rte; + ResultRelInfo *resultRelInfo; - estate = CreateExecutorState(); + edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData)); + + edata->estate = estate = CreateExecutorState(); rte = makeNode(RangeTblEntry); rte->rtekind = RTE_RELATION; @@ -358,13 +370,16 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel, rte->rellockmode = AccessShareLock; ExecInitRangeTable(estate, list_make1(rte)); - *resultRelInfo = makeNode(ResultRelInfo); + edata->origRelInfo = resultRelInfo = makeNode(ResultRelInfo); + + /* Initially, set activeRelInfo to be the named rel */ + edata->activeRelInfo = resultRelInfo; /* * Use Relation opened by logicalrep_rel_open() instead of opening it * again. 
*/ - InitResultRelInfo(*resultRelInfo, rel->localrel, 1, NULL, 0); + InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0); /* * We put the ResultRelInfo in the es_opened_result_relations list, even @@ -377,29 +392,38 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel, * an apply operation being responsible for that. */ estate->es_opened_result_relations = - lappend(estate->es_opened_result_relations, *resultRelInfo); + lappend(estate->es_opened_result_relations, resultRelInfo); estate->es_output_cid = GetCurrentCommandId(true); /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); - return estate; + /* other fields of edata remain NULL for now */ + + return edata; } /* * Finish any operations related to the executor state created by - * create_estate_for_relation(). + * create_edata_for_relation(). */ static void -finish_estate(EState *estate) +finish_edata(ApplyExecutionData *edata) { + EState *estate = edata->estate; + /* Handle any queued AFTER triggers. */ AfterTriggerEndQuery(estate); + /* Shut down tuple routing, if any was done. */ + if (edata->proute) + ExecCleanupTupleRouting(edata->mtstate, edata->proute); + /* Cleanup. */ ExecResetTupleTable(estate->es_tupleTable, false); FreeExecutorState(estate); + pfree(edata); } /* @@ -1181,10 +1205,10 @@ GetRelationIdentityOrPK(Relation rel) static void apply_handle_insert(StringInfo s) { - ResultRelInfo *resultRelInfo; LogicalRepRelMapEntry *rel; LogicalRepTupleData newtup; LogicalRepRelId relid; + ApplyExecutionData *edata; EState *estate; TupleTableSlot *remoteslot; MemoryContext oldctx; @@ -1207,7 +1231,8 @@ apply_handle_insert(StringInfo s) } /* Initialize the executor state. 
*/ - estate = create_estate_for_relation(rel, &resultRelInfo); + edata = create_edata_for_relation(rel); + estate = edata->estate; remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -1223,15 +1248,14 @@ apply_handle_insert(StringInfo s) /* For a partitioned table, insert the tuple into a partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(resultRelInfo, estate, + apply_handle_tuple_routing(edata, remoteslot, NULL, rel, CMD_INSERT); else - apply_handle_insert_internal(resultRelInfo, estate, - remoteslot); + apply_handle_insert_internal(edata, remoteslot); PopActiveSnapshot(); - finish_estate(estate); + finish_edata(edata); logicalrep_rel_close(rel, NoLock); @@ -1240,9 +1264,13 @@ apply_handle_insert(StringInfo s) /* Workhorse for apply_handle_insert() */ static void -apply_handle_insert_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot) +apply_handle_insert_internal(ApplyExecutionData *edata, + TupleTableSlot *remoteslot) { + EState *estate = edata->estate; + ResultRelInfo *relinfo = edata->activeRelInfo; + + /* We must open indexes here. */ ExecOpenIndices(relinfo, false); /* Do the insert. */ @@ -1293,9 +1321,9 @@ check_relation_updatable(LogicalRepRelMapEntry *rel) static void apply_handle_update(StringInfo s) { - ResultRelInfo *resultRelInfo; LogicalRepRelMapEntry *rel; LogicalRepRelId relid; + ApplyExecutionData *edata; EState *estate; LogicalRepTupleData oldtup; LogicalRepTupleData newtup; @@ -1326,7 +1354,8 @@ apply_handle_update(StringInfo s) check_relation_updatable(rel); /* Initialize the executor state. 
*/ - estate = create_estate_for_relation(rel, &resultRelInfo); + edata = create_edata_for_relation(rel); + estate = edata->estate; remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -1368,15 +1397,15 @@ apply_handle_update(StringInfo s) /* For a partitioned table, apply update to correct partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(resultRelInfo, estate, + apply_handle_tuple_routing(edata, remoteslot, &newtup, rel, CMD_UPDATE); else - apply_handle_update_internal(resultRelInfo, estate, + apply_handle_update_internal(edata, remoteslot, &newtup, rel); PopActiveSnapshot(); - finish_estate(estate); + finish_edata(edata); logicalrep_rel_close(rel, NoLock); @@ -1385,11 +1414,13 @@ apply_handle_update(StringInfo s) /* Workhorse for apply_handle_update() */ static void -apply_handle_update_internal(ResultRelInfo *relinfo, - EState *estate, TupleTableSlot *remoteslot, +apply_handle_update_internal(ApplyExecutionData *edata, + TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry) { + EState *estate = edata->estate; + ResultRelInfo *relinfo = edata->activeRelInfo; Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; TupleTableSlot *localslot; @@ -1448,10 +1479,10 @@ apply_handle_update_internal(ResultRelInfo *relinfo, static void apply_handle_delete(StringInfo s) { - ResultRelInfo *resultRelInfo; LogicalRepRelMapEntry *rel; LogicalRepTupleData oldtup; LogicalRepRelId relid; + ApplyExecutionData *edata; EState *estate; TupleTableSlot *remoteslot; MemoryContext oldctx; @@ -1477,7 +1508,8 @@ apply_handle_delete(StringInfo s) check_relation_updatable(rel); /* Initialize the executor state. 
*/ - estate = create_estate_for_relation(rel, &resultRelInfo); + edata = create_edata_for_relation(rel); + estate = edata->estate; remoteslot = ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel), &TTSOpsVirtual); @@ -1491,15 +1523,14 @@ apply_handle_delete(StringInfo s) /* For a partitioned table, apply delete to correct partition. */ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - apply_handle_tuple_routing(resultRelInfo, estate, + apply_handle_tuple_routing(edata, remoteslot, NULL, rel, CMD_DELETE); else - apply_handle_delete_internal(resultRelInfo, estate, - remoteslot, &rel->remoterel); + apply_handle_delete_internal(edata, remoteslot, &rel->remoterel); PopActiveSnapshot(); - finish_estate(estate); + finish_edata(edata); logicalrep_rel_close(rel, NoLock); @@ -1508,10 +1539,12 @@ apply_handle_delete(StringInfo s) /* Workhorse for apply_handle_delete() */ static void -apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate, +apply_handle_delete_internal(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepRelation *remoterel) { + EState *estate = edata->estate; + ResultRelInfo *relinfo = edata->activeRelInfo; Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; TupleTableSlot *localslot; @@ -1582,16 +1615,17 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel, * This handles insert, update, delete on a partitioned table. 
*/ static void -apply_handle_tuple_routing(ResultRelInfo *relinfo, - EState *estate, +apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, LogicalRepRelMapEntry *relmapentry, CmdType operation) { + EState *estate = edata->estate; + ResultRelInfo *relinfo = edata->origRelInfo; Relation parentrel = relinfo->ri_RelationDesc; - ModifyTableState *mtstate = NULL; - PartitionTupleRouting *proute = NULL; + ModifyTableState *mtstate; + PartitionTupleRouting *proute; ResultRelInfo *partrelinfo; Relation partrel; TupleTableSlot *remoteslot_part; @@ -1599,12 +1633,14 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, MemoryContext oldctx; /* ModifyTableState is needed for ExecFindPartition(). */ - mtstate = makeNode(ModifyTableState); + edata->mtstate = mtstate = makeNode(ModifyTableState); mtstate->ps.plan = NULL; mtstate->ps.state = estate; mtstate->operation = operation; mtstate->resultRelInfo = relinfo; - proute = ExecSetupPartitionTupleRouting(estate, parentrel); + + /* ... as is PartitionTupleRouting. */ + edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel); /* * Find the partition to which the "search tuple" belongs. @@ -1616,6 +1652,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, Assert(partrelinfo != NULL); partrel = partrelinfo->ri_RelationDesc; + /* Make that partition the active target rel. */ + edata->activeRelInfo = partrelinfo; + /* * To perform any of the operations below, the tuple must match the * partition's rowtype. 
Convert if needed or just copy, using a dedicated @@ -1638,12 +1677,12 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, switch (operation) { case CMD_INSERT: - apply_handle_insert_internal(partrelinfo, estate, + apply_handle_insert_internal(edata, remoteslot_part); break; case CMD_DELETE: - apply_handle_delete_internal(partrelinfo, estate, + apply_handle_delete_internal(edata, remoteslot_part, &relmapentry->remoterel); break; @@ -1757,11 +1796,12 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, Assert(partrelinfo_new != partrelinfo); /* DELETE old tuple found in the old partition. */ - apply_handle_delete_internal(partrelinfo, estate, + apply_handle_delete_internal(edata, localslot, &relmapentry->remoterel); /* INSERT new tuple into the new partition. */ + edata->activeRelInfo = partrelinfo_new; /* * Convert the replacement tuple to match the destination @@ -1787,7 +1827,7 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, slot_getallattrs(remoteslot); } MemoryContextSwitchTo(oldctx); - apply_handle_insert_internal(partrelinfo_new, estate, + apply_handle_insert_internal(edata, remoteslot_part); } } @@ -1797,8 +1837,6 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo, elog(ERROR, "unrecognized CmdType: %d", (int) operation); break; } - - ExecCleanupTupleRouting(mtstate, proute); } /*