On 08/09/2018 12:45 PM, Andrew Dunstan wrote:


On 07/03/2018 07:52 PM, Andrew Dunstan wrote:


On 05/17/2018 01:23 AM, Thomas Munro wrote:
On Tue, Mar 27, 2018 at 9:23 AM, Rady, Doug <radyd...@amazon.com> wrote:
pgbench11-ppoll-v12.patch
Hi Doug,

FYI this patch is trying and failing to use ppoll() on Windows:

https://ci.appveyor.com/project/postgresql-cfbot/postgresql/build/1.0.30



It's still failing -  see <https://ci.appveyor.com/project/postgresql-cfbot/postgresql/build/1.0.4098>

I'm setting this back to "Waiting on Author" until that's fixed.



The author hasn't replied, but the attached seems to have cured the bitrot so that it at least applies. Let's see what the cfbot makes of it and then possibly fix any Windows issues.





And there's still a Windows problem which I think is cured in the attached patch

cheers

andrew


--
Andrew Dunstan                https://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

diff --git a/configure b/configure
index 2665213..4579003 100755
--- a/configure
+++ b/configure
@@ -14916,7 +14916,7 @@ fi
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l
+for ac_func in cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat ppoll pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l
 do :
   as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
 ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
diff --git a/configure.in b/configure.in
index 397f6bc..bd08068 100644
--- a/configure.in
+++ b/configure.in
@@ -1540,7 +1540,7 @@ PGAC_FUNC_WCSTOMBS_L
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`
 
-AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime dlopen fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat ppoll pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l])
 
 AC_REPLACE_FUNCS(fseeko)
 case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 41b756c..3d378db 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -45,9 +45,18 @@
 #include <signal.h>
 #include <time.h>
 #include <sys/time.h>
+#ifndef PGBENCH_USE_SELECT			/* force use of select(2)? */
+#ifdef HAVE_PPOLL
+#define POLL_USING_PPOLL
+#include <poll.h>
+#endif
+#endif
+#ifndef POLL_USING_PPOLL
+#define POLL_USING_SELECT
 #ifdef HAVE_SYS_SELECT_H
 #include <sys/select.h>
 #endif
+#endif
 
 #ifdef HAVE_SYS_RESOURCE_H
 #include <sys/resource.h>		/* for getrlimit */
@@ -92,13 +101,19 @@ static int	pthread_join(pthread_t th, void **thread_return);
 
 /********************************************************************
  * some configurable parameters */
-
-/* max number of clients allowed */
+#ifdef POLL_USING_SELECT	/* using select(2) */
+#define SOCKET_WAIT_METHOD "select"
+typedef fd_set socket_set;
 #ifdef FD_SETSIZE
-#define MAXCLIENTS	(FD_SETSIZE - 10)
+#define MAXCLIENTS	(FD_SETSIZE - 10) /* system limited max number of clients allowed */
 #else
-#define MAXCLIENTS	1024
+#define MAXCLIENTS	1024		/* max number of clients allowed */
 #endif
+#else	/* using ppoll(2) */
+#define SOCKET_WAIT_METHOD "ppoll"
+typedef struct pollfd socket_set;
+#define MAXCLIENTS	-1		/* unlimited number of clients */
+#endif /* POLL_USING_SELECT */
 
 #define DEFAULT_INIT_STEPS "dtgvp"	/* default -I setting */
 
@@ -525,6 +540,13 @@ static void addScript(ParsedScript script);
 static void *threadRun(void *arg);
 static void setalarm(int seconds);
 static void finishCon(CState *st);
+static socket_set *alloc_socket_set(int count);
+static bool error_on_socket(socket_set *sa, int idx, PGconn *con);
+static void free_socket_set(socket_set *sa);
+static bool ignore_socket(socket_set *sa, int idx, PGconn *con);
+static void clear_socket_set(socket_set *sa, int count);
+static void set_socket(socket_set *sa, int fd, int idx);
+static int wait_on_socket_set(socket_set *sa, int nstate, int maxsock, int64 usec);
 
 
 /* callback functions for our flex lexer */
@@ -1143,6 +1165,7 @@ doConnect(void)
 			!have_password)
 		{
 			PQfinish(conn);
+			conn = NULL;
 			simple_prompt("Password: ", password, sizeof(password), false);
 			have_password = true;
 			new_pass = true;
@@ -4903,7 +4926,7 @@ main(int argc, char **argv)
 			case 'c':
 				benchmarking_option_set = true;
 				nclients = atoi(optarg);
-				if (nclients <= 0 || nclients > MAXCLIENTS)
+				if (nclients <= 0 || (MAXCLIENTS != -1 && nclients > MAXCLIENTS))
 				{
 					fprintf(stderr, "invalid number of clients: \"%s\"\n",
 							optarg);
@@ -5614,6 +5637,7 @@ threadRun(void *arg)
 	int64		next_report = last_report + (int64) progress * 1000000;
 	StatsData	last,
 				aggs;
+	socket_set	*sockets = alloc_socket_set(nstate);
 
 	/*
 	 * Initialize throttling rate target for all of the thread's clients.  It
@@ -5657,6 +5681,7 @@ threadRun(void *arg)
 		{
 			if ((state[i].con = doConnect()) == NULL)
 				goto done;
+			set_socket(sockets, PQsocket(state[i].con), i);
 		}
 	}
 
@@ -5673,13 +5698,12 @@ threadRun(void *arg)
 	/* loop till all clients have terminated */
 	while (remains > 0)
 	{
-		fd_set		input_mask;
 		int			maxsock;	/* max socket number to be waited for */
 		int64		min_usec;
 		int64		now_usec = 0;	/* set this only if needed */
 
 		/* identify which client sockets should be checked for input */
-		FD_ZERO(&input_mask);
+		clear_socket_set(sockets, nstate);
 		maxsock = -1;
 		min_usec = PG_INT64_MAX;
 		for (i = 0; i < nstate; i++)
@@ -5728,7 +5752,7 @@ threadRun(void *arg)
 					goto done;
 				}
 
-				FD_SET(sock, &input_mask);
+				set_socket(sockets, sock, i);
 				if (maxsock < sock)
 					maxsock = sock;
 			}
@@ -5765,7 +5789,7 @@ threadRun(void *arg)
 		/*
 		 * If no clients are ready to execute actions, sleep until we receive
 		 * data from the server, or a nap-time specified in the script ends,
-		 * or it's time to print a progress report.  Update input_mask to show
+		 * or it's time to print a progress report.  Update sockets to show
 		 * which client(s) received data.
 		 */
 		if (min_usec > 0)
@@ -5776,11 +5800,7 @@ threadRun(void *arg)
 			{
 				if (maxsock != -1)
 				{
-					struct timeval timeout;
-
-					timeout.tv_sec = min_usec / 1000000;
-					timeout.tv_usec = min_usec % 1000000;
-					nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+					nsocks = wait_on_socket_set(sockets, nstate, maxsock, min_usec);
 				}
 				else			/* nothing active, simple sleep */
 				{
@@ -5789,7 +5809,7 @@ threadRun(void *arg)
 			}
 			else				/* no explicit delay, select without timeout */
 			{
-				nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+				nsocks = wait_on_socket_set(sockets, nstate, maxsock, 0);
 			}
 
 			if (nsocks < 0)
@@ -5800,7 +5820,7 @@ threadRun(void *arg)
 					continue;
 				}
 				/* must be something wrong */
-				fprintf(stderr, "select() failed: %s\n", strerror(errno));
+				fprintf(stderr, "%s() failed: %s\n", SOCKET_WAIT_METHOD, strerror(errno));
 				goto done;
 			}
 		}
@@ -5809,7 +5829,7 @@ threadRun(void *arg)
 			/* min_usec == 0, i.e. something needs to be executed */
 
 			/* If we didn't call select(), don't try to read any data */
-			FD_ZERO(&input_mask);
+			clear_socket_set(sockets, nstate);
 		}
 
 		/* ok, advance the state machine of each connection */
@@ -5820,16 +5840,11 @@ threadRun(void *arg)
 			if (st->state == CSTATE_WAIT_RESULT)
 			{
 				/* don't call doCustom unless data is available */
-				int			sock = PQsocket(st->con);
 
-				if (sock < 0)
-				{
-					fprintf(stderr, "invalid socket: %s",
-							PQerrorMessage(st->con));
+				if (error_on_socket(sockets, i, st->con))
 					goto done;
-				}
 
-				if (!FD_ISSET(sock, &input_mask))
+				if (ignore_socket(sockets, i, st->con))
 					continue;
 			}
 			else if (st->state == CSTATE_FINISHED ||
@@ -5967,6 +5982,8 @@ done:
 		fclose(thread->logfile);
 		thread->logfile = NULL;
 	}
+	free_socket_set(sockets);
+	sockets = NULL;
 	return NULL;
 }
 
@@ -5980,6 +5997,135 @@ finishCon(CState *st)
 	}
 }
 
+#ifdef POLL_USING_SELECT	/* select(2) based socket polling */
+static socket_set *
+alloc_socket_set(int count)
+{
+	return (socket_set *) pg_malloc0(sizeof(socket_set));
+}
+
+static void
+free_socket_set(socket_set *sa)
+{
+	pg_free(sa);
+}
+
+static bool
+error_on_socket(socket_set *sa, int idx, PGconn *con)
+{
+	if (PQsocket(con) >= 0) return false;
+	fprintf(stderr, "invalid socket: %s", PQerrorMessage(con));
+	return true;
+}
+
+static bool
+ignore_socket(socket_set *sa, int idx, PGconn *con)
+{
+	return !(FD_ISSET(PQsocket(con), sa));
+}
+
+static void
+clear_socket_set(socket_set *sa, int count)
+{
+	FD_ZERO(sa);
+}
+
+static void
+set_socket(socket_set *sa, int fd, int idx)
+{
+	FD_SET(fd, sa);
+}
+
+static int
+wait_on_socket_set(socket_set *sa, int nstate, int maxsock, int64 usec)
+{
+	struct timeval timeout;
+
+	if (usec)
+	{
+		timeout.tv_sec = usec / 1000000;
+		timeout.tv_usec = usec % 1000000;
+		return select(maxsock + 1, sa, NULL, NULL, &timeout);
+	}
+	else
+	{
+		return select(maxsock + 1, sa, NULL, NULL, NULL);
+	}
+}
+#else	/* ppoll(2) based socket polling */
+/* ppoll() will block until timeout or one of POLL_EVENTS occurs. */
+#define POLL_EVENTS (POLLRDHUP|POLLIN|POLLPRI)
+/* ppoll() events returned that we do not want/expect to see. */
+#define POLL_UNWANTED (POLLRDHUP|POLLERR|POLLHUP|POLLNVAL)
+
+static socket_set *
+alloc_socket_set(int count)
+{
+	return (socket_set *) pg_malloc0(sizeof(socket_set) * count);
+}
+
+static void
+free_socket_set(socket_set *sa)
+{
+	pg_free(sa);
+}
+
+static bool
+error_on_socket(socket_set *sa, int idx, PGconn *con)
+{
+	/*
+	 * No error if socket not used or non-error status from PQsocket() and none
+	 * of the unwanted ppoll() return events.
+	 */
+	if (sa[idx].fd == -1 || (PQsocket(con) >= 0 && !(sa[idx].revents & POLL_UNWANTED)))
+		return false;
+	fprintf(stderr, "invalid socket: %s", PQerrorMessage(con));
+	if (debug)
+		fprintf(stderr, "ppoll() fail - errno: %d, socket: %d, events: %x\n",
+			errno, sa[idx].fd, (sa[idx].revents & POLL_UNWANTED));
+	return true;
+}
+
+static bool
+ignore_socket(socket_set *sa, int idx, PGconn *con)
+{
+	return (sa[idx].fd != -1 && !sa[idx].revents);
+}
+
+static void
+clear_socket_set(socket_set *sa, int count)
+{
+	int i = 0;
+	for (i = 0; i < count; i++)
+		set_socket(sa, -1, i);
+}
+
+static void
+set_socket(socket_set *sa, int fd, int idx)
+{
+	sa[idx].fd = fd;
+	sa[idx].events = POLL_EVENTS;
+	sa[idx].revents = 0;
+}
+
+static int
+wait_on_socket_set(socket_set *sa, int nstate, int maxsock, int64 usec)
+{
+	struct timespec timeout;
+
+	if (usec)
+	{
+		timeout.tv_sec = usec / 1000000;
+		timeout.tv_nsec = usec % 1000000000;
+		return ppoll(sa, nstate, &timeout, NULL);
+	}
+	else
+	{
+		return ppoll(sa, nstate, NULL, NULL);
+	}
+}
+#endif	/* PGBENCH_USE_SELECT */
+
 /*
  * Support for duration option: set timer_exceeded after so many seconds.
  */
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index b7e4696..5e0e5f3 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -434,6 +434,9 @@
 /* Define to 1 if the assembler supports PPC's LWARX mutex hint bit. */
 #undef HAVE_PPC_LWARX_MUTEX_HINT
 
+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
 /* Define to 1 if you have the `pstat' function. */
 #undef HAVE_PSTAT
 

Reply via email to