Hi Fabien,

On Thu, 10 Aug 2023 12:32:44 +0900
Yugo NAGATA <nag...@sraoss.co.jp> wrote:

> On Wed, 9 Aug 2023 11:18:43 +0200 (CEST)
> Fabien COELHO <coe...@cri.ensmp.fr> wrote:
> 
> > 
> > I forgot, about the test:
> > 
> > I think that it should be integrated in the existing 
> > "001_pgbench_with_server.pl" script, because a TAP script is pretty 
> > expensive as it creates a cluster, starts it… before running the test.
> 
> Ok. I'll integrate the test into 001.
>  
> > I'm surprise that IPC::Run does not give any access to the process number. 
> > Anyway, its object interface seems to allow sending signal:
> > 
> >     $h->signal("...")
> > 
> > So the code could be simplified to use that after a small delay.
> 
> Thank you for your information.
> 
> I didn't know $h->signal() and  I mimicked the way of
> src/bin/psql/t/020_cancel.pl to send SIGINT to a running program. I don't
> know why the psql test doesn't use the interface, I'll investigate whether
> this can be used in our purpose, anyway.

I have attached the updated patch v3. The changes since the previous
patch include the following:

I removed the unnecessary condition (&& false) that you
pointed out in [1].

The test was rewritten to use IPC::Run's signal() and integrated
into "001_pgbench_with_server.pl". This test is skipped on Windows
because sending SIGINT there terminates the test itself, as discussed
in [2] regarding the query cancellation test in psql.

I added some comments to describe how query cancellation is
handled as I explained in [1].

Also, I found that the previous patch didn't work on Windows, so I fixed it.
On non-Windows systems, a thread waiting for the response to a long query can
be interrupted by SIGINT, but on Windows, threads do not return from
waiting until the queries they are running are cancelled. This is because,
when the signal is received, the system just creates a new thread to
execute the callback function specified by setup_cancel_handler, and
the other threads continue to run [3]. Therefore, the queries have to be
cancelled in the callback function.

[1] 
https://www.postgresql.org/message-id/a58388ac-5411-4760-ea46-71324d8324cb%40mines-paristech.fr
[2] 
https://www.postgresql.org/message-id/20230906004524.2fd6ee049f8a6c6f2690b99c%40sraoss.co.jp
[3] https://learn.microsoft.com/en-us/windows/console/handlerroutine

Regards,
Yugo Nagata

> 
> -- 
> Yugo NAGATA <nag...@sraoss.co.jp>
> 
> 


-- 
Yugo NAGATA <nag...@sraoss.co.jp>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 713e8a06bb..1e396865f7 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -596,6 +596,7 @@ typedef enum
 typedef struct
 {
 	PGconn	   *con;			/* connection handle to DB */
+	PGcancel   *cancel;			/* query cancel */
 	int			id;				/* client No. */
 	ConnectionStateEnum state;	/* state machine's current state. */
 	ConditionalStack cstack;	/* enclosing conditionals state */
@@ -638,6 +639,8 @@ typedef struct
 								 * here */
 } CState;
 
+CState	*client_states;		/* status of all clients */
+
 /*
  * Thread state
  */
@@ -837,6 +840,10 @@ static void add_socket_to_set(socket_set *sa, int fd, int idx);
 static int	wait_on_socket_set(socket_set *sa, int64 usecs);
 static bool socket_has_input(socket_set *sa, int fd, int idx);
 
+#ifdef WIN32
+static void pgbench_cancel_callback(void);
+#endif
+
 /* callback used to build rows for COPY during data loading */
 typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
 
@@ -3639,6 +3646,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 						st->state = CSTATE_ABORTED;
 						break;
 					}
+					st->cancel = PQgetCancel(st->con);
 
 					/* reset now after connection */
 					now = pg_time_now();
@@ -4670,6 +4678,18 @@ disconnect_all(CState *state, int length)
 		finishCon(&state[i]);
 }
 
+/* send cancel requests to all connections (best effort, errors ignored) */
+static void
+cancel_all(void)
+{
+	for (int i = 0; i < nclients; i++)
+	{
+		char errbuf[1];		/* PQcancel requires a buffer; message unused */
+		if (client_states[i].cancel != NULL)
+			(void) PQcancel(client_states[i].cancel, errbuf, sizeof(errbuf));
+	}
+}
+
 /*
  * Remove old pgbench tables, if any exist
  */
@@ -7146,6 +7166,9 @@ main(int argc, char **argv)
 		}
 	}
 
