On Wed, 2 Aug 2023 16:37:53 +0900 Yugo NAGATA <nag...@sraoss.co.jp> wrote:
> Hello Fabien,
> 
> On Fri, 14 Jul 2023 20:32:01 +0900
> Yugo NAGATA <nag...@sraoss.co.jp> wrote:
> 
> > I attached the updated patch.

I'm sorry. I forgot to attach the patch.

Regards,
Yugo Nagata

> > Hello Fabien,
> > 
> > Thank you for your review!
> > 
> > On Mon, 3 Jul 2023 20:39:23 +0200 (CEST)
> > Fabien COELHO <coe...@cri.ensmp.fr> wrote:
> > 
> > > Yugo-san,
> > > 
> > > Some feedback about v1 of this patch.
> > > 
> > > Patch applies cleanly, compiles.
> > > 
> > > There are no tests, could there be one? ISTM that one could be done
> > > with a "SELECT pg_sleep(...)" script?
> > 
> > Agreed. I will add the test.
> 
> I added a TAP test.
> 
> > > The global name "all_state" is quite ambiguous, what about
> > > "client_states" instead? Or maybe it could be avoided, see below.
> > > 
> > > Instead of renaming "state" to "all_state" (or "client_states" as
> > > suggested above), I'd suggest to minimize the patch by keeping
> > > "state" inside main and adding a "client_states = state;" just after
> > > the allocation, or another approach, see below.
> > 
> > Ok, I'll fix to add a global variable "client_states" and make it
> > point to "state" instead of changing "state" to global.
> 
> Done.
> 
> > > Should PQfreeCancel be called on disconnection, in finishCon? I
> > > think that there may be a memory leak with the current
> > > implementation?
> > 
> > Agreed. I'll fix.
> 
> Done.
> 
> Regards,
> Yugo Nagata
> 
> > > Maybe it should check that cancel is not NULL before calling
> > > PQcancel?
> > 
> > I think this is already checked as below, but am I missing something?
> > 
> > +	if (all_state[i].cancel != NULL)
> > +		(void) PQcancel(all_state[i].cancel, errbuf, sizeof(errbuf));
> > 
> > > After looking at the code, I'm very unclear whether there may be
> > > some underlying race conditions, or not, depending on when the
> > > cancel is triggered. I think that some race conditions are still
> > > likely given the current thread 0 implementation, and dealing with
> > > them with a barrier or whatever is not desirable at all.
> > > 
> > > In order to work around this issue, ISTM that we should go away from
> > > the simple and straightforward thread 0 approach, and the only clean
> > > way is that the cancelation should be managed by each thread for its
> > > own client.
> > > 
> > > I'd suggest to have advanceState call PQcancel when CancelRequested
> > > is set and switch to CSTATE_ABORTED to end properly. This means that
> > > there would be no need for the global client states pointer, so the
> > > patch should be smaller and simpler. Possibly there would be some
> > > shortcuts added here and there to avoid lingering after the
> > > control-C, in threadRun.
> > 
> > I am not sure this approach is simpler than mine.
> > 
> > With multiple threads, only one thread catches the signal while the
> > other threads continue to run. Therefore, if Ctrl+C is pressed while
> > threads are waiting for responses from the backend in
> > wait_on_socket_set, only one thread can be interrupted and return,
> > but the other threads will continue to wait and cannot check
> > CancelRequested. So, to implement your suggestion, we would need some
> > hack to make all threads return from wait_on_socket_set when the
> > event occurs, but I don't see a simple way to do this.
> > 
> > In my patch, all threads can return from wait_on_socket_set at Ctrl+C
> > because when thread #0 cancels all connections, the following error
> > is sent to all sessions:
> > 
> >     ERROR:  canceling statement due to user request
> > 
> > and all threads will receive the response from the backend.
> > 
> > Regards,
> > Yugo Nagata
> > 
> > -- 
> > Yugo NAGATA <nag...@sraoss.co.jp>
> 
> -- 
> Yugo NAGATA <nag...@sraoss.co.jp>

-- 
Yugo NAGATA <nag...@sraoss.co.jp>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 539c2795e2..68278d2b18 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -596,6 +596,7 @@ typedef enum
 typedef struct
 {
 	PGconn	   *con;			/* connection handle to DB */
+	PGcancel   *cancel;			/* query cancel */
 	int			id;				/* client No. */
 	ConnectionStateEnum state;	/* state machine's current state. */
 	ConditionalStack cstack;	/* enclosing conditionals state */
@@ -638,6 +639,8 @@ typedef struct
 								 * here */
 } CState;
 
+CState	   *client_states;		/* status of all clients */
+
 /*
  * Thread state
  */
@@ -3631,6 +3634,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 					st->state = CSTATE_ABORTED;
 					break;
 				}
+				st->cancel = PQgetCancel(st->con);
 
 				/* reset now after connection */
 				now = pg_time_now();
@@ -4662,6 +4666,18 @@ disconnect_all(CState *state, int length)
 		finishCon(&state[i]);
 }
 
+/* send cancel requests to all connections */
+static void
+cancel_all(void)
+{
+	for (int i = 0; i < nclients; i++)
+	{
+		char		errbuf[256];
+		if (client_states[i].cancel != NULL)
+			(void) PQcancel(client_states[i].cancel, errbuf, sizeof(errbuf));
+	}
+}
+
 /*
  * Remove old pgbench tables, if any exist
  */
