On Wed, 2 Aug 2023 16:37:53 +0900
Yugo NAGATA <nag...@sraoss.co.jp> wrote:

> Hello Fabien,
> 
> On Fri, 14 Jul 2023 20:32:01 +0900
> Yugo NAGATA <nag...@sraoss.co.jp> wrote:
> 
> I attached the updated patch.

I'm sorry. I forgot to attach the patch.

Regards,
Yugo Nagata

> 
> > Hello Fabien,
> > 
> > Thank you for your review!
> > 
> > On Mon, 3 Jul 2023 20:39:23 +0200 (CEST)
> > Fabien COELHO <coe...@cri.ensmp.fr> wrote:
> > 
> > > 
> > > Yugo-san,
> > > 
> > > Some feedback about v1 of this patch.
> > > 
> > > Patch applies cleanly, compiles.
> > > 
> > > There are no tests, could there be one? ISTM that one could be done
> > > with a "SELECT pg_sleep(...)" script?
> > 
> > Agreed. I will add the test.
> 
> I added a TAP test.
> 
> > 
> > > The global name "all_state" is quite ambiguous, what about
> > > "client_states" instead? Or maybe it could be avoided, see below.
> > > 
> > > Instead of renaming "state" to "all_state" (or "client_states" as
> > > suggested above), I'd suggest minimizing the patch by leaving "state"
> > > local to main and adding a "client_states = state;" just after the
> > > allocation; or see another approach below.
> > 
> > OK, I'll add a global variable "client_states" and make it point to
> > "state" instead of making "state" itself global.
> 
> Done.
> 
> >  
> > > Should PQfreeCancel be called on disconnection, in finishCon? I think
> > > that there may be a memory leak with the current implementation?
> > 
> > Agreed. I'll fix.
> 
> Done.
> 
> Regards,
> Yugo Nagata
> 
> >  
> > > Maybe it should check that cancel is not NULL before calling PQcancel?
> > 
> > I think this is already checked as below, but am I missing something?
> > 
> > +       if (all_state[i].cancel != NULL)
> > +           (void) PQcancel(all_state[i].cancel, errbuf, sizeof(errbuf));
> > 
> > > After looking at the code, I'm very unclear whether there may be some
> > > underlying race conditions or not, depending on when the cancel is
> > > triggered. I think that some race conditions are still likely given the
> > > current thread 0 implementation, and dealing with them with a barrier or
> > > whatever is not desirable at all.
> > > 
> > > In order to work around this issue, ISTM that we should move away from
> > > the simple and straightforward thread 0 approach, and the only clean
> > > way is for each thread to manage cancelation of its own clients.
> > > 
> > > I'd suggest having advanceState call PQcancel when CancelRequested is
> > > set and switch to CSTATE_ABORTED to end properly. This means that there
> > > would be no need for the global client states pointer, so the patch
> > > should be smaller and simpler. Possibly some shortcuts would be added
> > > here and there in threadRun to avoid lingering after the control-C.
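> > > 
> > > Roughly, the idea would be something like this (an untested sketch;
> > > the exact placement in the state machine is only illustrative):
> > > 
> > >     /* e.g. at the top of advanceConnectionState's main loop */
> > >     if (CancelRequested)
> > >     {
> > >         char    errbuf[256];
> > > 
> > >         /* each thread cancels only its own client's query */
> > >         if (st->cancel != NULL)
> > >             (void) PQcancel(st->cancel, errbuf, sizeof(errbuf));
> > >         st->state = CSTATE_ABORTED;
> > >         return;
> > >     }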
> > 
> > I am not sure this approach is simpler than mine.
> > 
> > With multiple threads, only one thread catches the signal while the other
> > threads continue to run. Therefore, if Ctrl+C is pressed while threads are
> > waiting for responses from the backend in wait_on_socket_set, only one
> > thread is interrupted and returns, but the other threads will continue to
> > wait and cannot check CancelRequested. So, to implement your suggestion,
> > we would need some hack to make all threads return from wait_on_socket_set
> > when the event occurs, but I don't see a simpler way to do this.
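> > 
> > (For background: a process-directed signal such as SIGINT from Ctrl+C is
> > delivered to one arbitrary thread that has it unblocked, so a common way
> > to designate a receiver is to mask the signal in all other threads. A
> > minimal illustrative sketch, assuming POSIX threads and a made-up helper
> > name, along the lines of what my patch does in threadRun:)
> > 
> >     #include <signal.h>
> >     #include <pthread.h>
> > 
> >     /*
> >      * Called in every worker thread, so that only the remaining
> >      * unblocked thread (thread #0) receives SIGINT.
> >      */
> >     static void
> >     block_sigint_in_worker(void)
> >     {
> >         sigset_t    set;
> > 
> >         sigemptyset(&set);
> >         sigaddset(&set, SIGINT);
> >         (void) pthread_sigmask(SIG_BLOCK, &set, NULL);
> >     }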
> > 
> > In my patch, all threads can return from wait_on_socket_set on Ctrl+C
> > because, when thread #0 cancels all connections, the following error is
> > sent to all sessions:
> > 
> >  ERROR:  canceling statement due to user request
> > 
> > and all threads will receive the response from the backend.
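> > 
> > (For reference, the libpq cancel API involved works roughly as in this
> > minimal standalone sketch; the helper name is made up, and this is not
> > code from the patch:)
> > 
> >     #include <stdio.h>
> >     #include <libpq-fe.h>
> > 
> >     /* Ask the server to cancel the query currently running on conn. */
> >     static void
> >     cancel_query(PGconn *conn)
> >     {
> >         char        errbuf[256];
> >         PGcancel   *cancel = PQgetCancel(conn);
> > 
> >         if (cancel != NULL)
> >         {
> >             /*
> >              * PQcancel opens a separate connection to deliver the
> >              * request; if the server honors it, the target session
> >              * fails with "canceling statement due to user request".
> >              */
> >             if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
> >                 fprintf(stderr, "could not send cancel: %s\n", errbuf);
> >             PQfreeCancel(cancel);
> >         }
> >     }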
> > 
> > Regards,
> > Yugo Nagata
> > 
> > -- 
> > Yugo NAGATA <nag...@sraoss.co.jp>
> > 
> > 
> 
> 
> -- 
> Yugo NAGATA <nag...@sraoss.co.jp>
> 
> 


-- 
Yugo NAGATA <nag...@sraoss.co.jp>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 539c2795e2..68278d2b18 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -596,6 +596,7 @@ typedef enum
 typedef struct
 {
 	PGconn	   *con;			/* connection handle to DB */
+	PGcancel   *cancel;			/* query cancel */
 	int			id;				/* client No. */
 	ConnectionStateEnum state;	/* state machine's current state. */
 	ConditionalStack cstack;	/* enclosing conditionals state */
@@ -638,6 +639,8 @@ typedef struct
 								 * here */
 } CState;
 
+CState	*client_states;		/* status of all clients */
+
 /*
  * Thread state
  */
@@ -3631,6 +3634,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 						st->state = CSTATE_ABORTED;
 						break;
 					}
+					st->cancel = PQgetCancel(st->con);
 
 					/* reset now after connection */
 					now = pg_time_now();
@@ -4662,6 +4666,18 @@ disconnect_all(CState *state, int length)
 		finishCon(&state[i]);
 }
 
+/* send cancel requests to all connections */
+static void
+cancel_all(void)
+{
+	for (int i = 0; i < nclients; i++)
+	{
+		char errbuf[1];		/* any error message from PQcancel is discarded */
+		if (client_states[i].cancel != NULL)
+			(void) PQcancel(client_states[i].cancel, errbuf, sizeof(errbuf));
+	}
+}
+
 /*
  * Remove old pgbench tables, if any exist
  */
@@ -7133,6 +7149,9 @@ main(int argc, char **argv)
 		}
 	}
 
+	/* enable threads to access the status of all clients */
+	client_states = state;
+
 	/* other CState initializations */
 	for (i = 0; i < nclients; i++)
 	{
@@ -7345,6 +7364,22 @@ threadRun(void *arg)
 	StatsData	last,
 				aggs;
 
+	if (thread->tid == 0)
+		setup_cancel_handler(NULL);
+
+#if defined(ENABLE_THREAD_SAFETY) && !defined(WIN32)
+	if (thread->tid > 0)
+	{
+		sigset_t	sigint_sigset;
+		sigset_t	osigset;
+		sigemptyset(&sigint_sigset);
+		sigaddset(&sigint_sigset, SIGINT);
+
+		/* Block SIGINT in all threads except one. */
+		pthread_sigmask(SIG_BLOCK, &sigint_sigset, &osigset);
+	}
+#endif
+
 	/* open log file if requested */
 	if (use_log)
 	{
@@ -7387,6 +7422,7 @@ threadRun(void *arg)
 				pg_fatal("could not create connection for client %d",
 						 state[i].id);
 			}
+			state[i].cancel = PQgetCancel(state[i].con);
 		}
 	}
 
@@ -7414,6 +7450,17 @@ threadRun(void *arg)
 		pg_time_usec_t min_usec;
 		pg_time_usec_t now = 0; /* set this only if needed */
 
+		/*
+		 * If pgbench is canceled, send cancel requests to all connections
+		 * and exit the benchmark.
+		 */
+		if (CancelRequested)
+		{
+			if (thread->tid == 0)
+				cancel_all();
+			goto done;
+		}
+
 		/*
 		 * identify which client sockets should be checked for input, and
 		 * compute the nearest time (if any) at which we need to wake up.
@@ -7613,6 +7660,8 @@ finishCon(CState *st)
 	{
 		PQfinish(st->con);
 		st->con = NULL;
+		PQfreeCancel(st->cancel);
+		st->cancel = NULL;
 	}
 }
 
diff --git a/src/bin/pgbench/t/003_pgbench_cancel.pl b/src/bin/pgbench/t/003_pgbench_cancel.pl
new file mode 100644
index 0000000000..f5619a687c
--- /dev/null
+++ b/src/bin/pgbench/t/003_pgbench_cancel.pl
@@ -0,0 +1,84 @@
+
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+my $tempdir = PostgreSQL::Test::Utils::tempdir;
+
+my $node = PostgreSQL::Test::Cluster->new('main');
+$node->init;
+$node->start;
+
+# Test query canceling by sending SIGINT to a running pgbench
+#
+# There is, as of this writing, no documented way to get the PID of
+# the process from IPC::Run.  As a workaround, we have pgbench print its
+# own PID (which is the parent of the shell launched by pgbench) to a
+# file.
+SKIP:
+{
+	skip "cancel test requires a Unix shell", 3 if $windows_os;
+
+	local %ENV = $node->_get_env();
+
+	my ($stdin, $stdout, $stderr, @file);
+
+	# Test whether shell supports $PPID.  It's part of POSIX, but some
+	# pre-/non-POSIX shells don't support it (e.g., NetBSD).
+	$stdin = "\\! echo \$PPID";
+	IPC::Run::run([ 'psql', '-X', '-v', 'ON_ERROR_STOP=1' ],
+		'<', \$stdin, '>', \$stdout, '2>', \$stderr);
+	$stdout =~ /^\d+$/ or skip "shell apparently does not support \$PPID", 3;
+
+	@file = $node->_pgbench_make_files(
+		{
+			'003_pgbench_cancel' => qq{
+\\shell echo \$PPID >$tempdir/pgbench.pid
+select pg_sleep($PostgreSQL::Test::Utils::timeout_default);
+		}});
+
+	# Now start the real test
+	my $h = IPC::Run::start([ 'pgbench', '-c', '4', '-j', '4', '-T', "$PostgreSQL::Test::Utils::timeout_default", @file ],
+		\$stdin, \$stdout, \$stderr);
+
+	# Get the PID
+	my $count;
+	my $pgbench_pid;
+	until (
+		-s "$tempdir/pgbench.pid"
+		  and ($pgbench_pid =
+			PostgreSQL::Test::Utils::slurp_file("$tempdir/pgbench.pid")) =~
+		  /^\d+\n/s)
+	{
+		($count++ < 100 * $PostgreSQL::Test::Utils::timeout_default)
+		  or die "pid file did not appear";
+		usleep(10_000);
+	}
+
+	$node->poll_query_until('postgres',
+		q{SELECT (SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep') = 4;}
+	) or die "timed out";
+
+	# Send cancel request
+	kill 'INT', $pgbench_pid;
+
+	my $result = finish $h;
+
+	ok(!$result, 'query failed as expected');
+	like(
+		$stderr,
+		qr/Run was aborted; the above results are incomplete/,
+		'pgbench was canceled');
+
+	is($node->safe_psql('postgres',
+		q{SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep'}),
+		'0', 'all queries were canceled');
+}
+
+done_testing();
