On Wed, Jun 12, 2024 at 3:37 AM Jonathan S. Katz <jk...@postgresql.org> wrote:
> If you're curious, I can fire up some of my more serious benchmarks on
> this to do a before/after to see if there's anything interesting. I have
> a few large datasets (10s of millions) of larger vectors (1536dim => 6KB
> payloads) that could see the net effect here.
>
> > (Make sure you remember to set effective_io_concurrency to an
> > interesting number if you want to generate a lot of overlapping
> > fadvise calls.)
>
> What would you recommend as an "interesting number?" - particularly
> using the data parameters above.

Hi Jonathan,

Sorry for not replying sooner (ETOOMANYPROJECTS).  For HNSW, I think
the maximum useful effective_io_concurrency is bounded by the number of
connections per HNSW layer ("m").  Here are some times I measured
using m=16 on two laptops:

              |     linux (xfs)           |  macos (apfs)
 branch | eic |  avg   | speedup | stdev  |  avg   | speedup | stdev
--------+-----+--------+---------+--------+--------+---------+--------
 master |     | 73.959 |     1.0 | 24.168 | 72.290 |     1.0 | 11.851
 stream |   0 | 70.117 |     1.1 | 36.699 | 76.289 |     1.0 | 12.742
 stream |   1 | 57.983 |     1.3 |  5.845 | 79.969 |     1.2 |  8.308
 stream |   2 | 35.629 |     2.1 |  4.088 | 49.198 |     2.0 |  7.686
 stream |   3 | 28.477 |     2.6 |  2.607 | 37.540 |     2.5 |  5.272
 stream |   4 | 26.493 |     2.8 |  3.691 | 33.014 |     2.7 |  4.444
 stream |   5 | 23.711 |     3.1 |  2.435 | 32.622 |     3.0 |  2.270
 stream |   6 | 22.885 |     3.2 |  1.908 | 31.254 |     3.2 |  4.170
 stream |   7 | 21.910 |     3.4 |  2.153 | 33.669 |     3.3 |  4.616
 stream |   8 | 20.741 |     3.6 |  1.594 | 34.182 |     3.5 |  3.819
 stream |   9 | 22.471 |     3.3 |  3.094 | 30.690 |     3.2 |  2.677
 stream |  10 | 19.895 |     3.7 |  1.695 | 32.631 |     3.6 |  4.976
 stream |  11 | 19.447 |     3.8 |  1.647 | 31.163 |     3.7 |  3.351
 stream |  12 | 18.658 |     4.0 |  1.503 | 30.817 |     3.9 |  3.538
 stream |  13 | 18.886 |     3.9 |  0.874 | 29.184 |     3.8 |  4.832
 stream |  14 | 18.667 |     4.0 |  1.692 | 28.783 |     3.9 |  3.459
 stream |  15 | 19.080 |     3.9 |  1.429 | 28.928 |     3.8 |  3.396
 stream |  16 | 18.929 |     3.9 |  3.469 | 29.282 |     3.8 |  2.868

Those are millisecond times to run the test() function shown earlier,
with empty kernel cache and PostgreSQL cache (see below) for maximum
physical I/O.  I ran the master test 30 times, and each
effective_io_concurrency level 10 times, to show that the variance
decreases even at the default effective_io_concurrency = 1, so we're
not only talking about the avg speed improving.
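
For anyone repeating the measurements, here is a rough sketch in C of
the arithmetic behind the avg and speedup columns, plus a spread
measure.  The function names are made up for illustration, and the
n - 1 (sample) variance is an assumption: the table does not say which
estimator produced the stdev column, which would be the square root of
a variance like this one.

```c
#include <assert.h>
#include <stddef.h>

/* Arithmetic mean of n timing samples, in milliseconds. */
double
sample_mean(const double *ms, size_t n)
{
	double		sum = 0.0;

	for (size_t i = 0; i < n; i++)
		sum += ms[i];
	return sum / (double) n;
}

/*
 * Sample variance with an n - 1 denominator (assumption, see above).
 * Requires n >= 2.  The standard deviation is its square root.
 */
double
sample_variance(const double *ms, size_t n)
{
	double		mean = sample_mean(ms, n);
	double		ss = 0.0;

	for (size_t i = 0; i < n; i++)
		ss += (ms[i] - mean) * (ms[i] - mean);
	return ss / (double) (n - 1);
}

/* Speedup relative to a baseline average; > 1.0 means faster. */
double
speedup(double baseline_avg_ms, double avg_ms)
{
	return baseline_avg_ms / avg_ms;
}
```

For example, speedup(73.959, 18.658) reproduces the Linux figure for
effective_io_concurrency = 12 when rounded to one decimal place.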

The all-cached performance also seems to improve, ~8.9ms -> ~6.9ms on
Linux, but I can't fully explain why.  It may just be run-to-run
variation in memory layout in my quick and dirty test, so I'm not
claiming that it is significant.  It certainly didn't get slower, anyway.

I think you would get very different numbers on a high latency storage
system (say, non-local cloud storage) and potentially much more
speedup with your large test indexes.  Also my 6d random number test
may not be very representative and you may be able to come up with
much better tests.

Here's a new version with a TODO tidied up.  I also understood that we
need to tweak the read_stream_reset() function, so that it doesn't
forget its current readahead distance when it hops between HNSW nodes
(which is something that comes up in several other potential use
cases, including another one I am working on in core).  Without this
patch for PostgreSQL, it reads 1, 2, 4, 7 blocks (= 16 in total)
before it has to take a break to hop to a new page, and then it starts
again at 1.  Oops.  With this patch, it is less forgetful, and reaches
the full possible I/O concurrency of 16 (or whatever the minimum of
HNSW's m parameter and effective_io_concurrency is for you).
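
To make the reset behaviour concrete, here is a toy model in C of just
the two fields the PostgreSQL patch touches.  ToyStream and the toy_*
functions are hypothetical names for illustration, not part of
read_stream.c, and the real ramp-up/ramp-down algorithm is deliberately
left out.  It only shows what happens to the distance across an
end-of-stream followed by a reset, before and after the patch.

```c
#include <assert.h>

/* Hypothetical stand-in for the relevant part of struct ReadStream. */
typedef struct ToyStream
{
	int			distance;		/* current look-ahead distance */
	int			reset_distance; /* distance saved at end of stream */
} ToyStream;

/* The callback ran out of block numbers: save the distance, then stop. */
void
toy_end_of_stream(ToyStream *s)
{
	s->reset_distance = s->distance;
	s->distance = 0;
}

/* Behaviour before the patch: always start again from 1. */
void
toy_reset_unpatched(ToyStream *s)
{
	s->distance = 1;
}

/* Behaviour with the patch: restore the saved distance, at least 1. */
void
toy_reset_patched(ToyStream *s)
{
	s->distance = s->reset_distance > 1 ? s->reset_distance : 1;
}

/* The bound described above: min(m, effective_io_concurrency). */
int
toy_max_useful_concurrency(int m, int effective_io_concurrency)
{
	return m < effective_io_concurrency ? m : effective_io_concurrency;
}
```

In this model, a stream that had reached distance 16 before the
callback ran dry comes back at 1 after toy_reset_unpatched() but at 16
after toy_reset_patched().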

PSA two patches, one for PostgreSQL and one for pgvector.

I am not actively working on this right now.  If someone wants to try
to develop it further, please feel free!  I haven't looked at IVFFlat
at all.

--- Function to let you do SELECT uncache('t_embedding_idx'),
--- which is the opposite of SELECT pg_prewarm('t_embedding_idx').
--- To empty the kernel's cache as well, see also
--- "echo 1 | sudo tee /proc/sys/vm/drop_caches" (Linux) or
--- "sudo purge" (macOS).
--- Uses pg_relation_filenode() because a relation's relfilenode can
--- differ from its OID after a rewrite (e.g. REINDEX).
create extension pg_buffercache;
create or replace function uncache(name text) returns bool
begin atomic;
  select bool_and(pg_buffercache_evict(bufferid))
    from pg_buffercache
   where relfilenode = pg_relation_filenode(name::regclass);
end;
From 74b45e6a6387f0b7a0f12060a0d8cb401a85552e Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 15 Jun 2024 14:37:26 +1200
Subject: [PATCH] Remember ReadStream look-ahead distance on reset.

Some ReadStream clients need to do more work to make a new stream of
block numbers available.  The callback indicates that block numbers have
run out by returning InvalidBlockNumber, but some time later the client
code resets the ReadStream to tell it that the callback now has more
block numbers, so that it can begin consuming buffers again.  For
example, pgvector's HNSW index scan reaches a new page, and finds a new
set of connections to many other pages that will soon be accessed.

When the stream is reset, it would previously reset its look-ahead
distance to 1, and have to build it back up again if I/O is necessary.
With this patch, it will remember what it had before, which seems like a
better bet.  If that's wrong, it will soon ramp down via the usual
algorithm.

Discussion: https://postgr.es/m/CA%2BhUKGJ_7NKd46nx1wbyXWriuZSNzsTfm%2BrhEuvU6nxZi3-KVw%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 7f0e07d9586..5d3e070afae 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -115,6 +115,7 @@ struct ReadStream
 	int16		max_pinned_buffers;
 	int16		pinned_buffers;
 	int16		distance;
+	int16		reset_distance;
 	bool		advice_enabled;
 
 	/*
@@ -335,6 +336,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
+			stream->reset_distance = stream->distance;
 			stream->distance = 0;
 			break;
 		}
@@ -526,6 +528,7 @@ read_stream_begin_impl(int flags,
 		stream->distance = Min(max_pinned_buffers, io_combine_limit);
 	else
 		stream->distance = 1;
+	stream->reset_distance = stream->distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -822,8 +825,11 @@ read_stream_reset(ReadStream *stream)
 	Assert(stream->pinned_buffers == 0);
 	Assert(stream->ios_in_progress == 0);
 
-	/* Start off assuming data is cached. */
-	stream->distance = 1;
+	/*
+	 * If the callback ran out of blocks temporarily, restore the distance from
+	 * before.
+	 */
+	stream->distance = Max(stream->reset_distance, 1);
 }
 
 /*
-- 
2.46.0

From 21b404d8a6f400bb6da1bbfca7509a41f2b9f002 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Tue, 11 Jun 2024 14:32:47 +1200
Subject: [PATCH v2] Use streaming I/O for HNSW blocks.

If data is not already in PostgreSQL's cache, it will be accessed using
the new ReadStream API in PostgreSQL 17.  We know the next 'm' HNSW
blocks we will access, so the ReadStream can read them into the
kernel's page cache asynchronously, up to the limit of the
effective_io_concurrency setting.  While that currently defaults to
only 1, even 1 provides some speedup for cold caches, and higher numbers
help more.

XXX This is a proof-of-concept

Author: Thomas Munro <thomas.mu...@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/CA%2BhUKGJ_7NKd46nx1wbyXWriuZSNzsTfm%2BrhEuvU6nxZi3-KVw%40mail.gmail.com
---
 src/hnsw.h      |   1 +
 src/hnswutils.c | 113 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 114 insertions(+)

diff --git a/src/hnsw.h b/src/hnsw.h
index 480ad9f..e738241 100644
--- a/src/hnsw.h
+++ b/src/hnsw.h
@@ -136,6 +136,7 @@ struct HnswElementData
 	uint8		deleted;
 	uint32		hash;
 	HnswNeighborsPtr neighbors;
+	Buffer		buffer;
 	BlockNumber blkno;
 	OffsetNumber offno;
 	OffsetNumber neighborOffno;
diff --git a/src/hnswutils.c b/src/hnswutils.c
index 96c5026..46e76be 100644
--- a/src/hnswutils.c
+++ b/src/hnswutils.c
@@ -14,6 +14,10 @@
 #include "utils/memdebug.h"
 #include "utils/rel.h"
 
+#if PG_VERSION_NUM >= 170000
+#include "storage/read_stream.h"
+#endif
+
 #if PG_VERSION_NUM >= 130000
 #include "common/hashfn.h"
 #else
@@ -278,6 +282,9 @@ HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno)
 	HnswElement element = palloc(sizeof(HnswElementData));
 	char	   *base = NULL;
 
+#if PG_VERSION_NUM >= 170000
+	element->buffer = InvalidBuffer;
+#endif
 	element->blkno = blkno;
 	element->offno = offno;
 	HnswPtrStore(base, element->neighbors, (HnswNeighborArrayPtr *) NULL);
@@ -555,7 +562,20 @@ HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index,
 	HnswElementTuple etup;
 
 	/* Read vector */
+#if PG_VERSION_NUM >= 170000
+	if (element->buffer != InvalidBuffer)
+	{
+		/* Buffer pinned already. */
+		buf = element->buffer;
+		Assert(BufferGetBlockNumber(buf) == element->blkno);
+	}
+	else
+	{
+		buf = ReadBuffer(index, element->blkno);
+	}
+#else
 	buf = ReadBuffer(index, element->blkno);
+#endif
 	LockBuffer(buf, BUFFER_LOCK_SHARE);
 	page = BufferGetPage(buf);
 
@@ -717,6 +737,34 @@ CountElement(char *base, HnswElement skipElement, HnswCandidate * hc)
 	return e->heaptidsLength != 0;
 }
 
+#if PG_VERSION_NUM >= 170000
+typedef struct HnswSearchLayerNextBlockData {
+	char	   *base;
+	HnswCandidate **items;
+	int			nitems;
+	int			i;
+} HnswSearchLayerNextBlockData;
+
+/*
+ * Callback used to feed block numbers to the ReadStream.
+ */
+static BlockNumber
+HnswSearchLayerNextBlock(ReadStream *stream,
+						 void *callback_data,
+						 void *per_buffer_data)
+{
+	HnswSearchLayerNextBlockData *data = callback_data;
+	HnswElement hce;
+
+	if (data->i == data->nitems)
+		return InvalidBlockNumber;
+
+	hce = HnswPtrAccess(data->base, data->items[data->i++]->element);
+
+	return hce->blkno;
+}
+#endif
+
 /*
  * Algorithm 2 from paper
  */
@@ -732,6 +780,27 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 	HnswNeighborArray *neighborhoodData = NULL;
 	Size		neighborhoodSize = 0;
 
+#if PG_VERSION_NUM >= 170000
+	HnswSearchLayerNextBlockData stream_callback_data = {0};
+	ReadStream *stream;
+
+	/*
+	 * If we're searching an index, create a stream so that we can generate
+	 * some I/O asynchronicity when the index is cold, if
+	 * effective_io_concurrency is configured.
+	 */
+	if (index)
+		stream = read_stream_begin_relation(READ_STREAM_FULL,
+											NULL,
+											index,
+											MAIN_FORKNUM,
+											HnswSearchLayerNextBlock,
+											&stream_callback_data,
+											0);
+	else
+		stream = NULL;
+#endif
+
 	InitVisited(base, &v, index, ef, m);
 
 	/* Create local memory for neighborhood if needed */
@@ -767,6 +836,8 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 		HnswCandidate *c = ((HnswPairingHeapNode *) pairingheap_remove_first(C))->inner;
 		HnswCandidate *f = ((HnswPairingHeapNode *) pairingheap_first(W))->inner;
 		HnswElement cElement;
+		HnswCandidate *items[HNSW_MAX_SIZE];
+		int nitems;
 
 		if (c->distance > f->distance)
 			break;
@@ -788,6 +859,8 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 			neighborhood = neighborhoodData;
 		}
 
+		/* Build a list of indexes of neighbors to visit. */
+		nitems = 0;
 		for (int i = 0; i < neighborhood->length; i++)
 		{
 			HnswCandidate *e = &neighborhood->items[i];
@@ -796,6 +869,35 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 			AddToVisited(base, &v, e, index, &visited);
 
 			if (!visited)
+				items[nitems++] = e;
+		}
+
+#if PG_VERSION_NUM >= 170000
+		if (stream)
+		{
+			/*
+			 * Give the callback the information it needs to find future block
+			 * numbers.
+			 */
+			stream_callback_data.base = base;
+			stream_callback_data.items = items;
+			stream_callback_data.nitems = nitems;
+			stream_callback_data.i = 0;
+
+			/*
+			 * Reset the stream.  This is necessary because each time the
+			 * callback runs out of data, the stream needs to be restarted
+			 * before it tries to look ahead again.
+			 */
+			read_stream_reset(stream);
+		}
+#endif
+
+		/* Visit them. */
+		for (int i = 0; i < nitems; i++)
+		{
+			HnswCandidate *e = items[i];
+
 			{
 				float		eDistance;
 				HnswElement eElement = HnswPtrAccess(base, e->element);
@@ -806,7 +908,13 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 				if (index == NULL)
 					eDistance = GetCandidateDistance(base, e, q, procinfo, collation);
 				else
+				{
+#if PG_VERSION_NUM >= 170000
+					if (stream)
+						eElement->buffer = read_stream_next_buffer(stream, NULL);
+#endif
 					HnswLoadElement(eElement, &eDistance, &q, index, procinfo, collation, inserting, alwaysAdd ? NULL : &f->distance);
+				}
 
 				if (eDistance < f->distance || alwaysAdd)
 				{
@@ -844,6 +952,11 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 		}
 	}
 
+#if PG_VERSION_NUM >= 170000
+	if (stream)
+		read_stream_end(stream);
+#endif
+
 	/* Add each element of W to w */
 	while (!pairingheap_is_empty(W))
 	{
-- 
2.46.0