@@ -7133,6 +7149,9 @@ main(int argc, char **argv)
 		}
 	}
 
+	/* enable threads to access the status of all clients */
+	client_states = state;
+
 	/* other CState initializations */
 	for (i = 0; i < nclients; i++)
 	{
@@ -7345,6 +7364,22 @@ threadRun(void *arg)
 	StatsData	last,
 				aggs;
 
+	if (thread->tid == 0)
+		setup_cancel_handler(NULL);
+
+#if defined(ENABLE_THREAD_SAFETY) && !defined(WIN32)
+	if (thread->tid > 0)
+	{
+		sigset_t	sigint_sigset;
+		sigset_t	osigset;
+		sigemptyset(&sigint_sigset);
+		sigaddset(&sigint_sigset, SIGINT);
+
+		/* Block SIGINT in all threads except one. */
+		pthread_sigmask(SIG_BLOCK, &sigint_sigset, &osigset);
+	}
+#endif
+
 	/* open log file if requested */
 	if (use_log)
 	{
@@ -7387,6 +7422,7 @@ threadRun(void *arg)
 				pg_fatal("could not create connection for client %d",
 						 state[i].id);
 		}
+		state[i].cancel = PQgetCancel(state[i].con);
 	}
 
@@ -7414,6 +7450,17 @@ threadRun(void *arg)
 		pg_time_usec_t min_usec;
 		pg_time_usec_t now = 0; /* set this only if needed */
 
+		/*
+		 * If pgbench is canceled, send a cancel request to all connections
+		 * and exit the benchmark.
+		 */
+		if (CancelRequested)
+		{
+			if (thread->tid == 0)
+				cancel_all();
+			goto done;
+		}
+
 		/*
 		 * identify which client sockets should be checked for input, and
 		 * compute the nearest time (if any) at which we need to wake up.
@@ -7613,6 +7660,8 @@ finishCon(CState *st)
 	{
 		PQfinish(st->con);
 		st->con = NULL;
+		PQfreeCancel(st->cancel);
+		st->cancel = NULL;
 	}
 }
diff --git a/src/bin/pgbench/t/003_pgbench_cancel.pl b/src/bin/pgbench/t/003_pgbench_cancel.pl
new file mode 100644
index 0000000000..f5619a687c
--- /dev/null
+++ b/src/bin/pgbench/t/003_pgbench_cancel.pl
@@ -0,0 +1,84 @@
+
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+my $tempdir = PostgreSQL::Test::Utils::tempdir;
+
+my $node = PostgreSQL::Test::Cluster->new('main');
+$node->init;
+$node->start;
+
+# Test query canceling by sending SIGINT to a running pgbench
+#
+# There is, as of this writing, no documented way to get the PID of
+# the process from IPC::Run.  As a workaround, we have pgbench print its
+# own PID (which is the parent of the shell launched by pgbench) to a
+# file.
+SKIP:
+{
+	skip "cancel test requires a Unix shell", 2 if $windows_os;
+
+	local %ENV = $node->_get_env();
+
+	my ($stdin, $stdout, $stderr, @file);
+
+	# Test whether shell supports $PPID.  It's part of POSIX, but some
+	# pre-/non-POSIX shells don't support it (e.g., NetBSD).
+	$stdin = "\\! echo \$PPID";
+	IPC::Run::run([ 'psql', '-X', '-v', 'ON_ERROR_STOP=1' ],
+		'<', \$stdin, '>', \$stdout, '2>', \$stderr);
+	$stdout =~ /^\d+$/ or skip "shell apparently does not support \$PPID", 2;
+
+	@file = $node->_pgbench_make_files(
+		{
+			'003_pgbench_cancel' => qq{
+\\shell echo \$PPID >$tempdir/pgbench.pid
+select pg_sleep($PostgreSQL::Test::Utils::timeout_default);
+	}});
+
+	# Now start the real test
+	my $h = IPC::Run::start([ 'pgbench', '-c', '4', '-j', '4', '-T', "$PostgreSQL::Test::Utils::timeout_default", @file ],
+		\$stdin, \$stdout, \$stderr);
+
+	# Get the PID
+	my $count;
+	my $pgbench_pid;
+	until (
+		-s "$tempdir/pgbench.pid"
+		and ($pgbench_pid =
+			PostgreSQL::Test::Utils::slurp_file("$tempdir/pgbench.pid")) =~
+		/^\d+\n/s)
+	{
+		($count++ < 100 * $PostgreSQL::Test::Utils::timeout_default)
+		  or die "pid file did not appear";
+		usleep(10_000);
+	}
+
+	$node->poll_query_until('postgres',
+		q{SELECT (SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep') = 4;}
+	) or die "timed out";
+
+	# Send cancel request
+	kill 'INT', $pgbench_pid;
+
+	my $result = finish $h;
+
+	ok(!$result, 'query failed as expected');
+	like(
+		$stderr,
+		qr/Run was aborted; the above results are incomplete/,
+		'pgbench was canceled');
+
+	is($node->safe_psql('postgres',
+		q{SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep'}),
+		'0', 'all queries were canceled');
+}
+
+done_testing();