Hello hackers,

Parallel sequential scan relies on the kernel detecting sequential
access, but we don't make the job easy: blocks are handed out to
workers one at a time, so each process sees a strided access pattern
rather than a contiguous one.  That works terribly on strict
next-block systems like FreeBSD UFS, and degrades rapidly when you
add too many workers on sliding window systems like Linux.
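
To make the striding concrete, here's a toy C program (not PostgreSQL
code, and it assumes the workers' block requests interleave evenly)
showing the sequence each of 4 workers sees when blocks are handed
out one at a time from a shared counter:

  #include <stdio.h>

  int
  main(void)
  {
      int     nblocks = 32;       /* pretend relation size in blocks */
      int     nworkers = 4;

      /* with single-block handout, worker w tends to see w, w+4, w+8, ... */
      for (int w = 0; w < nworkers; w++)
      {
          printf("worker %d reads blocks:", w);
          for (int b = w; b < nblocks; b += nworkers)
              printf(" %d", b);
          printf("\n");
      }
      return 0;
  }

From the kernel's point of view each process skips 3 out of every 4
blocks, which is exactly the kind of pattern a strict next-block
read-ahead detector can't follow.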

Demonstration using FreeBSD on UFS in a virtual machine, taking
ballpark figures from iostat:

  create table t as select generate_series(1, 200000000)::int i;

  set max_parallel_workers_per_gather = 0;
  select count(*) from t;
  -> execution time 13.3s, average read size = ~128kB, ~500MB/s

  set max_parallel_workers_per_gather = 1;
  select count(*) from t;
  -> execution time 24.9s, average read size = ~32kB, ~250MB/s

Note the small read size: 32kB is the logical block size of this
filesystem, so there was no read clustering happening at all.
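
(For reference, figures of that kind can be watched in another
terminal while the query runs with plain iostat, e.g. something like

  iostat ada0 1

where ada0 is just an example device name; KB/t is the average
transfer size and MB/s the throughput.)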

That explains some complaints I've heard about PostgreSQL performance
on that filesystem: parallel query destroys I/O performance.

As a quick experiment, I tried teaching the block allocator to hand
out ranges of up to 64 blocks at a time, ramping the range size up
incrementally at the start of the scan and back down at the end, and
I got:

  set max_parallel_workers_per_gather = 1;
  select count(*) from t;
  -> execution time 7.5s, average read size = ~128kB, ~920MB/s

  set max_parallel_workers_per_gather = 3;
  select count(*) from t;
  -> execution time 5.2s, average read size = ~128kB, ~1.2GB/s
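
To make the ramp-up/ramp-down behaviour concrete before you read the
patch, here's a toy single-process simulation of the chunk sizes the
allocator hands out (the relation size is made up; the real thing
shares the work among backends via an atomic counter):

  #include <stdio.h>

  int
  main(void)
  {
      unsigned long nblocks = 1000;     /* made-up relation size in blocks */
      unsigned long nallocated = 0;
      int           step_size = 0;

      while (nallocated < nblocks)
      {
          /* same ramp logic as the patch below */
          if (nallocated > nblocks - 64)
              step_size = 1;            /* near the end: back to single blocks */
          else if (step_size < 64)
              step_size++;              /* ramp up towards 64-block chunks */

          printf("allocate blocks %lu..%lu (%d blocks)\n",
                 nallocated, nallocated + step_size - 1, step_size);
          nallocated += step_size;
      }
      return 0;
  }

So a worker in the middle of the table reads runs of 64 consecutive
blocks (512kB with 8kB blocks), which is enough for the filesystem to
cluster reads, while near the end the step drops back to 1 so that no
worker is left holding a large unread range while the others are
finished.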

I've attached the quick and dirty patch I used for that.
From 65726902f42a19c319623eb7194b0040517008a6 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Tue, 19 May 2020 09:16:07 +1200
Subject: [PATCH] Use larger step sizes for Parallel Seq Scan.

Instead of handing out a single block at a time, which
confuses the read ahead heuristics of many systems, use
an increasing step size.

XXX Proof-of-concept grade code only: needs a better ramp-down
algorithm, and contains a magic number 64
---
 src/backend/access/heap/heapam.c   | 22 +++++++++++++------
 src/backend/access/table/tableam.c | 34 +++++++++++++++++++++++++++---
 src/include/access/relscan.h       | 13 +++++++++++-
 src/include/access/tableam.h       |  2 ++
 4 files changed, 61 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 0d4ed602d7..8982d59ff0 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -520,12 +520,14 @@ heapgettup(HeapScanDesc scan,
 			{
 				ParallelBlockTableScanDesc pbscan =
 				(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+				ParallelBlockTableScanWork pbscanwork =
+				(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 				table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-														 pbscan);
+														 pbscanwork, pbscan);
 
 				page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-														 pbscan);
+														 pbscanwork, pbscan);
 
 				/* Other processes might have already finished the scan. */
 				if (page == InvalidBlockNumber)
@@ -720,9 +722,11 @@ heapgettup(HeapScanDesc scan,
 		{
 			ParallelBlockTableScanDesc pbscan =
 			(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+			ParallelBlockTableScanWork pbscanwork =
+			(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 			page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-													 pbscan);
+													 pbscanwork, pbscan);
 			finished = (page == InvalidBlockNumber);
 		}
 		else
@@ -834,12 +838,14 @@ heapgettup_pagemode(HeapScanDesc scan,
 			{
 				ParallelBlockTableScanDesc pbscan =
 				(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+				ParallelBlockTableScanWork pbscanwork =
+				(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 				table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-														 pbscan);
+														 pbscanwork, pbscan);
 
 				page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-														 pbscan);
+														 pbscanwork, pbscan);
 
 				/* Other processes might have already finished the scan. */
 				if (page == InvalidBlockNumber)
@@ -1019,9 +1025,11 @@ heapgettup_pagemode(HeapScanDesc scan,
 		{
 			ParallelBlockTableScanDesc pbscan =
 			(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+			ParallelBlockTableScanWork pbscanwork =
+			(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 			page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-													 pbscan);
+													 pbscanwork, pbscan);
 			finished = (page == InvalidBlockNumber);
 		}
 		else
@@ -1155,6 +1163,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	scan->rs_base.rs_nkeys = nkeys;
 	scan->rs_base.rs_flags = flags;
 	scan->rs_base.rs_parallel = parallel_scan;
+	scan->rs_base.rs_parallel_work =
+		palloc0(sizeof(ParallelBlockTableScanWorkData));
 	scan->rs_strategy = NULL;	/* set in initscan */
 
 	/*
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index c814733b22..6d72589673 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -404,10 +404,15 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
  * to set the startblock once.
  */
 void
-table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
+table_block_parallelscan_startblock_init(Relation rel,
+										 ParallelBlockTableScanWork workspace,
+										 ParallelBlockTableScanDesc pbscan)
 {
 	BlockNumber sync_startpage = InvalidBlockNumber;
 
+	/* Reset the state we use for controlling allocation size. */
+	memset(workspace, 0, sizeof(*workspace));
+
 retry:
 	/* Grab the spinlock. */
 	SpinLockAcquire(&pbscan->phs_mutex);
@@ -447,7 +452,9 @@ retry:
  * backend gets an InvalidBlockNumber return.
  */
 BlockNumber
-table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
+table_block_parallelscan_nextpage(Relation rel,
+								  ParallelBlockTableScanWork workspace,
+								  ParallelBlockTableScanDesc pbscan)
 {
 	BlockNumber page;
 	uint64		nallocated;
@@ -467,7 +474,28 @@ table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbsca
 	 * The actual page to return is calculated by adding the counter to the
 	 * starting block number, modulo nblocks.
 	 */
-	nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
+	if (workspace->phsw_nallocated_range > 0)
+	{
+		nallocated = ++workspace->phsw_nallocated;
+		workspace->phsw_nallocated_range--;
+	}
+	else
+	{
+		/*
+		 * Ramp up the step size as we go, until we approach the end of the
+		 * scan, and then ramp it back down again to avoid unfair allocation.
+		 * XXX Come up with a better algorithm!
+		 */
+		if (workspace->phsw_nallocated > pbscan->phs_nblocks - 64)
+			workspace->phsw_step_size = 1;
+		else if (workspace->phsw_step_size < 64)
+			workspace->phsw_step_size++;
+
+		nallocated = workspace->phsw_nallocated =
+			pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
+									workspace->phsw_step_size);
+		workspace->phsw_nallocated_range = workspace->phsw_step_size - 1;
+	}
 	if (nallocated >= pbscan->phs_nblocks)
 		page = InvalidBlockNumber;	/* all blocks have been allocated */
 	else
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 6f0258831f..0d440c9de3 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -42,9 +42,9 @@ typedef struct TableScanDescData
 	 */
 	uint32		rs_flags;
 
+	void	   *rs_parallel_work;
 	struct ParallelTableScanDescData *rs_parallel;	/* parallel scan
 													 * information */
-
 } TableScanDescData;
 typedef struct TableScanDescData *TableScanDesc;
 
@@ -81,6 +81,17 @@ typedef struct ParallelBlockTableScanDescData
 }			ParallelBlockTableScanDescData;
 typedef struct ParallelBlockTableScanDescData *ParallelBlockTableScanDesc;
 
+/*
+ * Per-backend state for parallel table scan, for block-oriented storage.
+ */
+typedef struct ParallelBlockTableScanWorkData
+{
+	uint64		phsw_nallocated;
+	int			phsw_nallocated_range;
+	int			phsw_step_size;
+}			ParallelBlockTableScanWorkData;
+typedef struct ParallelBlockTableScanWorkData *ParallelBlockTableScanWork;
+
 /*
  * Base class for fetches from a table via an index. This is the base-class
  * for such scans, which needs to be embedded in the respective struct for
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 94903dd8de..d0a854a692 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -1790,8 +1790,10 @@ extern Size table_block_parallelscan_initialize(Relation rel,
 extern void table_block_parallelscan_reinitialize(Relation rel,
 												  ParallelTableScanDesc pscan);
 extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
+													 ParallelBlockTableScanWork pbscanwork,
 													 ParallelBlockTableScanDesc pbscan);
 extern void table_block_parallelscan_startblock_init(Relation rel,
+													 ParallelBlockTableScanWork pbscanwork,
 													 ParallelBlockTableScanDesc pbscan);
 
 
-- 
2.20.1
