On 2020-04-04 03:14, Alexander Korotkov wrote:
I think that for now we would be fine with a single LSN and a single
TIMEOUT.  In the future we may add multiple LSNs/TIMEOUTs and/or support
for expressions as LSNs/TIMEOUTs if we figure out it's necessary.

I also think that coupling the wait for an LSN with the beginning of a
transaction is a good idea.  A separate WAIT FOR LSN statement called in
the middle of a transaction looks problematic to me.  Imagine we have RR
isolation and have already acquired the snapshot.  Then our snapshot can
block applying WAL records, which we are waiting for.  That would be an
implicit deadlock.  It would be nice to avoid such deadlocks by
design.

OK, here is a new version of the patch with a single LSN and a single TIMEOUT.

Synopsis
==========
BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ] [ WAIT FOR LSN 'lsn' [ TIMEOUT value ] ]
and
START TRANSACTION [ transaction_mode [, ...] ] [ WAIT FOR LSN 'lsn' [ TIMEOUT value ] ]
     where lsn is the result of pg_current_wal_flush_lsn() on the master,
     and value is an unsigned integer timeout in milliseconds (not quoted).

Description
==========
BEGIN/START ... WAIT FOR pauses the start of the transaction until the specified LSN has been replayed. (The transaction is not opened if the LSN is not reached before the timeout expires.)

How to use it
==========
BEGIN ... WAIT FOR LSN 'lsn' [ TIMEOUT timeout_in_ms ];

# Before starting the transaction, wait until LSN 0/84832E8 is replayed. The wait
# time is not limited here because no timeout was specified.
BEGIN WAIT FOR LSN '0/84832E8';

# Before starting the transaction, wait until LSN 0/8DFFB88 is replayed. Limit the
# wait time to 10 seconds; if the LSN is not reached by then, don't start the
# transaction.
START TRANSACTION WAIT FOR LSN '0/8DFFB88' TIMEOUT 10000;

# Same as the previous example, but with transaction isolation level = REPEATABLE READ
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ WAIT FOR LSN '0/815C0F1' TIMEOUT 10000;

Note: WAIT FOR stops waiting on postmaster death or on an interrupt
if either occurs before the LSN is reached or the timeout expires.
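
The possible ways a wait can end can be summarized with a small simulation.
This is only a sketch of the intended semantics, not the patch's WaitUtility()
(which sleeps on the backend's latch). WaitOutcome, Tick and simulate_wait are
hypothetical names, and the "ticks" stand in for successive wake-ups of the
waiting backend. A timeout of 0 is taken to mean "no limit", as in the patch.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical outcome codes, for illustration only. */
typedef enum
{
	WAIT_REACHED,				/* target LSN has been replayed */
	WAIT_TIMED_OUT,				/* timeout expired first */
	WAIT_INTERRUPTED,			/* interrupt or postmaster death came first */
	WAIT_STILL_WAITING			/* simulated trace ended with no decision */
} WaitOutcome;

/* What the simulated backend observes at one wake-up. */
typedef struct
{
	uint64_t	replayed_lsn;	/* replay position on the standby */
	bool		interrupted;	/* interrupt pending or postmaster gone */
	long		elapsed_ms;		/* time since the wait started */
} Tick;

static WaitOutcome
simulate_wait(uint64_t target, long timeout_ms,
			  const Tick *ticks, size_t nticks)
{
	for (size_t i = 0; i < nticks; i++)
	{
		if (ticks[i].interrupted)
			return WAIT_INTERRUPTED;
		if (target <= ticks[i].replayed_lsn)
			return WAIT_REACHED;
		/* timeout_ms == 0 means the wait is not limited */
		if (timeout_ms > 0 && ticks[i].elapsed_ms >= timeout_ms)
			return WAIT_TIMED_OUT;
	}
	return WAIT_STILL_WAITING;
}
```

Only the WAIT_REACHED case corresponds to the transaction actually being
started; in every other case BEGIN/START TRANSACTION does not open one.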

Testing the implementation
======================
The implementation was tested with src/test/recovery/t/020_begin_wait.pl

--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index c23bbfb4e71..7a71769cd8f 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ WAIT FOR LSN <replaceable class="parameter">lsn_value</replaceable> [TIMEOUT <replaceable class="parameter">number_of_milliseconds</replaceable> ] ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
@@ -63,6 +63,16 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
    <xref linkend="sql-set-transaction"/>
    was executed.
   </para>
+
+  <para>
+   The <literal>WAIT FOR</literal> clause allows waiting for the target log
+   sequence number (<acronym>LSN</acronym>) to be replayed on the standby before
+   starting the transaction in <productname>PostgreSQL</productname> databases
+   with master-standby asynchronous replication. Wait time can be limited by
+   specifying a timeout, which is measured in milliseconds and must be an
+   integer. Waiting can be interrupted using <literal>Ctrl+C</literal>, or by
+   shutting down the <literal>postgres</literal> server.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -146,6 +156,10 @@ BEGIN;
    different purpose in embedded SQL. You are advised to be careful
    about the transaction semantics when porting database applications.
   </para>
+
+  <para>
+   There is no <command>WAIT FOR</command> clause in the SQL standard.
+  </para>
  </refsect1>
 
  <refsect1>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index d6cd1d41779..f5412c2ca7b 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ WAIT FOR LSN <replaceable class="parameter">lsn_value</replaceable> [TIMEOUT <replaceable class="parameter">number_of_milliseconds</replaceable> ] ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
@@ -40,6 +40,16 @@ START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable
    characteristics, as if <xref linkend="sql-set-transaction"/> was executed. This is the same
    as the <xref linkend="sql-begin"/> command.
   </para>
+
+  <para>
+   The <literal>WAIT FOR</literal> clause allows waiting for the target log
+   sequence number (<acronym>LSN</acronym>) to be replayed on the standby before
+   starting the transaction in <productname>PostgreSQL</productname> databases
+   with master-standby asynchronous replication. Wait time can be limited by
+   specifying a timeout, which is measured in milliseconds and must be an
+   integer. Waiting can be interrupted using <literal>Ctrl+C</literal>, or by
+   shutting down the <literal>postgres</literal> server.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -78,6 +88,10 @@ START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable
    omitted.
   </para>
 
+  <para>
+   There is no <command>WAIT FOR</command> clause in the SQL standard.
+  </para>
+
   <para>
    See also the compatibility section of <xref linkend="sql-set-transaction"/>.
   </para>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index abf954ba392..d2856c88943 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
 #include "catalog/pg_database.h"
 #include "commands/progress.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/controldata_utils.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
@@ -7332,6 +7333,15 @@ StartupXLOG(void)
 					break;
 				}
 
+				/*
+				 * If we replayed an LSN that someone was waiting for,
+				 * set latches in shared memory array to notify the waiter.
+				 */
+				if (XLogCtl->lastReplayedEndRecPtr >= GetMinWait())
+				{
+					WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+				}
+
 				/* Else, try to fetch the next WAL record */
 				record = ReadRecord(xlogreader, LOG, false);
 			} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d4815d3ce65..9b310926c12 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -57,6 +57,7 @@ OBJS = \
 	user.o \
 	vacuum.o \
 	variable.o \
-	view.o
+	view.o \
+	wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..66245d43882
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,279 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  Implements WAIT FOR, which allows waiting for events such as
+ *	  time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <float.h>
+#include <math.h>
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlog.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure */
+typedef struct
+{
+	int			backend_maxid;
+	XLogRecPtr	min_lsn;
+	slock_t		mutex;
+	XLogRecPtr	waited_lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static volatile WaitState *state;
+
+/* Add the event of the current backend to the shared memory array */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	if (state->backend_maxid < MyBackendId)
+		state->backend_maxid = MyBackendId;
+
+	state->waited_lsn[MyBackendId] = lsn_to_wait;
+
+	if (lsn_to_wait < state->min_lsn)
+		state->min_lsn = lsn_to_wait;
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ * Check:
+ * 1) normal|smart|fast|immediate stop
+ * 2) SIGKILL and SIGTERM
+ */
+static void
+DeleteEvent(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete = state->waited_lsn[MyBackendId];
+
+	state->waited_lsn[MyBackendId] = InvalidXLogRecPtr;
+
+	SpinLockAcquire(&state->mutex);
+
+	/* If we need to choose the next min_lsn, update state->min_lsn */
+	if (state->min_lsn == lsn_to_delete)
+	{
+		state->min_lsn = PG_UINT64_MAX;
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+				state->waited_lsn[i] < state->min_lsn)
+				state->min_lsn = state->waited_lsn[i];
+	}
+
+	if (state->backend_maxid == MyBackendId)
+		for (i = (MyBackendId); i >= 2; i--)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr)
+			{
+				state->backend_maxid = i;
+				break;
+			}
+
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitState, waited_lsn);
+	size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+	return size;
+}
+
+/* Init array of events in shared memory */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+	uint32		i;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+
+		for (i = 0; i < (MaxBackends + 1); i++)
+			state->waited_lsn[i] = InvalidXLogRecPtr;
+
+		state->backend_maxid = 0;
+		state->min_lsn = PG_UINT64_MAX;
+	}
+}
+
+/* Set all latches in shared memory to signal that new LSN has been replayed */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	uint32		i;
+	int 		backend_maxid;
+	PGPROC	   *backend;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	for (i = 2; i <= backend_maxid; i++)
+	{
+		backend = BackendIdGetProc(i);
+		if (state->waited_lsn[i] != 0)
+		{
+			if (backend && state->waited_lsn[i] <= cur_lsn)
+				SetLatch(&backend->procLatch);
+		}
+	}
+}
+
+/* Get minimal LSN that will be next */
+XLogRecPtr
+GetMinWait(void)
+{
+	return state->min_lsn;
+}
+
+/*
+ * On WAIT use MyLatch to wait till LSN is replayed,
+ * postmaster dies or timeout happens.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	int			latch_events;
+	float8		endtime;
+	uint		res = 0;
+
+#define GetNowFloat()	((float8) GetCurrentTimestamp() / 1000000.0)
+	endtime = GetNowFloat() + secs;
+
+	latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+	/* Just check if we reached */
+	if (lsn <= cur_lsn)
+		return (lsn <= cur_lsn);
+
+	AddEvent(lsn);
+
+	for (;;)
+	{
+		int			rc;
+		float8		delay = 0;
+		long		delay_ms;
+
+		if (secs > 0)
+			delay = endtime - GetNowFloat();
+		else if (secs == 0)
+			/*
+			* If we wait forever, then use a 1 minute timeout to check
+			* for interrupts.
+			*/
+			delay = 60;
+
+		if (delay > 0.0)
+			delay_ms = (long) ceil(delay * 1000.0);
+		else
+			break;
+
+		/*
+		 * If received an interruption from CHECK_FOR_INTERRUPTS,
+		 * then delete the current event from array.
+		 */
+		if (InterruptPending)
+		{
+			DeleteEvent();
+			ProcessInterrupts();
+		}
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, delay_ms,
+					   WAIT_EVENT_CLIENT_READ);
+
+		ResetLatch(MyLatch);
+
+		if (rc & WL_LATCH_SET)
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+
+		/* If LSN has been replayed */
+		if (lsn <= cur_lsn)
+			break;
+	}
+
+	DeleteEvent();
+
+	if (lsn > cur_lsn)
+		elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+	else
+		res = 1;
+
+	return res;
+}
+
+/* Implementation of WAIT FOR */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+	TupleDesc	tupdesc;
+	TupOutputState *tstate;
+	int			res = 0;
+
+	res = WaitUtility(DatumGetLSN(
+				  DirectFunctionCall1(pg_lsn_in,CStringGetDatum(stmt->lsn))),
+				  (float8)stmt->delay/1000);
+
+	/* Need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+	/* Prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+	/* Send it */
+	do_text_output_oneline(tstate, res?"t":"f");
+	end_tup_output(tstate);
+	return res;
+}
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index f4aecdcbcda..b3160eb204a 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2762,6 +2762,28 @@ _outDefElem(StringInfo str, const DefElem *node)
 	WRITE_LOCATION_FIELD(location);
 }
 
+static void
+_outWaitStmt(StringInfo str, const WaitStmt *node)
+{
+	WRITE_NODE_TYPE("WAITSTMT");
+
+	WRITE_STRING_FIELD(lsn);
+	WRITE_UINT_FIELD(delay);
+}
+
+static void
+_outTransactionStmt(StringInfo str, const TransactionStmt *node)
+{
+	WRITE_NODE_TYPE("TRANSACTIONSTMT");
+
+	WRITE_STRING_FIELD(savepoint_name);
+	WRITE_STRING_FIELD(gid);
+	WRITE_NODE_FIELD(options);
+	WRITE_BOOL_FIELD(chain);
+	WRITE_ENUM_FIELD(kind, TransactionStmtKind);
+	WRITE_NODE_FIELD(wait);
+}
+
 static void
 _outTableLikeClause(StringInfo str, const TableLikeClause *node)
 {
@@ -4308,6 +4330,12 @@ outNode(StringInfo str, const void *obj)
 			case T_PartitionRangeDatum:
 				_outPartitionRangeDatum(str, obj);
 				break;
+			case T_WaitStmt:
+				_outWaitStmt(str, obj);
+				break;
+			case T_TransactionStmt:
+				_outTransactionStmt(str, obj);
+				break;
 
 			default:
 
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 6676412842b..8eba11c6221 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -326,7 +326,6 @@ transformStmt(ParseState *pstate, Node *parseTree)
 			result = transformCallStmt(pstate,
 									   (CallStmt *) parseTree);
 			break;
-
 		default:
 
 			/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3449c26bd11..156878d8f73 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -592,6 +592,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partboundspec> PartitionBoundSpec
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
+%type <ival>		wait_time
+%type <node>		wait_for
 
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
@@ -661,7 +663,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
 
 	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -692,7 +694,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
 
 	TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-	TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+	TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
 	TREAT TRIGGER TRIM TRUE_P
 	TRUNCATE TRUSTED TYPE_P TYPES_P
 
@@ -702,7 +704,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAIT WHEN WHERE WHITESPACE_P WINDOW
+	WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
 	XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -9946,18 +9949,20 @@ TransactionStmt:
 					n->chain = $3;
 					$$ = (Node *)n;
 				}
-			| BEGIN_P opt_transaction transaction_mode_list_or_empty
+			| BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 					n->kind = TRANS_STMT_BEGIN;
 					n->options = $3;
+					n->wait = $4;
 					$$ = (Node *)n;
 				}
-			| START TRANSACTION transaction_mode_list_or_empty
+			| START TRANSACTION transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 					n->kind = TRANS_STMT_START;
 					n->options = $3;
+					n->wait = $4;
 					$$ = (Node *)n;
 				}
 			| COMMIT opt_transaction opt_transaction_chain
@@ -14187,6 +14192,31 @@ xml_passing_mech:
 			| BY VALUE_P
 		;
 
+/*****************************************************************************
+ *
+ *		SUBQUERY:
+ *				WAIT FOR <event>
+ *				event is one of:
+ *					LSN value TIMEOUT delay
+ *					TIMEOUT delay
+ *
+ *****************************************************************************/
+wait_for:
+			WAIT FOR LSN Sconst wait_time
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = $4;
+					n->delay = $5;
+					$$ = (Node *)n;
+				}
+			| /* EMPTY */		{ $$ = NULL; }
+		;
+
+wait_time:
+			TIMEOUT Iconst		{ $$ = $2; }
+			| /* EMPTY */		{ $$ = 0; }
+		;
+
 
 /*
  * Aggregate decoration clauses
@@ -15338,6 +15368,7 @@ unreserved_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LSN
 			| MAPPING
 			| MATCH
 			| MATERIALIZED
@@ -15465,6 +15496,7 @@ unreserved_keyword:
 			| TEMPORARY
 			| TEXT_P
 			| TIES
+			| TIMEOUT
 			| TRANSACTION
 			| TRANSFORM
 			| TRIGGER
@@ -15491,6 +15523,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAIT
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cde..bb8af349808 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/twophase.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void)
 	SyncScanShmemInit();
 	AsyncShmemInit();
 
+	/*
+	 * Init array of Latches in shared memory for WAIT
+	 */
+	WaitShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index b1f7f6e2d01..7345513de55 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include <float.h>
 
 #include "access/htup_details.h"
 #include "access/reloptions.h"
@@ -57,6 +58,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -70,6 +72,9 @@
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
 
 /* Hook for plugins to get control in ProcessUtility() */
 ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -591,6 +596,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 					case TRANS_STMT_START:
 						{
 							ListCell   *lc;
+							WaitStmt   *waitstmt = (WaitStmt *) stmt->wait;
+
+							/* If needed to WAIT FOR something but failed */
+							if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+								break;
 
 							BeginTransactionBlock();
 							foreach(lc, stmt->options)
@@ -2718,6 +2728,10 @@ CreateCommandTag(Node *parsetree)
 			tag = CMDTAG_NOTIFY;
 			break;
 
+		case T_WaitStmt:
+			tag = CMDTAG_WAIT;
+			break;
+
 		case T_ListenStmt:
 			tag = CMDTAG_LISTEN;
 			break;
@@ -3357,6 +3371,10 @@ GetCommandLogLevel(Node *parsetree)
 			lev = LOGSTMT_ALL;
 			break;
 
+		case T_WaitStmt:
+			lev = LOGSTMT_ALL;
+			break;
+
 		case T_ListenStmt:
 			lev = LOGSTMT_ALL;
 			break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 00000000000..e612eb6138c
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group       
+ * Portions Copyright (c) 2020, Regents of PostgresPro 
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern int WaitMain(WaitStmt *stmt, DestReceiver *dest);
+
+#endif   /* WAIT_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 8a76afe8ccb..348de76c5f4 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -488,6 +488,7 @@ typedef enum NodeTag
 	T_DropReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
+	T_WaitStmt,
 	T_SQLCmd,
 
 	/*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index cd6f1be6435..306b2ef4df9 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3058,6 +3058,7 @@ typedef struct TransactionStmt
 	char	   *savepoint_name; /* for savepoint commands */
 	char	   *gid;			/* for two-phase-commit related commands */
 	bool		chain;			/* AND CHAIN option */
+	Node		*wait;			/* WAIT clause: list of events to wait for */
 } TransactionStmt;
 
 /* ----------------------
@@ -3567,4 +3568,17 @@ typedef struct DropSubscriptionStmt
 	DropBehavior behavior;		/* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
+/* ----------------------
+ *		WAIT FOR Statement + WAIT FOR clause of BEGIN statement
+ *		TODO: if we only pick one, remove the other
+ * ----------------------
+ */
+
+typedef struct WaitStmt
+{
+	NodeTag		type;
+	char	   *lsn;		/* WAIT FOR LSN */
+	uint		delay;		/* TIMEOUT in milliseconds; 0 means no limit */
+} WaitStmt;
+
 #endif							/* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 08f22ce211d..6e1848fe4cc 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
 PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -410,6 +411,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
 PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -450,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 8ef0f55e748..430bb5c7171 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -216,3 +216,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
 PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
 PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
 PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT FOR", false, false, false)
diff --git a/src/test/recovery/t/020_begin_wait.pl b/src/test/recovery/t/020_begin_wait.pl
new file mode 100644
index 00000000000..033e65458bf
--- /dev/null
+++ b/src/test/recovery/t/020_begin_wait.pl
@@ -0,0 +1,121 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# Add some content and take a backup
+$node_master->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = get_new_node('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to master and memorize
+# master's new LSN, then wait for master's LSN on standby. Prove that WAIT is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN WAIT FOR LSN '$lsn1'");
+
+# Get the current LSN on standby and make sure it's the same as master's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns eq 0, "standby reached the same LSN as master after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '0/FFFFFFFF' TIMEOUT $one_second");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn2' TIMEOUT 1");
+ok($reached_lsn eq "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "WAIT FOR" command
+#===============================================================================
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR works fine
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn5'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+ok($compare_lsns ge 0,
+	"WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+ok($compare_lsns lt 0,
+	"WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR works fine
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+ok($compare_lsns eq 0,
+	"WAIT FOR makes us reach the maximum LSN");
+
+
+
+$node_standby->stop;
+$node_master->stop;
