On 31/10/2025 18:40, Heikki Linnakangas wrote:
On 31/10/2025 01:27, Joel Jacobson wrote:
On Fri, Oct 31, 2025, at 00:08, Joel Jacobson wrote:
On Thu, Oct 30, 2025, at 14:25, Heikki Linnakangas wrote:
Joel, since you've been working on some optimizations in this area too,
would you happen to have some suitable performance test scripts for
this?
Glad you asked. I'm actually working on a benchmark+correctness tester.
It's very much work-in-progress though, don't look too much at the code,
or your eyes will bleed.
It's a combined benchmark + correctness tester, that verifies that only
the expected notifications are received on the expected connections,
while at the same time doing timing measurements.
When running multiple pg_bench_lino processes in parallel to simulate
concurrent workloads, I realized the randomization of the channel names
and payloads was not random enough to avoid collisions. The new version,
attached, uses real UUIDs for channel names and payloads.
Thanks! Here's a sketch for holding the bank lock across
TransactionIdDidCommit() calls. In quick testing with your test program,
I can't see any performance difference. However, I'm not quite sure what
options I should be using to stress this. My gut feeling is that it's
fine, but it'd be nice to construct a real worst-case test to be
sure.
I wrote another little stand-alone performance test program for this,
attached. It launches N connections that send NOTIFYs to a single
channel as fast as possible, and M threads that listen for the
notifications. I ran it with different combinations of N and M, on
'master' and on REL_14_STABLE (which didn't have SLRU banks) and I
cannot discern any performance difference from these patches. So it
seems that holding the SLRU (bank) lock across the
TransactionIdDidCommit() calls is fine.
- Heikki
#include "postgres_fe.h"
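/*
 * Stand-alone LISTEN/NOTIFY throughput test.
 *
 * Starts --notifiers=N connections that send NOTIFY on "testchannel" as
 * fast as possible and --listeners=M connections that LISTEN on it, then
 * prints the running totals of notifications sent and received once per
 * second.  All connections use the conninfo string "dbname=postgres".
 */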
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include "getopt_long.h"
#include "libpq-fe.h"
#include "portability/instr_time.h"
#include "port/pg_pthread.h"
/* Convert a microsecond count to seconds, as a double. */
#define PG_TIME_GET_DOUBLE(t) (0.000001 * (t))

/*
 * Process-wide running totals.  Worker threads bump them with
 * __sync_fetch_and_add(); main() prints them once per second.
 */
static uint32_t num_notifies_sent = 0;
static uint32_t num_notifies_received = 0;
/*
 * Fatal-error exit shared by all threads: close "conn" with PQfinish(),
 * then terminate the entire process with exit status 1.  Does not return.
 */
static void
exit_nicely(PGconn *conn)
{
	PQfinish(conn);

	exit(1);
}
/*
 * Notifier thread body.
 *
 * Opens its own connection and sends "NOTIFY testchannel, '<n>'" in a tight
 * loop.  <n> is this thread's running count of NOTIFYs already sent, so every
 * payload from one thread is distinct.  Each successful NOTIFY also bumps the
 * shared num_notifies_sent counter.
 *
 * Never returns normally.  A connection or command failure terminates the
 * whole process via exit_nicely().  "arg" is unused.
 */
static void *
notify_thread_main(void *arg)
{
	PGconn	   *conn;
	PGresult   *res;
	unsigned int notifies_sent = 0;	/* unsigned: wraps instead of UB */

	(void) arg;

	/* Make a connection to the database */
	conn = PQconnectdb("dbname=postgres");

	/* Check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(conn));
		exit_nicely(conn);
	}

	for (;;)
	{
		char		buf[100];

		snprintf(buf, sizeof(buf), "NOTIFY testchannel, '%u'", notifies_sent);
		res = PQexec(conn, buf);
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "NOTIFY command failed: %s", PQerrorMessage(conn));
			PQclear(res);
			exit_nicely(conn);
		}
		PQclear(res);

		notifies_sent++;
		__sync_fetch_and_add(&num_notifies_sent, 1);
	}
}
/*
 * Listener thread body.
 *
 * Opens its own connection, issues LISTEN on "testchannel", then waits on
 * the connection's socket with select().  For every notification libpq
 * delivers, it increments the shared num_notifies_received counter.  The
 * payloads are not inspected.
 *
 * Returns NULL only if PQsocket() reports no valid socket.  Every other
 * failure terminates the whole process via exit_nicely().  "arg" is unused.
 */
static void *
listen_thread_main(void *arg)
{
	PGconn	   *conn;
	PGresult   *res;
	PGnotify   *notify;

	(void) arg;

	/* Make a connection to the database */
	conn = PQconnectdb("dbname=postgres");

	/* Check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(conn));
		exit_nicely(conn);
	}

	/* Subscribe to the channel that the notifier threads send on. */
	res = PQexec(conn, "LISTEN testchannel");
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		fprintf(stderr, "LISTEN command failed: %s", PQerrorMessage(conn));
		PQclear(res);
		exit_nicely(conn);
	}
	PQclear(res);

	for (;;)
	{
		int			sock;
		fd_set		input_mask;

		/* Sleep until something happens on the connection. */
		sock = PQsocket(conn);
		if (sock < 0)
			break;				/* shouldn't happen */

		FD_ZERO(&input_mask);
		FD_SET(sock, &input_mask);

		if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
		{
			fprintf(stderr, "select() failed: %s\n", strerror(errno));
			exit_nicely(conn);
		}

		/*
		 * Read whatever arrived.  A failure means the connection is broken.
		 * The socket would then stay readable forever, so without this
		 * check the loop would busy-spin instead of reporting the error.
		 */
		if (!PQconsumeInput(conn))
		{
			fprintf(stderr, "PQconsumeInput failed: %s", PQerrorMessage(conn));
			exit_nicely(conn);
		}

		while ((notify = PQnotifies(conn)) != NULL)
		{
			PQfreemem(notify);
			/* Pick up anything that arrived while we were counting. */
			PQconsumeInput(conn);
			__sync_fetch_and_add(&num_notifies_received, 1);
		}
	}

	PQfinish(conn);
	return NULL;
}
typedef int64 pg_time_usec_t;

/*
 * Read the clock with INSTR_TIME_SET_CURRENT() and return the reading in
 * microseconds.  This program only uses differences between two readings.
 */
static inline pg_time_usec_t
pg_time_now(void)
{
	instr_time	t;
	pg_time_usec_t usec;

	INSTR_TIME_SET_CURRENT(t);
	usec = (pg_time_usec_t) INSTR_TIME_GET_MICROSEC(t);

	return usec;
}
int
main(int argc, char **argv)
{
int num_threads = 0;
pthread_t *threads;
pg_time_usec_t start;
static struct option long_options[] = {
/* systematic long/short named options */
{"listeners", required_argument, NULL, 1},
{"notifiers", required_argument, NULL, 2},
{NULL, 0, NULL, 0}
};
int num_listen_threads = 1;
int num_notify_threads = 1;
int optindex;
int c;
while ((c = getopt_long(argc, argv, "", long_options, &optindex)) != -1)
{
switch (c)
{
case 1: /* listeners */
num_listen_threads = atoi(optarg);
if (num_listen_threads < 1)
{
fprintf(stderr, "invalid --listeners argument\n");
exit(1);
}
break;
case 2: /* notifiers */
num_notify_threads = atoi(optarg);
if (num_notify_threads < 1)
{
fprintf(stderr, "invalid --notifiers argument\n");
exit(1);
}
break;
}
}
threads = malloc((num_notify_threads + num_listen_threads) * sizeof(pthread_t));
for (int i = 0; i < num_notify_threads; i++)
{
int s;
s = pthread_create(&threads[num_threads++], NULL,
¬ify_thread_main, NULL);
if (s != 0)
{
fprintf(stderr, "pthread_create failed\n");
exit(1);
}
}
for (int i = 0; i < num_listen_threads; i++)
{
int s;
s = pthread_create(&threads[num_threads++], NULL,
&listen_thread_main, NULL);
if (s != 0)
{
fprintf(stderr, "pthread_create failed\n");
exit(1);
}
}
start = pg_time_now();
for (;;)
{
double elapsed_sec;
sleep(1);
elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
fprintf(stderr, "%2.f s: %d sent, %d received\n",
elapsed_sec, num_notifies_sent, num_notifies_received);
}
return 0;
}