+	/* enable threads to access the status of all clients */
+	client_states = state;
+
 	/* other CState initializations */
 	for (i = 0; i < nclients; i++)
 	{
@@ -7358,6 +7381,37 @@ threadRun(void *arg)
 	StatsData	last,
 				aggs;
 
+	/*
+	 * Query cancellation is handled only in thread #0.
+	 *
+	 * On Windows, a callback function is set in which query cancel requests
+	 * are sent to all benchmark queries running in the backend.
+	 *
+	 * On non-Windows, any callback function is not set. When SIGINT is
+	 * received, CancelRequested is just set, and only thread #0 is interrupted
+	 * and returns from waiting input from the backend. After that, the thread
+	 * sends cancel requests to all benchmark queries.
+	 */
+	if (thread->tid == 0)
+#ifdef WIN32
+		setup_cancel_handler(pgbench_cancel_callback);
+#else
+		setup_cancel_handler(NULL);
+#endif
+
+#if defined(ENABLE_THREAD_SAFETY) && !defined(WIN32)
+	if (thread->tid > 0)
+	{
+		sigset_t	sigint_sigset;
+		sigset_t	osigset;
+		sigemptyset(&sigint_sigset);
+		sigaddset(&sigint_sigset, SIGINT);
+
+		/* Block SIGINT in all threads except one. */
+		pthread_sigmask(SIG_BLOCK, &sigint_sigset, &osigset);
+	}
+#endif
+
 	/* open log file if requested */
 	if (use_log)
 	{
@@ -7400,6 +7454,7 @@ threadRun(void *arg)
 				pg_fatal("could not create connection for client %d",
 						 state[i].id);
 			}
+			state[i].cancel = PQgetCancel(state[i].con);
 		}
 	}
 
@@ -7427,6 +7482,26 @@ threadRun(void *arg)
 		pg_time_usec_t min_usec;
 		pg_time_usec_t now = 0; /* set this only if needed */
 
+		/*
+		 * If pgbench is cancelled, send cancel requests to all connections
+		 * and exit the benchmark.
+		 *
+		 * Note that only thread #0 can be interrupted by SIGINT while waiting
+		 * the result from the backend. Other threads will return from waiting
+		 * just after queries they running are cancelled by thread #0.
+		 *
+		 * On Windows, cancel requests are sent in the callback function, so
+		 * do nothing but exit the benchmark.
+		 */
+		if (CancelRequested)
+		{
+#ifndef WIN32
+			if (thread->tid == 0)
+				cancel_all();
+#endif
+			goto done;
+		}
+
 		/*
 		 * identify which client sockets should be checked for input, and
 		 * compute the nearest time (if any) at which we need to wake up.
@@ -7650,6 +7725,8 @@ finishCon(CState *st)
 	{
 		PQfinish(st->con);
 		st->con = NULL;
+		PQfreeCancel(st->cancel);
+		st->cancel = NULL;
 	}
 }
 
@@ -7867,3 +7944,15 @@ socket_has_input(socket_set *sa, int fd, int idx)
 }
 
 #endif							/* POLL_USING_SELECT */
+
+#ifdef WIN32
+/*
+ * Query cancellation callback for Windows, passed to setup_cancel_handler().
+ */
+static void
+pgbench_cancel_callback(void)
+{
+	/* threads waiting on a query are not interrupted here, so cancel them all */
+	cancel_all();
+}
+#endif
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 96be529d6b..43eadfbe9f 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -7,6 +7,7 @@ use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use Time::HiRes qw(usleep);
 
 # Check the initial state of the data generated.  Tables for tellers and
 # branches use NULL for their filler attribute.  The table accounts uses
@@ -1502,6 +1503,47 @@ update counter set i = i+1 returning i \gset
 # Clean up
 $node->safe_psql('postgres', 'DROP TABLE counter;');
 
+# Test query canceling by sending SIGINT to a running pgbench.  Both clients
+# run a long pg_sleep(); after SIGINT no such query may remain on the server.
+SKIP:
+{
+	skip "sending SIGINT on Windows terminates the test itself", 3
+	  if $windows_os;
+
+	my ($stdin, $stdout, $stderr);
+	my @file = $node->_pgbench_make_files(
+		{
+			'003_pgbench_cancel' => qq{
+select pg_sleep($PostgreSQL::Test::Utils::timeout_default);
+		}});
+
+	local %ENV = $node->_get_env();
+	my $h = IPC::Run::start(
+		[ 'pgbench', '-c', '2', '-j', '2',
+		  '-T', "$PostgreSQL::Test::Utils::timeout_default", @file ],
+		\$stdin, \$stdout, \$stderr);
+
+	# Wait until both clients are actually executing their query.
+	$node->poll_query_until('postgres',
+		q{SELECT (SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep') = 2;}
+	) or die "timed out";
+
+	# Send cancel request
+	$h->signal('INT');
+	my $result = $h->finish;
+
+	ok(!$result, 'pgbench failed as expected');
+	like(
+		$stderr,
+		qr/Run was aborted; the above results are incomplete/,
+		'pgbench was canceled');
+
+	# Backends may exit slightly after pgbench does, so poll instead of one read.
+	ok($node->poll_query_until('postgres',
+		q{SELECT count(*) = 0 FROM pg_stat_activity WHERE query ~ '^select pg_sleep'}),
+		'all queries were canceled');
+}
+
 # done
 $node->safe_psql('postgres', 'DROP TABLESPACE regress_pgbench_tap_1_ts');
 $node->stop;

Reply via email to