On 8/26/24 18:06, Tomas Vondra wrote:
> 
> I just noticed there are a couple of failures in the regression tests if I
> change the GUC to "true" by default. I haven't looked into that yet, but
> I guess there's some mistake in resetting the child node, or something
> like that. Will investigate.
> 

Turned out to be a silly bug - the prefetch queue was not reset on rescan.
Fixed, and I also removed two not-quite-correct asserts. No other changes.
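
For the record, the rescan part of the fix boils down to resetting the
prefetch state in ExecReScanNestLoop (the hunk is in the attached patch):

    /* reset the prefetch batch too */
    node->nl_PrefetchNext = 0;
    node->nl_PrefetchUsed = 0;
    node->nl_PrefetchDone = false;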


regards

-- 
Tomas Vondra
From b8ac5834802e4eb621ed920e8e7dd306ee843995 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Mon, 26 Aug 2024 01:13:56 +0200
Subject: [PATCH v20240827] nestloop prefetch - initial PoC

---
 src/backend/executor/execAmi.c         |  41 ++++++++
 src/backend/executor/execTuples.c      |   3 +-
 src/backend/executor/nodeIndexscan.c   |  67 +++++++++++++
 src/backend/executor/nodeNestloop.c    | 133 ++++++++++++++++++++++++-
 src/backend/utils/misc/guc_tables.c    |  10 ++
 src/include/executor/executor.h        |   2 +
 src/include/executor/nodeIndexscan.h   |   1 +
 src/include/nodes/execnodes.h          |   7 ++
 src/include/optimizer/cost.h           |   1 +
 src/test/regress/expected/sysviews.out |   3 +-
 10 files changed, 261 insertions(+), 7 deletions(-)

diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 3289e3e0219..8d336c713a2 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -594,6 +594,28 @@ ExecSupportsBackwardScan(Plan *node)
 	}
 }
 
+/*
+ * ExecSupportsPrefetch - does a plan type support prefetching?
+ *
+ * For now only plain index scans do; we could extend that to index-only
+ * scans. Not sure about other plan types.
+ */
+bool
+ExecSupportsPrefetch(Plan *node)
+{
+	if (node == NULL)
+		return false;
+
+	switch (nodeTag(node))
+	{
+		case T_IndexScan:
+			return true;
+
+		default:
+			return false;
+	}
+}
+
 /*
  * An IndexScan or IndexOnlyScan node supports backward scan only if the
  * index's AM does.
@@ -651,3 +673,22 @@ ExecMaterializesOutput(NodeTag plantype)
 
 	return false;
 }
+
+
+/*
+ * ExecPrefetch - ask a plan node to prefetch the pages it is about to read
+ */
+void
+ExecPrefetch(PlanState *node)
+{
+	switch (nodeTag(node))
+	{
+		case T_IndexScanState:
+			ExecPrefetchIndexScan((IndexScanState *) node);
+			break;
+
+		default:
+			elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
+			break;
+	}
+}
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 00dc3396156..9337913d899 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -129,7 +129,8 @@ tts_virtual_clear(TupleTableSlot *slot)
 static void
 tts_virtual_getsomeattrs(TupleTableSlot *slot, int natts)
 {
-	elog(ERROR, "getsomeattrs is not required to be called on a virtual tuple table slot");
+	/* we don't know if the slot is virtual or not */
+	/* elog(ERROR, "getsomeattrs is not required to be called on a virtual tuple table slot"); */
 }
 
 /*
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 8000feff4c9..325230ae1d8 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -1729,3 +1729,70 @@ ExecIndexScanInitializeWorker(IndexScanState *node,
 					 node->iss_ScanKeys, node->iss_NumScanKeys,
 					 node->iss_OrderByKeys, node->iss_NumOrderByKeys);
 }
+
+void
+ExecPrefetchIndexScan(IndexScanState *node)
+{
+	EState	   *estate;
+	ScanDirection direction;
+	IndexScanDesc scandesc;
+	ItemPointer tid;
+
+	/*
+	 * extract necessary information from index scan node
+	 */
+	estate = node->ss.ps.state;
+
+	/*
+	 * Determine which direction to scan the index in based on the plan's scan
+	 * direction and the current direction of execution.
+	 */
+	direction = ScanDirectionCombine(estate->es_direction,
+									 ((IndexScan *) node->ss.ps.plan)->indexorderdir);
+	scandesc = node->iss_ScanDesc;
+
+	if (scandesc == NULL)
+	{
+		/*
+		 * We reach here if the index scan is not parallel, or if we're
+		 * serially executing an index scan that was planned to be parallel.
+		 */
+		scandesc = index_beginscan(node->ss.ss_currentRelation,
+								   node->iss_RelationDesc,
+								   estate->es_snapshot,
+								   node->iss_NumScanKeys,
+								   node->iss_NumOrderByKeys);
+
+		node->iss_ScanDesc = scandesc;
+
+		/*
+		 * If no run-time keys to calculate or they are ready, go ahead and
+		 * pass the scankeys to the index AM.
+		 */
+		if (node->iss_NumRuntimeKeys == 0 || node->iss_RuntimeKeysReady)
+			index_rescan(scandesc,
+						 node->iss_ScanKeys, node->iss_NumScanKeys,
+						 node->iss_OrderByKeys, node->iss_NumOrderByKeys);
+	}
+
+	/*
+	 * XXX This should probably prefetch only a limited number of tuples for
+	 * each key, not all of them - the index can easily have millions of them
+	 * for some keys. And prefetching more of those items would be subject
+	 * to the "regular" index prefetching, if ever implemented.
+	 *
+	 * XXX One open question is how to communicate back how many items would
+	 * be prefetched. If we find a key with 1M TIDs, it probably does not
+	 * make much sense to prefetch further in the nestloop, because the 1M
+	 * will likely trash the cache anyway.
+	 *
+	 * XXX This should also consider how many items we actually need. For a
+	 * semijoin we only need the first match, for example.
+	 */
+	while ((tid = index_getnext_tid(scandesc, direction)) != NULL)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		PrefetchBuffer(scandesc->heapRelation, MAIN_FORKNUM, ItemPointerGetBlockNumber(tid));
+	}
+}
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index 7f4bf6c4dbb..2349a6de085 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -24,7 +24,9 @@
 #include "executor/execdebug.h"
 #include "executor/nodeNestloop.h"
 #include "miscadmin.h"
+#include "optimizer/cost.h"
 
+bool              enable_nestloop_prefetch = true;
 
 /* ----------------------------------------------------------------
  *		ExecNestLoop(node)
@@ -105,15 +107,107 @@ ExecNestLoop(PlanState *pstate)
 		if (node->nl_NeedNewOuter)
 		{
 			ENL1_printf("getting new outer tuple");
-			outerTupleSlot = ExecProcNode(outerPlan);
 
 			/*
-			 * if there are no more outer tuples, then the join is complete..
+			 * Without prefetching, get the next slot from the outer plan
+			 * directly. With prefetching, get the next slot from the batch,
+			 * or fill the batch if needed.
 			 */
-			if (TupIsNull(outerTupleSlot))
+			if (node->nl_PrefetchCount == 0)	/* no prefetching */
 			{
-				ENL1_printf("no outer tuple, ending join");
-				return NULL;
+				outerTupleSlot = ExecProcNode(outerPlan);
+
+				/*
+				 * if there are no more outer tuples, then the join is complete.
+				 */
+				if (TupIsNull(outerTupleSlot))
+				{
+					ENL1_printf("no outer tuple, ending join");
+					return NULL;
+				}
+			}
+			else								/* prefetching */
+			{
+
+				/*
+				 * No more slots available in the queue - try to load from
+				 * the outer plan, unless we've already reached the end.
+				 */
+				if ((node->nl_PrefetchNext == node->nl_PrefetchUsed) &&
+					(!node->nl_PrefetchDone))
+				{
+					/* reset */
+					node->nl_PrefetchNext = 0;
+					node->nl_PrefetchUsed = 0;
+
+					while (node->nl_PrefetchUsed < node->nl_PrefetchCount)
+					{
+						outerTupleSlot = ExecProcNode(outerPlan);
+
+						/*
+						 * if there are no more outer tuples, then the join is complete.
+						 */
+						if (TupIsNull(outerTupleSlot))
+						{
+							node->nl_PrefetchDone = true;
+							break;
+						}
+
+						ExecClearTuple(node->nl_PrefetchSlots[node->nl_PrefetchUsed]);
+
+						ExecCopySlot(node->nl_PrefetchSlots[node->nl_PrefetchUsed],
+									 outerTupleSlot);
+
+						ENL1_printf("prefetching inner node");
+						econtext->ecxt_outertuple = node->nl_PrefetchSlots[node->nl_PrefetchUsed];
+
+						/*
+						 * fetch the values of any outer Vars that must be passed to the
+						 * inner scan, and store them in the appropriate PARAM_EXEC slots.
+						 */
+						foreach(lc, nl->nestParams)
+						{
+							NestLoopParam *nlp = (NestLoopParam *) lfirst(lc);
+							int			paramno = nlp->paramno;
+							ParamExecData *prm;
+
+							prm = &(econtext->ecxt_param_exec_vals[paramno]);
+							/* Param value should be an OUTER_VAR var */
+							Assert(IsA(nlp->paramval, Var));
+							Assert(nlp->paramval->varno == OUTER_VAR);
+							Assert(nlp->paramval->varattno > 0);
+							prm->value = slot_getattr(node->nl_PrefetchSlots[node->nl_PrefetchUsed],
+													  nlp->paramval->varattno,
+													  &(prm->isnull));
+							/* Flag parameter value as changed */
+							innerPlan->chgParam = bms_add_member(innerPlan->chgParam,
+																 paramno);
+						}
+
+						/*
+						 * now prefetch the inner plan
+						 */
+						ENL1_printf("prefetching inner plan");
+						ExecReScan(innerPlan);
+						ExecPrefetch(innerPlan);
+
+						node->nl_PrefetchUsed++;
+					}
+				}
+
+				/*
+				 * Now we should either have a slot in the queue, or know
+				 * that we've exhausted the outer side.
+				 */
+				if (node->nl_PrefetchNext == node->nl_PrefetchUsed)
+				{
+					Assert(node->nl_PrefetchDone);
+					ENL1_printf("no outer tuple, ending join");
+					return NULL;
+				}
+
+				/* get the next slot from the queue */
+				outerTupleSlot = node->nl_PrefetchSlots[node->nl_PrefetchNext++];
 			}
 
 			ENL1_printf("saving new outer tuple information");
@@ -345,6 +439,24 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
 	nlstate->nl_NeedNewOuter = true;
 	nlstate->nl_MatchedOuter = false;
 
+	/* if the inner plan supports prefetching, initialize the batch */
+	nlstate->nl_PrefetchCount = 0;
+	if (enable_nestloop_prefetch &&
+		ExecSupportsPrefetch(innerPlan(node)))
+	{
+		/* a batch of 32 slots seems about right for now */
+#define NL_PREFETCH_BATCH_SIZE 32
+		nlstate->nl_PrefetchCount = NL_PREFETCH_BATCH_SIZE;
+		nlstate->nl_PrefetchSlots = palloc0(sizeof(TupleTableSlot *) * nlstate->nl_PrefetchCount);
+
+		for (int i = 0; i < nlstate->nl_PrefetchCount; i++)
+		{
+			nlstate->nl_PrefetchSlots[i]
+				= MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(nlstate)),
+										   &TTSOpsVirtual);
+		}
+	}
+
 	NL1_printf("ExecInitNestLoop: %s\n",
 			   "node initialized");
 
@@ -369,6 +481,12 @@ ExecEndNestLoop(NestLoopState *node)
 	ExecEndNode(outerPlanState(node));
 	ExecEndNode(innerPlanState(node));
 
+	/* cleanup batch of prefetch slots */
+	for (int i = 0; i < node->nl_PrefetchCount; i++)
+	{
+		ExecDropSingleTupleTableSlot(node->nl_PrefetchSlots[i]);
+	}
+
 	NL1_printf("ExecEndNestLoop: %s\n",
 			   "node processing ended");
 }
@@ -397,4 +515,9 @@ ExecReScanNestLoop(NestLoopState *node)
 
 	node->nl_NeedNewOuter = true;
 	node->nl_MatchedOuter = false;
+
+	/* reset the prefetch batch too */
+	node->nl_PrefetchNext = 0;
+	node->nl_PrefetchUsed = 0;
+	node->nl_PrefetchDone = false;
 }
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index af227b1f248..781c3d29b99 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -879,6 +879,16 @@ struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_nestloop_prefetch", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables prefetching in nested-loop join plans."),
+			NULL,
+			GUC_EXPLAIN
+		},
+		&enable_nestloop_prefetch,
+		true,
+		NULL, NULL, NULL
+	},
 	{
 		{"enable_mergejoin", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of merge join plans."),
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 69c3ebff00a..05ab4cae4a0 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -107,7 +107,9 @@ extern void ExecMarkPos(PlanState *node);
 extern void ExecRestrPos(PlanState *node);
 extern bool ExecSupportsMarkRestore(struct Path *pathnode);
 extern bool ExecSupportsBackwardScan(Plan *node);
+extern bool ExecSupportsPrefetch(Plan *node);
 extern bool ExecMaterializesOutput(NodeTag plantype);
+extern void ExecPrefetch(PlanState *node);
 
 /*
  * prototypes from functions in execCurrent.c
diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h
index 3cddece67c8..114ca94352e 100644
--- a/src/include/executor/nodeIndexscan.h
+++ b/src/include/executor/nodeIndexscan.h
@@ -28,6 +28,7 @@ extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pc
 extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
 extern void ExecIndexScanInitializeWorker(IndexScanState *node,
 										  ParallelWorkerContext *pwcxt);
+extern void ExecPrefetchIndexScan(IndexScanState *node);
 
 /*
  * These routines are exported to share code with nodeIndexonlyscan.c and
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index af7d8fd1e72..ddd9f1f3332 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2131,6 +2131,13 @@ typedef struct NestLoopState
 	bool		nl_NeedNewOuter;
 	bool		nl_MatchedOuter;
 	TupleTableSlot *nl_NullInnerTupleSlot;
+
+	/* for prefetch of batch items */
+	int			nl_PrefetchCount;		/* maximum number of queued slots */
+	int			nl_PrefetchUsed;		/* current number of queued slots */
+	int			nl_PrefetchNext;		/* next slot to return from queue */
+	bool		nl_PrefetchDone;		/* no more outer slots */
+	TupleTableSlot **nl_PrefetchSlots;	/* array of virtual slots */
 } NestLoopState;
 
 /* ----------------
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 854a782944a..3303677c05b 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -58,6 +58,7 @@ extern PGDLLIMPORT bool enable_sort;
 extern PGDLLIMPORT bool enable_incremental_sort;
 extern PGDLLIMPORT bool enable_hashagg;
 extern PGDLLIMPORT bool enable_nestloop;
+extern PGDLLIMPORT bool enable_nestloop_prefetch;
 extern PGDLLIMPORT bool enable_material;
 extern PGDLLIMPORT bool enable_memoize;
 extern PGDLLIMPORT bool enable_mergejoin;
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index fad7fc3a7e0..5ca6c16f3a0 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -161,6 +161,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_memoize                 | on
  enable_mergejoin               | on
  enable_nestloop                | on
+ enable_nestloop_prefetch       | on
  enable_parallel_append         | on
  enable_parallel_hash           | on
  enable_partition_pruning       | on
@@ -170,7 +171,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(22 rows)
+(23 rows)
 
 -- There are always wait event descriptions for various types.  InjectionPoint
 -- may be present or absent, depending on history since last postmaster start.
-- 
2.46.0
