Hi,

I've been experimenting with executor nodes inspired by papers on executor robustness (think "Algorithms that don't explode if an estimate is off."). I decided to use the CustomScan API, because it seemed like an ideal solution for my experiments - convenient, isolated, easy to try on other releases, ...
I'm going to discuss a couple of issues with using CustomScan for joins, and propose some improvements to the CustomScan API to address them. I'd welcome feedback on the proposed changes, ideas for alternative approaches, etc. Of course, I may be wrong / missing something about the CustomScan design, feel free to point that out.

For the "scan" algorithm (SmoothScan [1]), this mostly worked fine. I ended up copying some (a lot) of the code matching clauses to an index from indexscan planning, and that's a bit ugly. But the CustomScan API does not promise to address that, so it did not surprise me, and I can deal with that (at least for now). Other than that, CustomScan worked fine for my experimental "scan" method.

But when implementing a custom join (generalized join [2]), it was quite a different story. CustomScan claims to support joins, you just need to use the set_join_pathlist hook, and set a couple fields in the plan/executor nodes differently. Like, leave scanrelid=0 etc. And that kinda works for the planning phase, but at execution it turned out to be much trickier.

The main hurdle I ran into is how do you construct the result tuple? In regular joins, you can do that by setting ecxt_innertuple/ecxt_outertuple, and calling ExecProject(). Or something along those lines.

But for CustomScan joins, that's not possible - the targetlist is modified so that all the Vars have INDEX_VAR, as pointed out by a comment in primnodes.h:

> In ForeignScan and CustomScan plan nodes, INDEX_VAR is abused to
> signify references to columns of a custom scan tuple type.

Which makes sense, because while the CustomScan can have nested plans, it does not have a concept of an explicit inner/outer plan.

It seemed to me I'd have to essentially build the tuples "on my own", which seems quite tricky and inconvenient. And also a bit against the idea of CustomScan shielding extensions from this kind of "core" stuff. I may be entirely wrong, of course.
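To illustrate the difference, here is a toy model in plain C. It is not PostgreSQL code: ToyVar, ToyContext and toy_project are made-up stand-ins (loosely modeled on Var, ExprContext and ExecProject), and tuples are just int arrays. It only sketches why a targetlist with inner/outer references can be projected straight from the two input tuples, while a targetlist where every Var points at the "custom scan tuple" cannot be projected until the node has built that tuple itself.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-ins, loosely modeled on Var/ExprContext. NOT PostgreSQL code. */
typedef enum ToyVarno
{
    TOY_OUTER_VAR,              /* column of the outer input tuple */
    TOY_INNER_VAR,              /* column of the inner input tuple */
    TOY_INDEX_VAR               /* column of a separately built "scan tuple" */
} ToyVarno;

typedef struct ToyVar
{
    ToyVarno    varno;          /* which tuple the column comes from */
    int         attno;          /* 1-based column number in that tuple */
} ToyVar;

typedef struct ToyContext
{
    const int  *outer;          /* current outer tuple, or NULL */
    const int  *inner;          /* current inner tuple, or NULL */
    const int  *scan;           /* scan tuple, NULL until the node builds one */
} ToyContext;

/*
 * Evaluate a targetlist of plain column references into result[].
 * Returns 0 on success, -1 if some entry references a tuple that
 * has not been provided.
 */
int
toy_project(const ToyContext *cxt, const ToyVar *tlist, size_t ntlist,
            int *result)
{
    for (size_t i = 0; i < ntlist; i++)
    {
        const int  *src = NULL;

        assert(tlist[i].attno >= 1);

        switch (tlist[i].varno)
        {
            case TOY_OUTER_VAR:
                src = cxt->outer;
                break;
            case TOY_INNER_VAR:
                src = cxt->inner;
                break;
            case TOY_INDEX_VAR:
                src = cxt->scan;
                break;
        }

        if (src == NULL)
            return -1;          /* referenced tuple is not available */

        result[i] = src[tlist[i].attno - 1];
    }

    return 0;
}
```

With a join-style targetlist such as { {TOY_OUTER_VAR, 2}, {TOY_INNER_VAR, 1} }, setting outer/inner and calling toy_project is enough. With a targetlist made only of TOY_INDEX_VAR entries, the same call fails until something fills in cxt->scan, which is the "build the tuples on my own" step described above.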
Perhaps I'm missing something, and there's a simple way to do this? I tried to look at existing extensions implementing joins through CustomScan, but there are not that many, and I haven't found any good solution.

I also tried reading through the ~2014 threads related to CustomScan, and how it got modified to allow joins. But I don't see this discussed there either. It seems to me the CustomScan received the minimum amount of "tweaks" to allow joins, but it's not very practical.

I realize ForeignScan supports joins in a very similar way (i.e. you leave scanrelid=0, etc.). But I think there's a difference - the foreign join code is not really supposed to build the tuples, it gets the "formed" tuples from somewhere else, more or less. For example postgres_fdw deparses a query, sends it somewhere, that other node does the actual join, builds the tuple from inner/outer, and sends it back. The postgres_fdw code does not need to worry about mapping the target list to inner/outer etc. This does not work for CustomScan, I think (unless it's doing the same sort of query offloading).

I did ask on Discord for suggestions on the right way to do this with CustomScan and joins, and the response was that this may not have been thought through very carefully, and some improvements may be necessary to make it more convenient.

So I decided to give this a try. The way I see it, most of the issues stem from grafting joins onto an interface that's designed for scans. The whole sequence of custom nodes:

    CustomPath -> CustomScan -> CustomScanState

is based on scans. CustomPath "inherits" from Path, CustomScan from "Scan", CustomScanState from "ScanState". It's not clear to me how to make this work with "JoinPath", "JoinPlan" and "JoinState" in a reasonable way.

I suppose some of the "join" data can be stashed in the private fields of the structs, but then the various planner/executor parts need to know about that in some way. How else would setrefs do the right thing with translating the targetlist into inner/outer references? (Maybe it could be done in PlanCustomPath, but it seems too early?)

The "proper" way seems to be to have separate nodes for joins:

    CustomJoinPath -> CustomJoin -> CustomJoinState

The attached PoC patch does that (except for the CustomJoinPath, it can do with CustomPath for now). It's more or less a copy-paste, adjusting all the places with "case CustomScan" to also deal with "CustomJoin" - either in the same way, or sometimes in a way that works for joins. It's fairly mechanical.

With this patch, my custom join can simply do

    econtext->ecxt_outertuple = outer;
    econtext->ecxt_innertuple = inner;

    return ExecProject(node->js.ps.ps_ProjInfo);

and it works.

One thing that surprised me a bit is that there's no testing extension implementing a simple custom scan/join. So it's hard to show this :-(

I'm sure there are places that need more work (some of which are marked with FIXME, and I probably missed some). But it surprised me how small the patch is - most of it is the mechanical adjustments of switches. It would get a bit larger, e.g. due to sgml docs (which the PoC patch does not update).

The patch also renames a couple structs to have "Scan" in them, e.g. CustomExecMethods are now CustomScanExecMethods. This is necessary because the methods get the "plan state" of the particular type (i.e. CustomScanState vs. CustomJoinState), etc. I guess we could do with some "shared" state, but it seems like a recipe for confusion, and I'm not sure it'd remove that much "code" anyway.

So that's what I have for now.

Note: I mentioned some extensions implementing SmoothScan/G-join. I plan to publish those once I polish them a bit more. It's more research than something ready to use right now.

regards

[1] https://scholar.harvard.edu/files/stratos/files/smooth_vldbj.pdf
[2] https://dl.gi.de/server/api/core/bitstreams/ce8e3fab-0bac-45fc-a6d4-66edaa52d574/content

-- 
Tomas Vondra

From cfa70d2414dd5ad3c447af700ce55ae18a77f774 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Tue, 22 Jul 2025 20:56:27 +0200
Subject: [PATCH v1] PoC: introduce CustomJoin as separate from CustomScan

Introduce a new CustomJoin node, intended for out-of-core join nodes.
The CustomScan is supposed to allow that, but there's a couple pieces
that make it inconvenient. The targetlist uses INDEX_VAR, so it's not
clear which relation the Vars come from, etc.

So this adds a new node, descended from Join (not Scan), with all the
processing specific to joins (in setrefs etc.).

Experimental. Docs are not updated, some bits may be irrelevant for
joins, etc.
---
 src/backend/commands/explain.c          |  77 ++++++++-
 src/backend/executor/execAmi.c          |   4 +
 src/backend/executor/execParallel.c     |  20 +++
 src/backend/executor/execProcnode.c     |  12 ++
 src/backend/executor/nodeCustom.c       | 199 +++++++++++++++++++++++-
 src/backend/nodes/extensible.c          |  25 +++
 src/backend/nodes/gen_node_support.pl   |  26 +++-
 src/backend/nodes/nodeFuncs.c           |   7 +
 src/backend/optimizer/plan/createplan.c |  73 +++++++++
 src/backend/optimizer/plan/setrefs.c    |  44 ++++++
 src/backend/optimizer/plan/subselect.c  |  26 ++++
 src/backend/utils/adt/ruleutils.c       |   2 +
 src/include/executor/nodeCustom.h       |  20 +++
 src/include/nodes/execnodes.h           |  30 +++-
 src/include/nodes/extensible.h          |  60 ++++++-
 src/include/nodes/plannodes.h           |  38 +++++
 src/tools/pgindent/typedefs.list        |   6 +-
 17 files changed, 653 insertions(+), 16 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 7e2792ead71..86bff53692d 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -159,8 +159,10 @@ static void ExplainMemberNodes(PlanState **planstates, int nplans,
 static void ExplainMissingMembers(int nplans, int nchildren, ExplainState *es);
 static void ExplainSubPlans(List *plans, List *ancestors,
 							const char *relationship, ExplainState *es);
-static void ExplainCustomChildren(CustomScanState *css,
-								  List *ancestors, ExplainState *es);
+static void ExplainCustomScanChildren(CustomScanState *css,
+									  List *ancestors, ExplainState *es);
+static void ExplainCustomJoinChildren(CustomJoinState *cjs,
+									  List *ancestors, ExplainState *es);
 static ExplainWorkersState *ExplainCreateWorkersState(int num_workers);
 static void ExplainOpenWorker(int n, ExplainState *es);
 static void ExplainCloseWorker(int n, ExplainState *es);
@@ -1210,6 +1212,10 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used)
 			*rels_used = bms_add_members(*rels_used,
 										 ((CustomScan *) plan)->custom_relids);
 			break;
+		case T_CustomJoin:
+			*rels_used = bms_add_members(*rels_used,
+										 ((CustomJoin *) plan)->custom_relids);
+			break;
 		case T_ModifyTable:
 			*rels_used = bms_add_member(*rels_used,
 										((ModifyTable *) plan)->nominalRelation);
@@ -1305,6 +1311,18 @@ plan_is_disabled(Plan *plan)
 			child_disabled_nodes += subplan->disabled_nodes;
 		}
 	}
+	else if (IsA(plan, CustomJoin))
+	{
+		ListCell   *lc;
+		CustomJoin *cplan = (CustomJoin *) plan;
+
+		foreach(lc, cplan->custom_plans)
+		{
+			Plan	   *subplan = lfirst(lc);
+
+			child_disabled_nodes += subplan->disabled_nodes;
+		}
+	}
 	else
 	{
 		/*
@@ -1513,6 +1531,15 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			else
 				pname = sname;
 			break;
+		case T_CustomJoin:
+			/* XXX jointype switch adds another "Join" (so it's there twice) */
+			sname = "Custom Join";
+			custom_name = ((CustomJoin *) plan)->methods->CustomName;
+			if (custom_name)
+				pname = psprintf("Custom Join (%s)", custom_name);
+			else
+				pname = sname;
+			break;
 		case T_Material:
 			pname = sname = "Materialize";
 			break;
@@ -1712,6 +1739,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_NestLoop:
 		case T_MergeJoin:
 		case T_HashJoin:
+		case T_CustomJoin:
 			{
 				const char *jointype;
 
@@ -2155,6 +2183,25 @@ ExplainNode(PlanState *planstate, List *ancestors,
 					css->methods->ExplainCustomScan(css, ancestors, es);
 			}
 			break;
+		case T_CustomJoin:
+			{
+				CustomJoinState *cjs = (CustomJoinState *) planstate;
+
+				show_upper_qual(((CustomJoin *) plan)->join.joinqual,
+								"Join Filter", planstate, ancestors, es);
+				if (((CustomJoin *) plan)->join.joinqual)
+					show_instrumentation_count("Rows Removed by Join Filter", 1,
+											   planstate, es);
+
+				show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
+				if (plan->qual)
+					show_instrumentation_count("Rows Removed by Filter", 1,
+											   planstate, es);
+
+				if (cjs->methods->ExplainCustomJoin)
+					cjs->methods->ExplainCustomJoin(cjs, ancestors, es);
+			}
+			break;
 		case T_NestLoop:
 			show_upper_qual(((NestLoop *) plan)->join.joinqual,
 							"Join Filter", planstate, ancestors, es);
@@ -2354,6 +2401,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		IsA(plan, SubqueryScan) ||
 		(IsA(planstate, CustomScanState) &&
 		 ((CustomScanState *) planstate)->custom_ps != NIL) ||
+		(IsA(planstate, CustomJoinState) &&
+		 ((CustomJoinState *) planstate)->custom_ps != NIL) ||
 		planstate->subPlan;
 	if (haschildren)
 	{
@@ -2404,8 +2453,12 @@ ExplainNode(PlanState *planstate, List *ancestors,
 							"Subquery", NULL, es);
 			break;
 		case T_CustomScan:
-			ExplainCustomChildren((CustomScanState *) planstate,
-								  ancestors, es);
+			ExplainCustomScanChildren((CustomScanState *) planstate,
+									  ancestors, es);
+			break;
+		case T_CustomJoin:
+			ExplainCustomJoinChildren((CustomJoinState *) planstate,
+									  ancestors, es);
 			break;
 		default:
 			break;
@@ -4814,7 +4867,7 @@ ExplainSubPlans(List *plans, List *ancestors,
  * Explain a list of children of a CustomScan.
  */
 static void
-ExplainCustomChildren(CustomScanState *css, List *ancestors, ExplainState *es)
+ExplainCustomScanChildren(CustomScanState *css, List *ancestors, ExplainState *es)
 {
 	ListCell   *cell;
 	const char *label =
@@ -4824,6 +4877,20 @@ ExplainCustomChildren(CustomScanState *css, List *ancestors, ExplainState *es)
 		ExplainNode((PlanState *) lfirst(cell), ancestors, label, NULL, es);
 }
 
+/*
+ * Explain a list of children of a CustomJoin.
+ */
+static void
+ExplainCustomJoinChildren(CustomJoinState *cjs, List *ancestors, ExplainState *es)
+{
+	ListCell   *cell;
+	const char *label =
+		(list_length(cjs->custom_ps) != 1 ? "children" : "child");
+
+	foreach(cell, cjs->custom_ps)
+		ExplainNode((PlanState *) lfirst(cell), ancestors, label, NULL, es);
+}
+
 /*
  * Create a per-plan-node workspace for collecting per-worker data.
  *
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 1d0e8ad57b4..10da51834f5 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -238,6 +238,10 @@ ExecReScan(PlanState *node)
 			ExecReScanCustomScan((CustomScanState *) node);
 			break;
 
+		case T_CustomJoinState:
+			ExecReScanCustomJoin((CustomJoinState *) node);
+			break;
+
 		case T_NestLoopState:
 			ExecReScanNestLoop((NestLoopState *) node);
 			break;
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index fc76f22fb82..9650310e852 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -276,6 +276,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 				ExecCustomScanEstimate((CustomScanState *) planstate,
 									   e->pcxt);
 			break;
+		case T_CustomJoinState:
+			if (planstate->plan->parallel_aware)
+				ExecCustomJoinEstimate((CustomJoinState *) planstate,
+									   e->pcxt);
+			break;
 		case T_BitmapHeapScanState:
 			if (planstate->plan->parallel_aware)
 				ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
@@ -503,6 +508,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
 				ExecCustomScanInitializeDSM((CustomScanState *) planstate,
 											d->pcxt);
 			break;
+		case T_CustomJoinState:
+			if (planstate->plan->parallel_aware)
+				ExecCustomJoinInitializeDSM((CustomJoinState *) planstate,
+											d->pcxt);
+			break;
 		case T_BitmapHeapScanState:
 			if (planstate->plan->parallel_aware)
 				ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
@@ -1003,6 +1013,11 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 				ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
 											  pcxt);
 			break;
+		case T_CustomJoinState:
+			if (planstate->plan->parallel_aware)
+				ExecCustomJoinReInitializeDSM((CustomJoinState *) planstate,
+											  pcxt);
+			break;
 		case T_BitmapHeapScanState:
 			if (planstate->plan->parallel_aware)
 				ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
@@ -1371,6 +1386,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 				ExecCustomScanInitializeWorker((CustomScanState *) planstate,
 											   pwcxt);
 			break;
+		case T_CustomJoinState:
+			if (planstate->plan->parallel_aware)
+				ExecCustomJoinInitializeWorker((CustomJoinState *) planstate,
+											   pwcxt);
+			break;
 		case T_BitmapHeapScanState:
 			if (planstate->plan->parallel_aware)
 				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index f5f9cfbeead..c005974507c 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -309,6 +309,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
 													 estate, eflags);
 			break;
 
+		case T_CustomJoin:
+			result = (PlanState *) ExecInitCustomJoin((CustomJoin *) node,
+													 estate, eflags);
+			break;
+
 			/*
 			 * materialization nodes
 			 */
@@ -699,6 +704,10 @@ ExecEndNode(PlanState *node)
 			ExecEndHashJoin((HashJoinState *) node);
 			break;
 
+		case T_CustomJoinState:
+			ExecEndCustomJoin((CustomJoinState *) node);
+			break;
+
 			/*
 			 * materialization nodes
 			 */
@@ -817,6 +826,9 @@ ExecShutdownNode_walker(PlanState *node, void *context)
 		case T_HashJoinState:
 			ExecShutdownHashJoin((HashJoinState *) node);
 			break;
+		case T_CustomJoinState:
+			ExecShutdownCustomJoin((CustomJoinState *) node);
+			break;
 		default:
 			break;
 	}
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index ac2196b64c7..38d75fc6341 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -20,6 +20,7 @@
 #include "utils/rel.h"
 
 static TupleTableSlot *ExecCustomScan(PlanState *pstate);
+static TupleTableSlot *ExecCustomJoin(PlanState *pstate);
 
 
 CustomScanState *
@@ -160,7 +161,7 @@ ExecCustomRestrPos(CustomScanState *node)
 void
 ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
 {
-	const CustomExecMethods *methods = node->methods;
+	const CustomScanExecMethods *methods = node->methods;
 
 	if (methods->EstimateDSMCustomScan)
 	{
@@ -173,7 +174,7 @@ ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
 void
 ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
 {
-	const CustomExecMethods *methods = node->methods;
+	const CustomScanExecMethods *methods = node->methods;
 
 	if (methods->InitializeDSMCustomScan)
 	{
@@ -189,7 +190,7 @@ ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
 void
 ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
 {
-	const CustomExecMethods *methods = node->methods;
+	const CustomScanExecMethods *methods = node->methods;
 
 	if (methods->ReInitializeDSMCustomScan)
 	{
@@ -205,7 +206,7 @@ void
 ExecCustomScanInitializeWorker(CustomScanState *node,
 							   ParallelWorkerContext *pwcxt)
 {
-	const CustomExecMethods *methods = node->methods;
+	const CustomScanExecMethods *methods = node->methods;
 
 	if (methods->InitializeWorkerCustomScan)
 	{
@@ -220,8 +221,196 @@ ExecCustomScanInitializeWorker(CustomScanState *node,
 void
 ExecShutdownCustomScan(CustomScanState *node)
 {
-	const CustomExecMethods *methods = node->methods;
+	const CustomScanExecMethods *methods = node->methods;
 
 	if (methods->ShutdownCustomScan)
 		methods->ShutdownCustomScan(node);
 }
+
+CustomJoinState *
+ExecInitCustomJoin(CustomJoin *cjoin, EState *estate, int eflags)
+{
+	CustomJoinState *cjs;
+	const TupleTableSlotOps *slotOps;
+	Relation	scan_rel = NULL;
+	// FIXME
+	Index		scanrelid = 0;
+	// Index		scanrelid = cscan->scan.scanrelid;
+	// int			tlistvarno;
+
+	/*
+	 * Allocate the CustomJoinState object.  We let the custom scan provider
+	 * do the palloc, in case it wants to make a larger object that embeds
+	 * CustomJoinState as the first field.  It must set the node tag and the
+	 * methods field correctly at this time.  Other standard fields should be
+	 * set to zero.
+	 */
+	cjs = castNode(CustomJoinState,
+				   cjoin->methods->CreateCustomJoinState(cjoin));
+
+	/* ensure flags is filled correctly */
+	cjs->flags = cjoin->flags;
+
+	/* fill up fields of ScanState */
+	cjs->js.ps.plan = &cjoin->join.plan;
+	cjs->js.ps.state = estate;
+	cjs->js.ps.ExecProcNode = ExecCustomJoin;
+
+	/* create expression context for node */
+	ExecAssignExprContext(estate, &cjs->js.ps);
+
+	/*
+	 * open the scan relation, if any
+	 */
+	if (scanrelid > 0)
+	{
+		scan_rel = ExecOpenScanRelation(estate, scanrelid, eflags);
+		// FIXME
+		// cjs->js.ss_currentRelation = scan_rel;
+	}
+
+	/*
+	 * Use a custom slot if specified in CustomJoinState or use virtual slot
+	 * otherwise.
+	 */
+	slotOps = cjs->slotOps;
+	if (!slotOps)
+		slotOps = &TTSOpsVirtual;
+
+	/*
+	 * Determine the scan tuple type.  If the custom scan provider provided a
+	 * targetlist describing the scan tuples, use that; else use base
+	 * relation's rowtype.
+	 */
+	if (cjoin->custom_join_tlist != NIL || scan_rel == NULL)
+	{
+		// TupleDesc	scan_tupdesc;
+
+		// FIXME
+		// scan_tupdesc = ExecTypeFromTL(cjoin->custom_scan_tlist);
+		// ExecInitScanTupleSlot(estate, &cjs->js, scan_tupdesc, slotOps);
+		/* Node's targetlist will contain Vars with varno = INDEX_VAR */
+		// tlistvarno = INDEX_VAR;
+	}
+	else
+	{
+		// FIXME
+		// ExecInitScanTupleSlot(estate, &cjs->js, RelationGetDescr(scan_rel),
+		//					  slotOps);
+		/* Node's targetlist will contain Vars with varno = scanrelid */
+		// tlistvarno = scanrelid;
+	}
+
+	/*
+	 * Initialize result slot, type and projection.
+	 */
+	ExecInitResultTupleSlotTL(&cjs->js.ps, &TTSOpsVirtual);
+	// FIXME
+	// ExecAssignScanProjectionInfoWithVarno(&cjs->js, tlistvarno);
+
+	/* initialize child expressions */
+	cjs->js.ps.qual =
+		ExecInitQual(cjoin->join.plan.qual, (PlanState *) cjs);
+
+	/*
+	 * The callback of custom-scan provider applies the final initialization
+	 * of the custom-scan-state node according to its logic.
+	 */
+	cjs->methods->BeginCustomJoin(cjs, estate, eflags);
+
+	return cjs;
+}
+
+static TupleTableSlot *
+ExecCustomJoin(PlanState *pstate)
+{
+	CustomJoinState *node = castNode(CustomJoinState, pstate);
+
+	CHECK_FOR_INTERRUPTS();
+
+	Assert(node->methods->ExecCustomJoin != NULL);
+	return node->methods->ExecCustomJoin(node);
+}
+
+void
+ExecEndCustomJoin(CustomJoinState *node)
+{
+	Assert(node->methods->EndCustomJoin != NULL);
+	node->methods->EndCustomJoin(node);
+}
+
+void
+ExecReScanCustomJoin(CustomJoinState *node)
+{
+	Assert(node->methods->ReScanCustomJoin != NULL);
+	node->methods->ReScanCustomJoin(node);
+}
+
+void
+ExecCustomJoinEstimate(CustomJoinState *node, ParallelContext *pcxt)
+{
+	const CustomJoinExecMethods *methods = node->methods;
+
+	if (methods->EstimateDSMCustomJoin)
+	{
+		node->pscan_len = methods->EstimateDSMCustomJoin(node, pcxt);
+		shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+}
+
+void
+ExecCustomJoinInitializeDSM(CustomJoinState *node, ParallelContext *pcxt)
+{
+	const CustomJoinExecMethods *methods = node->methods;
+
+	if (methods->InitializeDSMCustomJoin)
+	{
+		int			plan_node_id = node->js.ps.plan->plan_node_id;
+		void	   *coordinate;
+
+		coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+		methods->InitializeDSMCustomJoin(node, pcxt, coordinate);
+		shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
+	}
+}
+
+void
+ExecCustomJoinReInitializeDSM(CustomJoinState *node, ParallelContext *pcxt)
+{
+	const CustomJoinExecMethods *methods = node->methods;
+
+	if (methods->ReInitializeDSMCustomJoin)
+	{
+		int			plan_node_id = node->js.ps.plan->plan_node_id;
+		void	   *coordinate;
+
+		coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
+		methods->ReInitializeDSMCustomJoin(node, pcxt, coordinate);
+	}
+}
+
+void
+ExecCustomJoinInitializeWorker(CustomJoinState *node,
+							   ParallelWorkerContext *pwcxt)
+{
+	const CustomJoinExecMethods *methods = node->methods;
+
+	if (methods->InitializeWorkerCustomJoin)
+	{
+		int			plan_node_id = node->js.ps.plan->plan_node_id;
+		void	   *coordinate;
+
+		coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
+		methods->InitializeWorkerCustomJoin(node, pwcxt->toc, coordinate);
+	}
+}
+
+void
+ExecShutdownCustomJoin(CustomJoinState *node)
+{
+	const CustomJoinExecMethods *methods = node->methods;
+
+	if (methods->ShutdownCustomJoin)
+		methods->ShutdownCustomJoin(node);
+}
diff --git a/src/backend/nodes/extensible.c b/src/backend/nodes/extensible.c
index 3ede1ee0f5d..04d18dd091b 100644
--- a/src/backend/nodes/extensible.c
+++ b/src/backend/nodes/extensible.c
@@ -25,6 +25,7 @@
 
 static HTAB *extensible_node_methods = NULL;
 static HTAB *custom_scan_methods = NULL;
+static HTAB *custom_join_methods = NULL;
 
 typedef struct
 {
@@ -93,6 +94,18 @@ RegisterCustomScanMethods(const CustomScanMethods *methods)
 								methods);
 }
 
+/*
+ * Register a new type of custom join node
+ */
+void
+RegisterCustomJoinMethods(const CustomJoinMethods *methods)
+{
+	RegisterExtensibleNodeEntry(&custom_join_methods,
+								"Custom Join Methods",
+								methods->CustomName,
+								methods);
+}
+
 /*
  * An internal routine to get an ExtensibleNodeEntry by the given identifier
  */
@@ -141,3 +154,15 @@ GetCustomScanMethods(const char *CustomName, bool missing_ok)
 							   CustomName,
 							   missing_ok);
 }
+
+/*
+ * Get the methods for a given name of CustomJoinMethods
+ */
+const CustomJoinMethods *
+GetCustomJoinMethods(const char *CustomName, bool missing_ok)
+{
+	return (const CustomJoinMethods *)
+		GetExtensibleNodeEntry(custom_join_methods,
+							   CustomName,
+							   missing_ok);
+}
diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl
index 9ecddb14231..681db989d32 100644
--- a/src/backend/nodes/gen_node_support.pl
+++ b/src/backend/nodes/gen_node_support.pl
@@ -862,7 +862,8 @@ _equal${n}(const $n *a, const $n *b)
 			print $eff "\tCOMPARE_ARRAY_FIELD($f);\n" unless $equal_ignore;
 		}
 		elsif ($t eq 'struct CustomPathMethods*'
-			|| $t eq 'struct CustomScanMethods*')
+			|| $t eq 'struct CustomScanMethods*'
+			|| $t eq 'struct CustomJoinMethods*')
 		{
 			# Fields of these types are required to be a pointer to a
 			# static table of callback functions.  So we don't copy
@@ -1222,6 +1223,29 @@ _read${n}(void)
 		methods = GetCustomScanMethods(custom_name, false);
 		local_node->methods = methods;
 	}
+! unless $no_read;
+		}
+		elsif ($t eq 'struct CustomJoinMethods*')
+		{
+			# FIXME it's a bit weird the condition above has both
+			# CustomPathMethods and CustomScanMethods, and this
+			# only has CustomJoinMethods, possibly incomplete
+			print $off q{
+	/* CustomName is a key to lookup CustomJoinMethods */
+	appendStringInfoString(str, " :methods ");
+	outToken(str, node->methods->CustomName);
+};
+			print $rff q!
+	{
+		/* Lookup CustomJoinMethods by CustomName */
+		char	   *custom_name;
+		const CustomJoinMethods *methods;
+		token = pg_strtok(&length); /* skip methods: */
+		token = pg_strtok(&length); /* CustomName */
+		custom_name = nullable_string(token, length);
+		methods = GetCustomJoinMethods(custom_name, false);
+		local_node->methods = methods;
+	}
 ! unless $no_read;
 		}
 		else
diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c
index 7bc823507f1..abc0933f38f 100644
--- a/src/backend/nodes/nodeFuncs.c
+++ b/src/backend/nodes/nodeFuncs.c
@@ -4789,6 +4789,13 @@ planstate_tree_walker_impl(PlanState *planstate,
 					return true;
 			}
 			break;
+		case T_CustomJoin:
+			foreach(lc, ((CustomJoinState *) planstate)->custom_ps)
+			{
+				if (PSWALK(lfirst(lc)))
+					return true;
+			}
+			break;
 		default:
 			break;
 	}
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 8a9f1d7a943..49fd9fd5f9d 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -160,6 +160,8 @@ static ForeignScan *create_foreignscan_plan(PlannerInfo *root, ForeignPath *best
 static CustomScan *create_customscan_plan(PlannerInfo *root,
 										  CustomPath *best_path,
 										  List *tlist, List *scan_clauses);
+static CustomJoin *create_customjoin_plan(PlannerInfo *root,
+										  CustomPath *best_path);
 static NestLoop *create_nestloop_plan(PlannerInfo *root, NestPath *best_path);
 static MergeJoin *create_mergejoin_plan(PlannerInfo *root, MergePath *best_path);
 static HashJoin *create_hashjoin_plan(PlannerInfo *root, HashPath *best_path);
@@ -415,6 +417,7 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
 		case T_HashJoin:
 		case T_MergeJoin:
 		case T_NestLoop:
+		case T_CustomJoin:
 			plan = create_join_plan(root,
 									(JoinPath *) best_path);
 			break;
@@ -1097,6 +1100,10 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
 			plan = (Plan *) create_nestloop_plan(root,
 												 (NestPath *) best_path);
 			break;
+		case T_CustomJoin:
+			plan = (Plan *) create_customjoin_plan(root,
+												   (CustomPath *) best_path);
+			break;
 		default:
 			elog(ERROR, "unrecognized node type: %d",
 				 (int) best_path->path.pathtype);
@@ -4380,6 +4387,72 @@ create_customscan_plan(PlannerInfo *root, CustomPath *best_path,
 	return cplan;
 }
 
+/*
+ * create_customjoin_plan
+ *
+ * Transform a CustomPath into a Plan.
+ *
+ * XXX likely very incomplete, need to look at the other join nodes
+ */
+static CustomJoin *
+create_customjoin_plan(PlannerInfo *root, CustomPath *best_path)
+{
+	CustomJoin *cplan;
+	RelOptInfo *rel = best_path->path.parent;
+	List	   *custom_plans = NIL;
+	ListCell   *lc;
+	List	   *tlist = build_path_tlist(root, &best_path->path);
+	List	   *scan_clauses = NIL;
+
+	/* Recursively transform child paths. */
+	foreach(lc, best_path->custom_paths)
+	{
+		Plan	   *plan = create_plan_recurse(root, (Path *) lfirst(lc),
+											   CP_EXACT_TLIST);
+
+		custom_plans = lappend(custom_plans, plan);
+	}
+
+	/*
+	 * Invoke custom plan provider to create the Plan node represented by the
+	 * CustomPath.
+	 */
+	cplan = castNode(CustomJoin,
+					 best_path->methods->PlanCustomPath(root,
+														rel,
+														best_path,
+														tlist,
+														scan_clauses,
+														custom_plans));
+
+	/*
+	 * Copy cost data from Path to Plan; no need to make custom-plan providers
+	 * do this
+	 */
+	copy_generic_path_info(&cplan->join.plan, &best_path->path);
+
+	/* Likewise, copy the relids that are represented by this custom scan */
+	cplan->custom_relids = best_path->path.parent->relids;
+
+	/*
+	 * Replace any outer-relation variables with nestloop params in the qual
+	 * and custom_exprs expressions.  We do this last so that the custom-plan
+	 * provider doesn't have to be involved.  (Note that parts of custom_exprs
+	 * could have come from join clauses, so doing this beforehand on the
+	 * scan_clauses wouldn't work.)  We assume custom_scan_tlist contains no
+	 * such variables.
+	 */
+	if (best_path->path.param_info)
+	{
+		cplan->join.plan.qual = (List *)
+			replace_nestloop_params(root, (Node *) cplan->join.plan.qual);
+		cplan->custom_exprs = (List *)
+			replace_nestloop_params(root, (Node *) cplan->custom_exprs);
+	}
+
+	return cplan;
+}
+
 
 /*****************************************************************************
  *
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 846e44186c3..805cd26107b 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -149,6 +149,9 @@ static void set_foreignscan_references(PlannerInfo *root,
 static void set_customscan_references(PlannerInfo *root,
 									  CustomScan *cscan,
 									  int rtoffset);
+static void set_customjoin_references(PlannerInfo *root,
+									  CustomJoin *cjoin,
+									  int rtoffset);
 static Plan *set_append_references(PlannerInfo *root,
 								   Append *aplan,
 								   int rtoffset);
@@ -852,6 +855,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 		case T_NestLoop:
 		case T_MergeJoin:
 		case T_HashJoin:
+		case T_CustomJoin:
 			set_join_references(root, (Join *) plan, rtoffset);
 			break;
 
@@ -1738,6 +1742,41 @@ set_customscan_references(PlannerInfo *root,
 	cscan->custom_relids = offset_relid_set(cscan->custom_relids, rtoffset);
 }
 
+/*
+ * set_customscan_references
+ *	   Do set_plan_references processing on a CustomJoin
+ *
+ * FIXME some of this is probably duplicate with set_join_references
+ */
+static void
+set_customjoin_references(PlannerInfo *root,
+						  CustomJoin *cjoin,
+						  int rtoffset)
+{
+	ListCell   *lc;
+
+	if (cjoin->custom_join_tlist != NIL)
+	{
+		/* custom_scan_tlist itself just needs fix_scan_list() adjustments */
+		cjoin->custom_join_tlist =
+			fix_scan_list(root, cjoin->custom_join_tlist,
+						  rtoffset, NUM_EXEC_TLIST((Plan *) cjoin));
+	}
+
+	/* Adjust custom_exprs in the standard way */
+	cjoin->custom_exprs =
+		fix_scan_list(root, cjoin->custom_exprs,
+					  rtoffset, NUM_EXEC_QUAL((Plan *) cjoin));
+
+	/* Adjust child plan-nodes recursively, if needed */
+	foreach(lc, cjoin->custom_plans)
+	{
+		lfirst(lc) = set_plan_refs(root, (Plan *) lfirst(lc), rtoffset);
+	}
+
+	cjoin->custom_relids = offset_relid_set(cjoin->custom_relids, rtoffset);
+}
+
 /*
  * register_partpruneinfo
  *		Subroutine for set_append_references and set_mergeappend_references
@@ -2423,6 +2462,11 @@ set_join_references(PlannerInfo *root, Join *join, int rtoffset)
 						   NRM_EQUAL,
 						   NUM_EXEC_QUAL((Plan *) join));
 	}
+	else if (IsA(join, CustomJoin))
+	{
+		/* FIXME needs to do something more? */
+		set_customjoin_references(root, (CustomJoin *) join, rtoffset);
+	}
 
 	/*
 	 * Now we need to fix up the targetlist and qpqual, which are logically
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index d71ed958e31..7d5f5961a04 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -2844,6 +2844,32 @@ finalize_plan(PlannerInfo *root, Plan *plan,
 							  &context);
 			break;
 
+		case T_CustomJoin:
+			/* FIXME needs to worry about joinqual etc? */
+			{
+				CustomJoin *cjoin = (CustomJoin *) plan;
+				ListCell   *lc;
+
+				finalize_primnode((Node *) cjoin->custom_exprs,
+								  &context);
+				/* We assume custom_scan_tlist cannot contain Params */
+				context.paramids =
+					bms_add_members(context.paramids, scan_params);
+
+				/* child nodes if any */
+				foreach(lc, cjoin->custom_plans)
+				{
+					context.paramids =
+						bms_add_members(context.paramids,
+										finalize_plan(root,
+													  (Plan *) lfirst(lc),
+													  gather_param,
+													  valid_params,
+													  scan_params));
+				}
+			}
+			break;
+
 		case T_Hash:
 			finalize_primnode((Node *) ((Hash *) plan)->hashkeys,
 							  &context);
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 3d6e6bdbfd2..0f449d4f35c 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -5220,6 +5220,8 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan)
 		dpns->index_tlist = ((ForeignScan *) plan)->fdw_scan_tlist;
 	else if (IsA(plan, CustomScan))
 		dpns->index_tlist = ((CustomScan *) plan)->custom_scan_tlist;
+	else if (IsA(plan, CustomJoin))
+		dpns->index_tlist = ((CustomJoin *) plan)->custom_join_tlist;
 	else
 		dpns->index_tlist = NIL;
 }
diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h
index bc0ca512e09..55e328c6b6f 100644
--- a/src/include/executor/nodeCustom.h
+++ b/src/include/executor/nodeCustom.h
@@ -26,6 +26,14 @@ extern void ExecReScanCustomScan(CustomScanState *node);
 extern void ExecCustomMarkPos(CustomScanState *node);
 extern void ExecCustomRestrPos(CustomScanState *node);
+
+extern CustomJoinState *ExecInitCustomJoin(CustomJoin *cscan,
+										   EState *estate, int eflags);
+extern void ExecEndCustomJoin(CustomJoinState *node);
+
+extern void ExecReScanCustomJoin(CustomJoinState *node);
+
+
 
 /*
  * Parallel execution support
  */
@@ -39,4 +47,16 @@ extern void ExecCustomScanInitializeWorker(CustomScanState *node,
 										   ParallelWorkerContext *pwcxt);
 extern void ExecShutdownCustomScan(CustomScanState *node);
+
+extern void ExecCustomJoinEstimate(CustomJoinState *node,
+								   ParallelContext *pcxt);
+extern void ExecCustomJoinInitializeDSM(CustomJoinState *node,
+										ParallelContext *pcxt);
+extern void ExecCustomJoinReInitializeDSM(CustomJoinState *node,
+										  ParallelContext *pcxt);
+extern void ExecCustomJoinInitializeWorker(CustomJoinState *node,
+										   ParallelWorkerContext *pwcxt);
+extern void ExecShutdownCustomJoin(CustomJoinState *node);
+
+
 
 #endif							/* NODECUSTOM_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e107d6e5f81..6c5ab8f5229 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2129,7 +2129,7 @@ typedef struct ForeignScanState
  * the BeginCustomScan method.
  * ----------------
  */
-struct CustomExecMethods;
+struct CustomScanExecMethods;
 
 typedef struct CustomScanState
 {
@@ -2138,7 +2138,7 @@ typedef struct CustomScanState
 								 * nodes/extensible.h */
 	List	   *custom_ps;		/* list of child PlanState nodes, if any */
 	Size		pscan_len;		/* size of parallel coordination information */
-	const struct CustomExecMethods *methods;
+	const struct CustomScanExecMethods *methods;
 	const struct TupleTableSlotOps *slotOps;
 } CustomScanState;
 
@@ -2275,6 +2275,32 @@ typedef struct HashJoinState
 	bool		hj_OuterNotEmpty;
 } HashJoinState;
 
+/* ----------------
+ * CustomJoinState information
+ *
+ *		CustomJoin nodes are used to execute custom code within executor.
+ *
+ * Core code must avoid assuming that the CustomJoinState is only as large as
+ * the structure declared here; providers are allowed to make it the first
+ * element in a larger structure, and typically would need to do so.  The
+ * struct is actually allocated by the CreateCustomJoinState method associated
+ * with the plan node.  Any additional fields can be initialized there, or in
+ * the BeginCustomJoin method.
+ * ----------------
+ */
+struct CustomJoinExecMethods;
+
+typedef struct CustomJoinState
+{
+	JoinState	js;
+	uint32		flags;			/* mask of CUSTOMPATH_* flags, see
+								 * nodes/extensible.h */
+	List	   *custom_ps;		/* list of child PlanState nodes, if any */
+	Size		pscan_len;		/* size of parallel coordination information */
+	const struct CustomJoinExecMethods *methods;
+	const struct TupleTableSlotOps *slotOps;
+} CustomJoinState;
+
 
 /* ----------------------------------------------------------------
  *				 Materialization State Information
diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h
index 1129c4ba4b1..a4c0a33a3f3 100644
--- a/src/include/nodes/extensible.h
+++ b/src/include/nodes/extensible.h
@@ -121,7 +121,7 @@ typedef struct CustomScanMethods
  * Execution-time methods for a CustomScanState.  This is more complex than
  * what we need for a custom path or scan.
  */
-typedef struct CustomExecMethods
+typedef struct CustomScanExecMethods
 {
 	const char *CustomName;
 
@@ -155,10 +155,66 @@ typedef struct CustomExecMethods
 	void		(*ExplainCustomScan) (CustomScanState *node,
 									  List *ancestors,
 									  ExplainState *es);
-} CustomExecMethods;
+} CustomScanExecMethods;
 
 extern void RegisterCustomScanMethods(const CustomScanMethods *methods);
 extern const CustomScanMethods *GetCustomScanMethods(const char *CustomName,
 													 bool missing_ok);
 
+/*
+ * Custom join.  Here again, there's not much to do: we need to be able to
+ * generate a JoinState corresponding to the join.
+ */
+typedef struct CustomJoinMethods
+{
+	const char *CustomName;
+
+	/* Create execution state (CustomJoinState) from a CustomJoin plan node */
+	Node	   *(*CreateCustomJoinState) (CustomJoin *cscan);
+} CustomJoinMethods;
+
+/*
+ * Execution-time methods for a CustomJoinState.  This is more complex than
+ * what we need for a custom path or scan.
+ */ +typedef struct CustomJoinExecMethods +{ + const char *CustomName; + + /* Required executor methods */ + void (*BeginCustomJoin) (CustomJoinState *node, + EState *estate, + int eflags); + TupleTableSlot *(*ExecCustomJoin) (CustomJoinState *node); + void (*EndCustomJoin) (CustomJoinState *node); + void (*ReScanCustomJoin) (CustomJoinState *node); + + /* Optional methods: needed if mark/restore is supported */ + void (*MarkPosCustomJoin) (CustomJoinState *node); + void (*RestrPosCustomJoin) (CustomJoinState *node); + + /* Optional methods: needed if parallel execution is supported */ + Size (*EstimateDSMCustomJoin) (CustomJoinState *node, + ParallelContext *pcxt); + void (*InitializeDSMCustomJoin) (CustomJoinState *node, + ParallelContext *pcxt, + void *coordinate); + void (*ReInitializeDSMCustomJoin) (CustomJoinState *node, + ParallelContext *pcxt, + void *coordinate); + void (*InitializeWorkerCustomJoin) (CustomJoinState *node, + shm_toc *toc, + void *coordinate); + void (*ShutdownCustomJoin) (CustomJoinState *node); + + /* Optional: print additional information in EXPLAIN */ + void (*ExplainCustomJoin) (CustomJoinState *node, + List *ancestors, + ExplainState *es); +} CustomJoinExecMethods; + +extern void RegisterCustomJoinMethods(const CustomJoinMethods *methods); +extern const CustomJoinMethods *GetCustomJoinMethods(const char *CustomName, + bool missing_ok); + #endif /* EXTENSIBLE_H */ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 46e2e09ea35..0f10514c835 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -1024,6 +1024,44 @@ typedef struct HashJoin List *hashkeys; } HashJoin; +/* ---------------- + * CustomJoin node + * + * The comments for ForeignScan's fdw_exprs, fdw_private, fdw_scan_tlist, + * and fs_relids fields apply equally to CustomJoin's custom_exprs, + * custom_private, custom_join_tlist, and custom_relids fields. 
+ * + * Note that since Plan trees can be copied, custom join providers *must* + * fit all plan data they need into those fields; embedding CustomJoin in + * a larger struct will not work. + * ---------------- + */ +struct CustomJoinMethods; + +typedef struct CustomJoin +{ + Join join; + /* mask of CUSTOMPATH_* flags, see nodes/extensible.h */ + uint32 flags; + /* list of Plan nodes, if any */ + List *custom_plans; + /* expressions that custom code may evaluate */ + List *custom_exprs; + /* private data for custom code */ + List *custom_private; + /* optional tlist describing scan tuple */ + List *custom_join_tlist; + /* RTIs generated by this scan */ + Bitmapset *custom_relids; + + /* + * NOTE: The method field of CustomJoin is required to be a pointer to a + * static table of callback functions. So we don't copy the table itself, + * just reference the original one. + */ + const struct CustomJoinMethods *methods; +} CustomJoin; + /* ---------------- * materialization node * ---------------- diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 4353befab99..8b8d3a6529a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -579,7 +579,11 @@ CteState CtlCommand CtxtHandle CurrentOfExpr -CustomExecMethods +CustomJoin +CustomJoinMethods +CustomJoinState +CustomJoinExecMethods +CustomScanExecMethods CustomOutPtrType CustomPath CustomScan -- 2.50.1
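
To illustrate the CustomJoinState comment about providers making it "the first element in a larger structure", here is a minimal standalone sketch of that idiom. It does not use the PostgreSQL headers: the two structs are heavily simplified stand-ins for the ones in the patch, and MyJoinState, my_exec_join and my_create_join_state are hypothetical names made up for this example. The callback returns an int rather than a TupleTableSlot just to keep the sketch self-contained.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Simplified stand-ins for the patch's structs, NOT the real definitions. */
struct CustomJoinExecMethods;

typedef struct CustomJoinState
{
	int			flags;
	const struct CustomJoinExecMethods *methods;
} CustomJoinState;

typedef struct CustomJoinExecMethods
{
	const char *CustomName;
	/* returns an int here; the patch's callback returns a TupleTableSlot */
	int			(*ExecCustomJoin) (CustomJoinState *node);
} CustomJoinExecMethods;

/* Hypothetical provider state: the core struct must be the first member. */
typedef struct MyJoinState
{
	CustomJoinState cjs;
	int			tuples_returned;	/* provider-private field */
} MyJoinState;

/* Core only knows CustomJoinState; the provider casts back to its own type. */
static int
my_exec_join(CustomJoinState *node)
{
	MyJoinState *state = (MyJoinState *) node;

	return ++state->tuples_returned;
}

static const CustomJoinExecMethods my_exec_methods = {
	"MyJoin",
	my_exec_join
};

/* Plays the role of CreateCustomJoinState: allocate the larger struct. */
static CustomJoinState *
my_create_join_state(void)
{
	MyJoinState *state = calloc(1, sizeof(MyJoinState));

	if (state == NULL)
		return NULL;
	state->cjs.methods = &my_exec_methods;
	return &state->cjs;
}
```

Core code would only ever hold the CustomJoinState pointer and call through node->methods, which is why it cannot assume anything about the allocation size. The plan-node side (CustomJoin) is different, as the plannodes.h comment says: plan trees get copied, so the same embedding trick is not safe there.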