On Wed, 6 Sep 2023 20:13:34 +0900 Yugo NAGATA <nag...@sraoss.co.jp> wrote: > I attached the updated patch v3. The changes since the previous > patch include the following: > > I removed the unnecessary condition (&& false) that you > pointed out in [1]. > > The test was rewritten using IPC::Run signal() and integrated > into "001_pgbench_with_server.pl". This test is skipped on Windows > because sending SIGINT terminates the test itself, as discussed > in [2] regarding the query cancellation test in psql. > > I added some comments to describe how query cancellation is > handled, as I explained in [1]. > > Also, I found that the previous patch didn't work on Windows, so I fixed it. > On non-Windows systems, a thread waiting for the response to a long query can > be interrupted by SIGINT, but on Windows, threads do not return from > waiting until the queries they are running are cancelled. This is because, > when the signal is received, the system just creates a new thread to > execute the callback function specified by setup_cancel_handler, and > the other threads continue to run [3]. Therefore, the queries have to be > cancelled in the callback function. > > [1] > https://www.postgresql.org/message-id/a58388ac-5411-4760-ea46-71324d8324cb%40mines-paristech.fr > [2] > https://www.postgresql.org/message-id/20230906004524.2fd6ee049f8a6c6f2690b99c%40sraoss.co.jp > [3] https://learn.microsoft.com/en-us/windows/console/handlerroutine
I found that the --disable-thread-safety option was removed in 68a4b58eca0329, so I removed the code involving ENABLE_THREAD_SAFETY from the patch. Also, I wrote a draft commit message. Attached is the updated version, v4. Regards, Yugo Nagata -- Yugo NAGATA <nag...@sraoss.co.jp>
>From 10578e5cf02b1a0f7f0988bc7a877dcbeb5448b6 Mon Sep 17 00:00:00 2001 From: Yugo Nagata <nag...@sraoss.co.jp> Date: Mon, 24 Jul 2023 21:53:28 +0900 Subject: [PATCH v4] Allow pgbench to cancel queries during benchmark Previously, Ctrl+C during a benchmark killed pgbench immediately, but queries running at that time were not cancelled. This commit fixes this so that cancel requests are sent to all connections before pgbench exits. In thread #0, setup_cancel_handler is called before the benchmark so that CancelRequested is set when SIGINT is sent. When SIGINT is sent during the benchmark, on non-Windows systems, thread #0 will be interrupted, return from the I/O wait, and send cancel requests to all connections. After the queries are cancelled, the other threads also return from waiting, and pgbench exits at the end. On Windows, cancel requests are sent in the callback function specified by setup_cancel_handler. --- src/bin/pgbench/pgbench.c | 89 ++++++++++++++++++++ src/bin/pgbench/t/001_pgbench_with_server.pl | 42 +++++++++ 2 files changed, 131 insertions(+) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 713e8a06bb..5adf099b76 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -596,6 +596,7 @@ typedef enum typedef struct { PGconn *con; /* connection handle to DB */ + PGcancel *cancel; /* query cancel */ int id; /* client No. */ ConnectionStateEnum state; /* state machine's current state. 
*/ ConditionalStack cstack; /* enclosing conditionals state */ @@ -638,6 +639,8 @@ typedef struct * here */ } CState; +CState *client_states; /* status of all clients */ + /* * Thread state */ @@ -837,6 +840,10 @@ static void add_socket_to_set(socket_set *sa, int fd, int idx); static int wait_on_socket_set(socket_set *sa, int64 usecs); static bool socket_has_input(socket_set *sa, int fd, int idx); +#ifdef WIN32 +static void pgbench_cancel_callback(void); +#endif + /* callback used to build rows for COPY during data loading */ typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr); @@ -3639,6 +3646,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) st->state = CSTATE_ABORTED; break; } + st->cancel = PQgetCancel(st->con); /* reset now after connection */ now = pg_time_now(); @@ -4670,6 +4678,18 @@ disconnect_all(CState *state, int length) finishCon(&state[i]); } +/* send cancel requests to all connections */ +static void +cancel_all() +{ + for (int i = 0; i < nclients; i++) + { + char errbuf[1]; + if (client_states[i].cancel != NULL) + (void) PQcancel(client_states[i].cancel, errbuf, sizeof(errbuf)); + } +} + /* * Remove old pgbench tables, if any exist */ @@ -7146,6 +7166,9 @@ main(int argc, char **argv) } } + /* enable threads to access the status of all clients */ + client_states = state; + /* other CState initializations */ for (i = 0; i < nclients; i++) { @@ -7358,6 +7381,37 @@ threadRun(void *arg) StatsData last, aggs; + /* + * Query cancellation is handled only in thread #0. + * + * On Windows, a callback function is set in which query cancel requests + * are sent to all benchmark queries running in the backend. + * + * On non-Windows, any callback function is not set. When SIGINT is + * received, CancelRequested is just set, and only thread #0 is interrupted + * and returns from waiting input from the backend. After that, the thread + * sends cancel requests to all benchmark queries. 
+ */ + if (thread->tid == 0) +#ifdef WIN32 + setup_cancel_handler(pgbench_cancel_callback); +#else + setup_cancel_handler(NULL); +#endif + +#ifndef WIN32 + if (thread->tid > 0) + { + sigset_t sigint_sigset; + sigset_t osigset; + sigemptyset(&sigint_sigset); + sigaddset(&sigint_sigset, SIGINT); + + /* Block SIGINT in all threads except one. */ + pthread_sigmask(SIG_BLOCK, &sigint_sigset, &osigset); + } +#endif + /* open log file if requested */ if (use_log) { @@ -7400,6 +7454,7 @@ threadRun(void *arg) pg_fatal("could not create connection for client %d", state[i].id); } + state[i].cancel = PQgetCancel(state[i].con); } } @@ -7427,6 +7482,26 @@ threadRun(void *arg) pg_time_usec_t min_usec; pg_time_usec_t now = 0; /* set this only if needed */ + /* + * If pgbench is cancelled, send cancel requests to all connections + * and exit the benchmark. + * + * Note that only thread #0 can be interrupted by SIGINT while waiting + * the result from the backend. Other threads will return from waiting + * just after queries they running are cancelled by thread #0. + * + * On Windows, cancel requests are sent in the callback function, so + * do nothing but exit the benchmark. + */ + if (CancelRequested) + { +#ifndef WIN32 + if (thread->tid == 0) + cancel_all(); +#endif + goto done; + } + /* * identify which client sockets should be checked for input, and * compute the nearest time (if any) at which we need to wake up. 
@@ -7650,6 +7725,8 @@ finishCon(CState *st) { PQfinish(st->con); st->con = NULL; + PQfreeCancel(st->cancel); + st->cancel = NULL; } } @@ -7867,3 +7944,15 @@ socket_has_input(socket_set *sa, int fd, int idx) } #endif /* POLL_USING_SELECT */ + +#ifdef WIN32 +/* + * query cancellation callback for Windows + */ +static void +pgbench_cancel_callback(void) +{ + /* send cancel requests to all connections */ + cancel_all(); +} +#endif diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index 96be529d6b..43eadfbe9f 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -7,6 +7,7 @@ use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +use Time::HiRes qw(usleep); # Check the initial state of the data generated. Tables for tellers and # branches use NULL for their filler attribute. The table accounts uses @@ -1502,6 +1503,47 @@ update counter set i = i+1 returning i \gset # Clean up $node->safe_psql('postgres', 'DROP TABLE counter;'); +# Test query canceling by sending SIGINT to a running pgbench +SKIP: +{ + skip "sending SIGINT on Windows terminates the test itself", 3 + if $windows_os; + + my ($stdin, $stdout, $stderr, @file); + + @file = $node->_pgbench_make_files( + { + '003_pgbench_cancel' => qq{ +select pg_sleep($PostgreSQL::Test::Utils::timeout_default); + }}); + + local %ENV = $node->_get_env(); + + my $h = IPC::Run::start( + [ 'pgbench', '-c', '2', '-j', '2', + '-T', "$PostgreSQL::Test::Utils::timeout_default", @file ], + \$stdin, \$stdout, \$stderr); + + $node->poll_query_until('postgres', + q{SELECT (SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep') = 2;} + ) or die "timed out"; + + # Send cancel request + $h->signal('INT'); + + my $result = finish $h; + + ok(!$result, 'pgbench failed as expected'); + like( + $stderr, + qr/Run was aborted; the above results are incomplete/, + 'pgbench was 
canceled'); + + is($node->safe_psql('postgres', + q{SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep'}), + '0', 'all queries were canceled'); +} + # done $node->safe_psql('postgres', 'DROP TABLESPACE regress_pgbench_tap_1_ts'); $node->stop; -- 2.25.1