On 31/10/2025 18:40, Heikki Linnakangas wrote:
On 31/10/2025 01:27, Joel Jacobson wrote:
On Fri, Oct 31, 2025, at 00:08, Joel Jacobson wrote:
On Thu, Oct 30, 2025, at 14:25, Heikki Linnakangas wrote:
Joel, since you've been working on some optimizations in this area too,
would you happen to have some suitable performance test scripts for this?

Glad you asked. I'm actually working on a benchmark+correctness tester.
It's very much work-in-progress though, don't look too much at the code,
or your eyes will bleed.

It's a combined benchmark + correctness tester, that verifies that only
the expected notifications are received on the expected connections,
while at the same time doing timing measurements.

When running multiple pg_bench_lino processes in parallel to simulate
concurrent workloads, I realized that the randomization of the channel
names and payloads was not random enough to avoid collisions. New version
attached that uses real UUIDs for channel names and payloads.

Thanks! Here's a sketch for holding the bank lock across TransactionIdDidCommit() calls. In quick testing with your test program, I can't see any performance difference. However, I'm not quite sure what options I should be using to stress this. My gut feeling is that it's fine, but it'd be nice to construct a real worst-case test to be sure.

I wrote another little stand-alone performance test program for this, attached. It launches N connections that send NOTIFYs to a single channel as fast as possible, and M threads that listen for the notifications. I ran it with different combinations of N and M, on 'master' and on REL_14_STABLE (which didn't have SLRU banks) and I cannot discern any performance difference from these patches. So it seems that holding the SLRU (bank) lock across the TransactionIdDidCommit() calls is fine.

- Heikki
#include "postgres_fe.h"
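/*
 * Stand-alone LISTEN/NOTIFY performance test.
 *
 * Launches N connections that send NOTIFYs to a single channel as fast as
 * possible, and M threads that listen for the notifications.  Running
 * totals are printed to stderr once per second.
 */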
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include "getopt_long.h"

#include "libpq-fe.h"
#include "portability/instr_time.h"
#include "port/pg_pthread.h"

/*
 * Scale a value by 1e-6 and yield a double.  main() applies it to a
 * difference of pg_time_now() values (microseconds) to get elapsed seconds.
 */
#define PG_TIME_GET_DOUBLE(t) (0.000001 * (t))

/*
 * Process-wide running totals.  The worker threads bump these with
 * __sync_fetch_and_add(); main() prints them once per second.
 */
static uint32_t num_notifies_sent;	/* successful NOTIFY commands */
static uint32_t num_notifies_received;	/* notifications taken from PQnotifies() */

/*
 * Shut down the given libpq connection, then end the whole process with a
 * failure exit status.  Does not return.
 */
static void
exit_nicely(PGconn *conn)
{
	const int	failure_status = 1;

	PQfinish(conn);
	exit(failure_status);
}

/*
 * Thread entry point for a notifier.
 *
 * Opens its own connection with the conninfo string "dbname=postgres" and
 * then runs NOTIFY on "testchannel" in an endless loop.  The payload is this
 * thread's own running count of notifications sent so far; after every
 * successful command the shared num_notifies_sent counter is bumped
 * atomically.
 *
 * 'arg' is unused.  The function never returns: the loop has no exit, and
 * any connection or command failure ends the process via exit_nicely().
 */
static void *
notify_thread_main(void *arg)
{
	PGconn	   *conn;
	PGresult   *res;

	/*
	 * Per-thread sequence number used as the payload.  It must start from a
	 * defined value, and it is unsigned so that wrapping around in a long run
	 * is well defined.
	 */
	unsigned int notifies_sent = 0;

	(void) arg;

	/* Make a connection to the database */
	conn = PQconnectdb("dbname=postgres");

	/* Check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(conn));
		exit_nicely(conn);
	}

	for (;;)
	{
		char		buf[100];

		snprintf(buf, sizeof(buf), "NOTIFY testchannel, '%u'", notifies_sent);
		res = PQexec(conn, buf);
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "NOTIFY command failed: %s", PQerrorMessage(conn));
			PQclear(res);
			exit_nicely(conn);
		}
		PQclear(res);

		notifies_sent++;
		__sync_fetch_and_add(&num_notifies_sent, 1);
	}
}

/*
 * Thread entry point for a listener.
 *
 * Opens its own connection with the conninfo string "dbname=postgres",
 * issues LISTEN on "testchannel", and then loops: wait with select() until
 * the socket is readable, read the pending input, and drain every queued
 * notification.  Each notification is freed without being inspected, and
 * the shared num_notifies_received counter is bumped atomically.
 *
 * 'arg' is unused.  Connection, LISTEN, select() and input-read failures
 * end the process via exit_nicely().  Returns NULL only if PQsocket()
 * reports no valid socket.
 */
static void *
listen_thread_main(void *arg)
{
	PGconn	   *conn;
	PGresult   *res;
	PGnotify   *notify;

	(void) arg;

	/* Make a connection to the database */
	conn = PQconnectdb("dbname=postgres");

	/* Check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(conn));
		exit_nicely(conn);
	}

	/* Subscribe to the channel that the notifier threads send to. */
	res = PQexec(conn, "LISTEN testchannel");
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		fprintf(stderr, "LISTEN command failed: %s", PQerrorMessage(conn));
		PQclear(res);
		exit_nicely(conn);
	}
	PQclear(res);

	for (;;)
	{
		/*
		 * Sleep until something happens on the connection.  We use select(2)
		 * to wait for input, but you could also use poll() or similar
		 * facilities.
		 */
		int			sock;
		fd_set		input_mask;

		sock = PQsocket(conn);

		if (sock < 0)
			break;				/* shouldn't happen */

		FD_ZERO(&input_mask);
		FD_SET(sock, &input_mask);

		if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
		{
			/* An interrupted wait is not an error; just wait again. */
			if (errno == EINTR)
				continue;

			fprintf(stderr, "select() failed: %s\n", strerror(errno));
			exit_nicely(conn);
		}

		/*
		 * Now check for input.  PQconsumeInput() returns 0 on trouble;
		 * report that instead of silently carrying on with a broken
		 * connection.
		 */
		if (!PQconsumeInput(conn))
		{
			fprintf(stderr, "PQconsumeInput failed: %s", PQerrorMessage(conn));
			exit_nicely(conn);
		}

		while ((notify = PQnotifies(conn)) != NULL)
		{
			/* Only count the notification; its contents are not examined. */
			PQfreemem(notify);
			__sync_fetch_and_add(&num_notifies_received, 1);

			/* Pull in whatever else has arrived before polling again. */
			if (!PQconsumeInput(conn))
			{
				fprintf(stderr, "PQconsumeInput failed: %s", PQerrorMessage(conn));
				exit_nicely(conn);
			}
		}
	}

	/* Only reached when PQsocket() reported no usable socket. */
	PQfinish(conn);
	return NULL;
}

/* A point in time (or a time difference), in microseconds. */
typedef int64 pg_time_usec_t;

/*
 * Read the current time through the instr_time facility and return it
 * converted to microseconds.
 */
static inline pg_time_usec_t
pg_time_now(void)
{
	instr_time	current;
	pg_time_usec_t usec;

	INSTR_TIME_SET_CURRENT(current);
	usec = (pg_time_usec_t) INSTR_TIME_GET_MICROSEC(current);

	return usec;
}

/*
 * Program entry point.
 *
 * Options:
 *   --listeners=N   number of listener threads to start (default 1, min 1)
 *   --notifiers=N   number of notifier threads to start (default 1, min 1)
 *
 * Starts the notifier threads, then the listener threads, and afterwards
 * prints the elapsed time and the running sent/received totals to stderr
 * once per second, forever.  The process only ends through an error exit
 * (here or in a worker thread) or by being killed.
 */
int
main(int argc, char **argv)
{
	int			num_threads = 0;
	pthread_t  *threads;
	pg_time_usec_t start;
	static struct option long_options[] = {
		/* systematic long/short named options */
		{"listeners", required_argument, NULL, 1},
		{"notifiers", required_argument, NULL, 2},
		{NULL, 0, NULL, 0}
	};
	int			num_listen_threads = 1;
	int			num_notify_threads = 1;
	int			optindex;
	int			c;

	while ((c = getopt_long(argc, argv, "", long_options, &optindex)) != -1)
	{
		switch (c)
		{
			case 1:				/* listeners */
				num_listen_threads = atoi(optarg);
				if (num_listen_threads < 1)
				{
					fprintf(stderr, "invalid --listeners argument\n");
					exit(1);
				}
				break;

			case 2:				/* notifiers */
				num_notify_threads = atoi(optarg);
				if (num_notify_threads < 1)
				{
					fprintf(stderr, "invalid --notifiers argument\n");
					exit(1);
				}
				break;

			default:
				/* Unknown option or missing argument: don't run a bogus test. */
				fprintf(stderr, "usage: %s [--listeners=N] [--notifiers=N]\n",
						argv[0]);
				exit(1);
		}
	}

	/* Widen before adding so that two large counts cannot overflow int. */
	threads = malloc(((size_t) num_notify_threads + (size_t) num_listen_threads)
					 * sizeof(pthread_t));
	if (threads == NULL)
	{
		fprintf(stderr, "out of memory\n");
		exit(1);
	}

	for (int i = 0; i < num_notify_threads; i++)
	{
		int			s;

		s = pthread_create(&threads[num_threads++], NULL,
						   &notify_thread_main, NULL);
		if (s != 0)
		{
			fprintf(stderr, "pthread_create failed\n");
			exit(1);
		}
	}

	for (int i = 0; i < num_listen_threads; i++)
	{
		int			s;

		s = pthread_create(&threads[num_threads++], NULL,
						   &listen_thread_main, NULL);
		if (s != 0)
		{
			fprintf(stderr, "pthread_create failed\n");
			exit(1);
		}
	}

	start = pg_time_now();

	for (;;)
	{
		double		elapsed_sec;
		unsigned int sent;
		unsigned int received;

		sleep(1);
		elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);

		/*
		 * The worker threads update the counters concurrently, so read them
		 * with an atomic operation too (adding zero leaves them unchanged).
		 */
		sent = (unsigned int) __sync_fetch_and_add(&num_notifies_sent, 0);
		received = (unsigned int) __sync_fetch_and_add(&num_notifies_received, 0);

		fprintf(stderr, "%2.f s: %u sent, %u received\n",
				elapsed_sec, sent, received);
	}

	return 0;					/* not reached */
}

Reply via email to