I did some code cleanup and added tests - both for the standalone WAIT
FOR statement and for WAIT FOR as a part of BEGIN. The new patch is
attached.
On 2020-04-03 17:29, Alexey Kondratov wrote:
> On 2020-04-01 02:26, Anna Akenteva wrote:
>> - WAIT FOR [ANY | ALL] event [, ...]
>> - BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ] [ WAIT FOR
>> [ANY | ALL] event [, ...]]
>> where event is one of:
>>     LSN value
>>     TIMEOUT number_of_milliseconds
>>     timestamp
>>
>> Now, one event cannot contain both an LSN and a TIMEOUT.
>
> In my understanding the whole idea of having TIMEOUT was to do
> something like 'Do wait for this LSN to be replicated, but no longer
> than TIMEOUT milliseconds'. What is the point of having plain TIMEOUT?
> It seems to be equivalent to pg_sleep, doesn't it?

In the patch that I reviewed, you could do things like:
WAIT FOR
LSN lsn0,
LSN lsn1 TIMEOUT time1,
LSN lsn2 TIMEOUT time2;
and such a statement was in practice equivalent to
WAIT FOR LSN(max(lsn0, lsn1, lsn2)) TIMEOUT (max(time1, time2))
As you can see, even though grammatically lsn1 is grouped with time1 and
lsn2 is grouped with time2, both timeouts that we specified are not
connected to their respective LSN-s, and instead they kinda act like
global timeouts. Therefore, I didn't see a point in keeping TIMEOUT
necessarily grammatically connected to LSN.
In the new syntax our statement would look like this:
WAIT FOR LSN lsn0, LSN lsn1, LSN lsn2, TIMEOUT time1, TIMEOUT time2;
TIMEOUT-s are not forced to be grouped with LSN-s anymore, which makes
it more clear that all specified TIMEOUTs will be global and will apply
to all LSN-s at once.
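
To make the "global" behaviour concrete, here is a small standalone sketch of how a list of events collapses into a single LSN and a single timeout. It only loosely mirrors what WaitMain() does in the attached patch; the names Strategy, Event, WaitTarget, pick() and fold_events() are made up for this illustration and do not appear in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; not the patch's node types. */
typedef enum { STRATEGY_ANY, STRATEGY_ALL } Strategy;
typedef enum { EVENT_LSN, EVENT_TIMEOUT } EventKind;

typedef struct
{
    EventKind kind;
    uint64_t  value;        /* an LSN, or a timeout in milliseconds */
} Event;

typedef struct
{
    int      has_lsn;
    int      has_timeout;
    uint64_t lsn;
    uint64_t timeout_ms;
} WaitTarget;

/* Keep the larger value for ALL, the smaller one for ANY. */
static uint64_t
pick(Strategy strategy, uint64_t current, uint64_t candidate)
{
    if (strategy == STRATEGY_ALL)
        return candidate > current ? candidate : current;
    return candidate < current ? candidate : current;
}

/* Collapse all events into one LSN and one timeout, regardless of order. */
static WaitTarget
fold_events(Strategy strategy, const Event *events, size_t n)
{
    WaitTarget target = {0, 0, 0, 0};

    assert(events != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        const Event *ev = &events[i];

        if (ev->kind == EVENT_LSN)
        {
            target.lsn = target.has_lsn ?
                pick(strategy, target.lsn, ev->value) : ev->value;
            target.has_lsn = 1;
        }
        else
        {
            target.timeout_ms = target.has_timeout ?
                pick(strategy, target.timeout_ms, ev->value) : ev->value;
            target.has_timeout = 1;
        }
    }
    return target;
}
```

With ALL, three LSNs and two timeouts reduce to the largest LSN and the largest timeout, which is the max()/max() equivalence described above; with ANY, the smallest of each is chosen instead.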
The point of having TIMEOUT is still to let us limit the time of waiting
for LSNs. It's just that with the new syntax, we can also use TIMEOUT
without an LSN. You are right, such a case is equivalent to pg_sleep.
One way to avoid that is to prohibit waiting for TIMEOUT without
specifying an LSN. Do you think we should do that?
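
If we went that way, the check itself would be small. Below is a rough standalone sketch of such a rule, not code from the patch: EventKind and wait_events_valid() are hypothetical names, and treating a timestamp the same way as TIMEOUT is my assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical event kinds; the patch itself stores events in WaitStmt nodes. */
typedef enum { EVENT_LSN, EVENT_TIMEOUT, EVENT_TIMESTAMP } EventKind;

/*
 * Return true if the event list is acceptable under the proposed rule:
 * a time limit (TIMEOUT or timestamp) is only allowed alongside an LSN.
 * An empty list is treated as valid here, since the grammar already
 * requires at least one event.
 */
static bool
wait_events_valid(const EventKind *kinds, size_t n)
{
    bool has_lsn = false;
    bool has_time = false;

    assert(kinds != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        if (kinds[i] == EVENT_LSN)
            has_lsn = true;
        else
            has_time = true;
    }

    /* Reject only the pg_sleep-like case: time limits with no LSN. */
    return has_lsn || !has_time;
}
```

In the real command such a check would presumably raise an error with ereport() rather than return a flag.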
--
Anna Akenteva
Postgres Professional:
The Russian Postgres Company
http://www.postgrespro.com
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 8d91f3529e6..8697f9807ff 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -187,6 +187,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY wait SYSTEM "wait.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index c23bbfb4e71..cfee3c8f102 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_for_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+ WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN lsn_value
+ TIMEOUT number_of_milliseconds
+ timestamp
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index d6cd1d41779..0a2ea7e80be 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_for_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+ WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN lsn_value
+ TIMEOUT number_of_milliseconds
+ timestamp
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/ref/wait.sgml b/doc/src/sgml/ref/wait.sgml
new file mode 100644
index 00000000000..26cae3ad859
--- /dev/null
+++ b/doc/src/sgml/ref/wait.sgml
@@ -0,0 +1,146 @@
+<!--
+doc/src/sgml/ref/wait.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-wait">
+ <indexterm zone="sql-wait">
+ <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+ <refentrytitle>WAIT FOR</refentrytitle>
+ <manvolnum>7</manvolnum>
+ <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>WAIT FOR</refname>
+ <refpurpose>wait for the target <acronym>LSN</acronym> to be replayed or for the specified time to pass</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>where <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN value
+ TIMEOUT number_of_milliseconds
+ timestamp
+
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>'
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ALL LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>, TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ANY LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+
+ <para>
+ <command>WAIT FOR</command> provides a simple interprocess communication
+ mechanism to wait for a timestamp, a timeout, or a target log sequence number
+ (<acronym>LSN</acronym>) on a standby in <productname>PostgreSQL</productname>
+ databases with master-standby asynchronous replication. When run with the
+ <replaceable>LSN</replaceable> option, the <command>WAIT FOR</command>
+ command waits for the specified <acronym>LSN</acronym> to be replayed.
+ If no timestamp or timeout is specified, the wait time is unlimited.
+ Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+ by shutting down the <literal>postgres</literal> server.
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Parameters</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><replaceable class="parameter">LSN</replaceable></term>
+ <listitem>
+ <para>
+ Specify the target log sequence number to wait for.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time interval to wait for the LSN to be replayed.
+ The specified <replaceable>wait_timeout</replaceable> must be an integer
+ and is measured in milliseconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time to wait for the LSN to be replayed.
+ The specified <replaceable>wait_time</replaceable> must be a timestamp.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ Run <literal>WAIT FOR</literal> from <application>psql</application>,
+ limiting wait time to 10000 milliseconds:
+
+<screen>
+WAIT FOR LSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE: LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+WAIT FOR LSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Limit <acronym>LSN</acronym> wait time to 500000 milliseconds,
+ and then cancel the command if <acronym>LSN</acronym> was not reached:
+<screen>
+WAIT FOR LSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+NOTICE: LSN is not reached. Try to increase wait time.
+ERROR: canceling statement due to user request
+ LSN reached
+-------------
+ f
+(1 row)
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+ <title>Compatibility</title>
+
+ <para>
+ There is no <command>WAIT FOR</command> statement in the SQL
+ standard.
+ </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index cef09dd38b3..588e96aa143 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -215,6 +215,7 @@
&update;
&vacuum;
&values;
+ &wait;
</reference>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 977d448f502..c0e2c2141a8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
#include "catalog/pg_database.h"
#include "commands/progress.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/controldata_utils.h"
#include "miscadmin.h"
#include "pg_trace.h"
@@ -7376,6 +7377,15 @@ StartupXLOG(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for,
+ * set latches in shared memory array to notify the waiter.
+ */
+ if (XLogCtl->lastReplayedEndRecPtr >= GetMinWait())
+ {
+ WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d4815d3ce65..9b310926c12 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -57,6 +57,7 @@ OBJS = \
user.o \
vacuum.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..01a90a12c8a
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,402 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT FOR, which allows waiting for events such as
+ * time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <float.h>
+#include <math.h>
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlog.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure */
+typedef struct
+{
+ int backend_maxid;
+ XLogRecPtr min_lsn;
+ slock_t mutex;
+ XLogRecPtr waited_lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static volatile WaitState *state;
+
+/* Add the event of the current backend to the shared memory array */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+ SpinLockAcquire(&state->mutex);
+ if (state->backend_maxid < MyBackendId)
+ state->backend_maxid = MyBackendId;
+
+ state->waited_lsn[MyBackendId] = lsn_to_wait;
+
+ if (lsn_to_wait < state->min_lsn)
+ state->min_lsn = lsn_to_wait;
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ */
+static void
+DeleteEvent(void)
+{
+ int i;
+ XLogRecPtr lsn_to_delete = state->waited_lsn[MyBackendId];
+
+ state->waited_lsn[MyBackendId] = InvalidXLogRecPtr;
+
+ SpinLockAcquire(&state->mutex);
+
+ /* If we need to choose the next min_lsn, update state->min_lsn */
+ if (state->min_lsn == lsn_to_delete)
+ {
+ state->min_lsn = PG_UINT64_MAX;
+ for (i = 2; i <= state->backend_maxid; i++)
+ if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+ state->waited_lsn[i] < state->min_lsn)
+ state->min_lsn = state->waited_lsn[i];
+ }
+
+ if (state->backend_maxid == MyBackendId)
+ for (i = (MyBackendId); i >= 2; i--)
+ if (state->waited_lsn[i] != InvalidXLogRecPtr)
+ {
+ state->backend_maxid = i;
+ break;
+ }
+
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitState, waited_lsn);
+ size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+ return size;
+}
+
+/* Init array of events in shared memory */
+void
+WaitShmemInit(void)
+{
+ bool found;
+ uint32 i;
+
+ state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+ WaitShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&state->mutex);
+
+ for (i = 0; i < (MaxBackends + 1); i++)
+ state->waited_lsn[i] = InvalidXLogRecPtr;
+
+ state->backend_maxid = 0;
+ state->min_lsn = PG_UINT64_MAX;
+ }
+}
+
+/* Set all latches in shared memory to signal that new LSN has been replayed */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+ uint32 i;
+ int backend_maxid;
+ PGPROC *backend;
+
+ SpinLockAcquire(&state->mutex);
+ backend_maxid = state->backend_maxid;
+ SpinLockRelease(&state->mutex);
+
+ for (i = 2; i <= backend_maxid; i++)
+ {
+ backend = BackendIdGetProc(i);
+ if (state->waited_lsn[i] != 0)
+ {
+ if (backend && state->waited_lsn[i] <= cur_lsn)
+ SetLatch(&backend->procLatch);
+ }
+ }
+}
+
+/* Get the minimum LSN that any backend is currently waiting for */
+XLogRecPtr
+GetMinWait(void)
+{
+ return state->min_lsn;
+}
+
+/*
+ * On WAIT use MyLatch to wait till LSN is replayed,
+ * postmaster dies or timeout happens.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs, DestReceiver *dest)
+{
+ XLogRecPtr cur_lsn = GetXLogReplayRecPtr(NULL);
+ int latch_events;
+ float8 endtime;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ char *value = "f";
+
+ latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+ if (lsn != InvalidXLogRecPtr)
+ AddEvent(lsn);
+
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+ endtime = GetNowFloat() + secs;
+
+ for (;;)
+ {
+ int rc = 0;
+ float8 delay = 0;
+ long delay_ms;
+
+ if (secs > 0)
+ delay = endtime - GetNowFloat();
+ else if (secs == 0) /* 10-second timeout to check for interrupts */
+ delay = 10;
+ else
+ delay = 1;
+
+ if (delay > 0.0)
+ delay_ms = (long) ceil(delay * 1000.0);
+ else
+ break;
+
+ /*
+ * If an interrupt is pending, delete the current event from the
+ * array before processing the interrupt.
+ */
+ if (InterruptPending)
+ {
+ if (lsn != InvalidXLogRecPtr)
+ DeleteEvent();
+ ProcessInterrupts();
+ }
+
+ if (lsn != InvalidXLogRecPtr && rc & WL_LATCH_SET)
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ /* If LSN has been replayed */
+ if (lsn != InvalidXLogRecPtr && lsn <= cur_lsn)
+ break;
+
+ /* If postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ /* A little hack similar to SnapshotResetXmin to work out of snapshot */
+ MyPgXact->xmin = InvalidTransactionId;
+ rc = WaitLatch(MyLatch, latch_events, delay_ms,
+ WAIT_EVENT_CLIENT_READ);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+
+ DeleteEvent();
+
+ if (lsn != InvalidXLogRecPtr && lsn > cur_lsn)
+ elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+ else
+ value = "t";
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+ /* Prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+ /* Send it */
+ do_text_output_oneline(tstate, value);
+ end_tup_output(tstate);
+ return strcmp(value,"t");
+}
+
+void
+WaitTimeUtility(float8 delay)
+{
+ int latch_events;
+
+ if (delay <= 0)
+ return;
+
+ latch_events = WL_TIMEOUT | WL_POSTMASTER_DEATH;
+
+ MyPgXact->xmin = InvalidTransactionId;
+ WaitLatch(MyLatch, latch_events, (long) ceil(delay * 1000.0),
+ WAIT_EVENT_CLIENT_READ);
+ ResetLatch(MyLatch);
+}
+
+/*
+ * Get the amount of seconds left till the specified time.
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+ int ret;
+ float8 val;
+ Oid types[] = { time->consttype };
+ Datum values[] = { time->constvalue };
+ char nulls[] = { " " };
+ Datum result;
+ bool isnull;
+
+ SPI_connect();
+
+ if (time->consttype == TIMEOID)
+ ret = SPI_execute_with_args("select extract (epoch from ($1 - now()::time))",
+ 1, types, values, nulls, true, 0);
+ else if (time->consttype == TIMETZOID)
+ ret = SPI_execute_with_args("select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))",
+ 1, types, values, nulls, true, 0);
+ else
+ ret = SPI_execute_with_args("select extract (epoch from ($1 - now()))",
+ 1, types, values, nulls, true, 0);
+
+ Assert(ret >= 0);
+ result = SPI_getbinval(SPI_tuptable->vals[0],
+ SPI_tuptable->tupdesc,
+ 1, &isnull);
+
+ Assert(!isnull);
+ val = DatumGetFloat8(result);
+
+ elog(INFO, "time: %f", val);
+
+ SPI_finish();
+ return val;
+}
+
+/* Implementation of WAIT FOR */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+ ListCell *events;
+ float8 delay = 0;
+ float8 final_delay = 0;
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ XLogRecPtr final_lsn = InvalidXLogRecPtr;
+ bool has_lsn = false;
+ bool wait_forever = true;
+ int res = 1;
+
+ if (stmt->strategy == WAIT_FOR_ANY)
+ {
+ /* Prepare to find minimum LSN and delay */
+ final_delay = DBL_MAX;
+ final_lsn = PG_UINT64_MAX;
+ }
+
+ /* Extract options from the statement node tree */
+ foreach(events, stmt->events)
+ {
+ WaitStmt *event = (WaitStmt *) lfirst(events);
+
+ /* LSN to wait for */
+ if (event->lsn)
+ {
+ has_lsn = true;
+ lsn = DatumGetLSN(
+ DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(event->lsn)));
+
+ /*
+ * When waiting for ALL, select max LSN to wait for.
+ * When waiting for ANY, select min LSN to wait for.
+ */
+ if ((stmt->strategy == WAIT_FOR_ALL && final_lsn <= lsn) ||
+ (stmt->strategy == WAIT_FOR_ANY && final_lsn > lsn))
+ {
+ final_lsn = lsn;
+ }
+ }
+
+ /* Time delay to wait for */
+ if (event->time || event->delay)
+ {
+ wait_forever = false;
+
+ if (event->delay)
+ delay = event->delay / 1000.0;
+
+ if (event->time)
+ {
+ Const *time = (Const *) event->time;
+ delay = WaitTimeResolve(time);
+ }
+
+ if (delay < 0)
+ delay = 0;
+
+ /*
+ * When waiting for ALL, select max delay to wait for.
+ * When waiting for ANY, select min delay to wait for.
+ */
+ if ((stmt->strategy == WAIT_FOR_ALL && final_delay <= delay) ||
+ (stmt->strategy == WAIT_FOR_ANY && final_delay > delay))
+ {
+ final_delay = delay;
+ }
+ }
+ }
+
+ if (!has_lsn)
+ {
+ WaitTimeUtility(final_delay);
+ res = 0;
+ }
+ else
+ res = WaitUtility(final_lsn, wait_forever ? 0 : final_delay, dest);
+
+ return res;
+}
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index eb168ffd6da..830bdbb6ab0 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2760,6 +2760,18 @@ _outDefElem(StringInfo str, const DefElem *node)
WRITE_LOCATION_FIELD(location);
}
+static void
+_outWaitStmt(StringInfo str, const WaitStmt *node)
+{
+ WRITE_NODE_TYPE("WAITSTMT");
+
+ WRITE_STRING_FIELD(lsn);
+ WRITE_INT_FIELD(delay);
+ WRITE_NODE_FIELD(events);
+ WRITE_NODE_FIELD(time);
+ WRITE_ENUM_FIELD(strategy, WaitForStrategy);
+}
+
static void
_outTableLikeClause(StringInfo str, const TableLikeClause *node)
{
@@ -4306,6 +4318,9 @@ outNode(StringInfo str, const void *obj)
case T_PartitionRangeDatum:
_outPartitionRangeDatum(str, obj);
break;
+ case T_WaitStmt:
+ _outWaitStmt(str, obj);
+ break;
default:
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 6676412842b..08e2649b9df 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -78,6 +78,7 @@ static Query *transformCreateTableAsStmt(ParseState *pstate,
CreateTableAsStmt *stmt);
static Query *transformCallStmt(ParseState *pstate,
CallStmt *stmt);
+static void transformWaitForStmt(ParseState *pstate, WaitStmt *stmt);
static void transformLockingClause(ParseState *pstate, Query *qry,
LockingClause *lc, bool pushedDown);
#ifdef RAW_EXPRESSION_COVERAGE_TEST
@@ -326,7 +327,20 @@ transformStmt(ParseState *pstate, Node *parseTree)
result = transformCallStmt(pstate,
(CallStmt *) parseTree);
break;
-
+ case T_WaitStmt:
+ transformWaitForStmt(pstate, (WaitStmt *) parseTree);
+ result = makeNode(Query);
+ result->commandType = CMD_UTILITY;
+ result->utilityStmt = (Node *) parseTree;
+ break;
+ case T_TransactionStmt:
+ {
+ TransactionStmt *stmt = (TransactionStmt *) parseTree;
+ if ((stmt->kind == TRANS_STMT_BEGIN ||
+ stmt->kind == TRANS_STMT_START) && stmt->wait)
+ transformWaitForStmt(pstate, (WaitStmt *) stmt->wait);
+ }
+ /* no break here - we want to fall through to the default */
default:
/*
@@ -2981,6 +2995,23 @@ applyLockingClause(Query *qry, Index rtindex,
qry->rowMarks = lappend(qry->rowMarks, rc);
}
+/*
+ * transformWaitForStmt -
+ * transform the WAIT FOR clause of the BEGIN statement
+ * transform the WAIT FOR statement (TODO: remove this line if we don't keep it)
+ */
+static void
+transformWaitForStmt(ParseState *pstate, WaitStmt *stmt)
+{
+ ListCell *events;
+
+ foreach(events, stmt->events)
+ {
+ WaitStmt *event = (WaitStmt *) lfirst(events);
+ event->time = transformExpr(pstate, event->time, EXPR_KIND_OTHER);
+ }
+}
+
/*
* Coverage testing for raw_expression_tree_walker().
*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index eb0bf12cd8b..4ce315f95d9 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -276,7 +276,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -487,7 +487,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> row explicit_row implicit_row type_list array_expr_list
%type <node> case_expr case_arg when_clause case_default
%type <list> when_clause_list
-%type <ival> sub_type opt_materialized
+%type <ival> sub_type wait_strategy opt_materialized
%type <value> NumericOnly
%type <list> NumericOnly_list
%type <alias> alias_clause opt_alias_clause
@@ -591,6 +591,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <partboundspec> PartitionBoundSpec
%type <list> hash_partbound
%type <defelt> hash_partbound_elem
+%type <list> wait_list
+%type <node> WaitEvent wait_for
/*
* Non-keyword token types. These are hard-wired into the "flex" lexer.
@@ -660,7 +662,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -690,7 +692,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
- TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+ TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
TREAT TRIGGER TRIM TRUE_P
TRUNCATE TRUSTED TYPE_P TYPES_P
@@ -700,7 +702,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAIT WHEN WHERE WHITESPACE_P WINDOW
+ WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -954,6 +957,7 @@ stmt :
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -9940,18 +9944,20 @@ TransactionStmt:
n->chain = $3;
$$ = (Node *)n;
}
- | BEGIN_P opt_transaction transaction_mode_list_or_empty
+ | BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_BEGIN;
n->options = $3;
+ n->wait = $4;
$$ = (Node *)n;
}
- | START TRANSACTION transaction_mode_list_or_empty
+ | START TRANSACTION transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_START;
n->options = $3;
+ n->wait = $4;
$$ = (Node *)n;
}
| COMMIT opt_transaction opt_transaction_chain
@@ -14157,6 +14163,74 @@ xml_passing_mech:
| BY VALUE_P
;
+/*****************************************************************************
+ *
+ * QUERY:
+ * WAIT FOR [ANY | ALL] <event> [, <event> ...]
+ * event is one of:
+ * LSN value
+ * TIMEOUT delay
+ * timestamp
+ *
+ *****************************************************************************/
+WaitStmt:
+ WAIT FOR wait_strategy wait_list
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->strategy = $3;
+ n->events = $4;
+ $$ = (Node *)n;
+ }
+ ;
+wait_for:
+ WAIT FOR wait_strategy wait_list
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->strategy = $3;
+ n->events = $4;
+ $$ = (Node *)n;
+ }
+ | /* EMPTY */ { $$ = NULL; };
+
+wait_strategy:
+ ALL { $$ = WAIT_FOR_ALL; }
+ | ANY { $$ = WAIT_FOR_ANY; }
+ | /* EMPTY */ { $$ = WAIT_FOR_ALL; }
+ ;
+
+wait_list:
+ WaitEvent { $$ = list_make1($1); }
+ | wait_list ',' WaitEvent { $$ = lappend($1, $3); }
+ | wait_list WaitEvent { $$ = lappend($1, $2); }
+ ;
+
+WaitEvent:
+ LSN Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = $2;
+ n->delay = 0;
+ n->time = NULL;
+ $$ = (Node *)n;
+ }
+ | TIMEOUT Iconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = NULL;
+ n->delay = $2;
+ n->time = NULL;
+ $$ = (Node *)n;
+ }
+ | ConstDatetime Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = NULL;
+ n->delay = 0;
+ n->time = makeStringConstCast($2, @2, $1);
+ $$ = (Node *)n;
+ }
+ ;
+
/*
* Aggregate decoration clauses
@@ -15301,6 +15375,7 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN
| MAPPING
| MATCH
| MATERIALIZED
@@ -15423,6 +15498,7 @@ unreserved_keyword:
| TEMPORARY
| TEXT_P
| TIES
+ | TIMEOUT
| TRANSACTION
| TRANSFORM
| TRIGGER
@@ -15449,6 +15525,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cde..bb8af349808 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/twophase.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void)
SyncScanShmemInit();
AsyncShmemInit();
+ /*
+ * Init array of waited-for LSNs in shared memory for WAIT FOR
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index b1f7f6e2d01..ad85f106040 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <float.h>
#include "access/htup_details.h"
#include "access/reloptions.h"
@@ -57,6 +58,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -70,6 +72,9 @@
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
/* Hook for plugins to get control in ProcessUtility() */
ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -268,6 +273,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_LoadStmt:
case T_PrepareStmt:
case T_UnlistenStmt:
+ case T_WaitStmt:
case T_VariableSetStmt:
{
/*
@@ -591,6 +597,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case TRANS_STMT_START:
{
ListCell *lc;
+ WaitStmt *waitstmt = (WaitStmt *) stmt->wait;
+
+ /* If we had to WAIT FOR something but the wait failed */
+ if (stmt->wait && WaitMain(waitstmt, dest) != 0)
+ break;
BeginTransactionBlock();
foreach(lc, stmt->options)
@@ -1062,6 +1073,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ WaitStmt *stmt = (WaitStmt *) parsetree;
+ WaitMain(stmt, dest);
+ break;
+ }
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2718,6 +2736,10 @@ CreateCommandTag(Node *parsetree)
tag = CMDTAG_NOTIFY;
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
case T_ListenStmt:
tag = CMDTAG_LISTEN;
break;
@@ -3357,6 +3379,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 00000000000..11115b9dab6
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 delay, DestReceiver *dest);
+extern void WaitTimeUtility(float8 delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int WaitMain(WaitStmt *stmt, DestReceiver *dest);
+
+#endif /* WAIT_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 8a76afe8ccb..348de76c5f4 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -488,6 +488,7 @@ typedef enum NodeTag
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
+ T_WaitStmt,
T_SQLCmd,
/*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 77943f06376..971f343cf7a 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3055,6 +3055,7 @@ typedef struct TransactionStmt
char *savepoint_name; /* for savepoint commands */
char *gid; /* for two-phase-commit related commands */
bool chain; /* AND CHAIN option */
+ Node *wait; /* WAIT clause: list of events to wait for */
} TransactionStmt;
/* ----------------------
@@ -3564,4 +3565,26 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+/* ----------------------
+ * WAIT FOR Statement + WAIT FOR clause of BEGIN statement
+ * TODO: if we only pick one, remove the other
+ * ----------------------
+ */
+
+typedef enum WaitForStrategy
+{
+ WAIT_FOR_ANY = 0,
+ WAIT_FOR_ALL
+} WaitForStrategy;
+
+typedef struct WaitStmt
+{
+ NodeTag type;
+ WaitForStrategy strategy;
+ List *events; /* used as a pointer to the next WAIT event */
+ char *lsn; /* WAIT FOR LSN */
+ int delay; /* WAIT FOR TIMEOUT */
+ Node *time; /* WAIT FOR TIMESTAMP or TIME */
+} WaitStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index b1184c2d158..dd22e358b9a 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -404,6 +405,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
PG_KEYWORD("to", TO, RESERVED_KEYWORD)
PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -444,6 +446,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 8ef0f55e748..430bb5c7171 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -216,3 +216,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT FOR", false, false, false)
diff --git a/src/test/recovery/t/020_begin_wait.pl b/src/test/recovery/t/020_begin_wait.pl
new file mode 100644
index 00000000000..9bec57ee8f4
--- /dev/null
+++ b/src/test/recovery/t/020_begin_wait.pl
@@ -0,0 +1,145 @@
+# Checks WAIT FOR as a part of BEGIN
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# Add some content and take a backup
+$node_master->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = get_new_node('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_master, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to master and memorize
+# master's new LSN, then wait for master's LSN on standby. Prove that WAIT is
+# able to set up an infinite waiting loop and exit it if given no wait timeout.
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN WAIT FOR LSN '$lsn1'");
+
+# Get the current LSN on standby and make sure it's the same as master's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns == 0, "standby reached the same LSN as master after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+ "WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+ "BEGIN WAIT FOR LSN '$lsn2' TIMEOUT 1");
+ok($reached_lsn eq "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "WAIT FOR" command
+#===============================================================================
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+ 'postgres', qq[
+ BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ WAIT FOR LSN '$lsn3';
+ SELECT max(a) FROM wait_test;
+ BEGIN WAIT FOR LSN '$lsn4';
+ SELECT pg_last_wal_replay_lsn();
+ SELECT max(a) FROM wait_test;
+ COMMIT;
+]);
+
+# Make sure that we indeed reach master's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /$lsn4/;
+ok($last_lsn_reached, "WAIT FOR LSN works inside a transaction");
+
+# Check that WAIT doesn't break the transaction and show us the new max value.
+# For that, make sure that the older max value is repeated twice in the results.
+my $count = () = $standby_results =~ /40/g;
+ok($count == 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+ "BEGIN WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+ok($compare_lsns >= 0,
+ "WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+ok($compare_lsns < 0,
+ "WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine
+$node_standby->safe_psql('postgres',
+ "BEGIN WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+ok($compare_lsns == 0,
+ "WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_master->stop;
diff --git a/src/test/recovery/t/021_wait.pl b/src/test/recovery/t/021_wait.pl
new file mode 100644
index 00000000000..c270e785740
--- /dev/null
+++ b/src/test/recovery/t/021_wait.pl
@@ -0,0 +1,144 @@
+# Checks the standalone WAIT FOR statement
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# Add some content and take a backup
+$node_master->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = get_new_node('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_master, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to master and memorize
+# master's new LSN, then wait for master's LSN on standby. Prove that WAIT is
+# able to set up an infinite waiting loop and exit it if given no wait timeout.
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '$lsn1'");
+
+# Get the current LSN on standby and make sure it's the same as master's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns == 0, "standby reached the same LSN as master after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+ "WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+ "WAIT FOR LSN '$lsn2' TIMEOUT 1");
+ok($reached_lsn eq "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '$lsn3';
+ BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+ SELECT max(a) FROM wait_test;
+ WAIT FOR LSN '$lsn4';
+ SELECT pg_last_wal_replay_lsn();
+ SELECT max(a) FROM wait_test;
+ COMMIT;
+]);
+
+# Make sure that we indeed reach master's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /$lsn4/;
+ok($last_lsn_reached, "WAIT FOR LSN works inside a transaction");
+
+# Check that WAIT doesn't break the transaction and show us the new max value.
+# For that, make sure that the older max value is repeated twice in the results.
+my $count = () = $standby_results =~ /40/g;
+ok($count == 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+ "WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+ok($compare_lsns >= 0,
+ "WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+ok($compare_lsns < 0,
+ "WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine
+$node_standby->safe_psql('postgres',
+ "WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+ok($compare_lsns == 0,
+ "WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_master->stop;
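
For anyone reading the ANY / ALL tests above, here is a minimal sketch of how a list of LSN events could collapse into a single target LSN. It is an assumption drawn only from the test descriptions (ALL reaches the maximum LSN from the list, ANY reaches at least the minimum), not code taken from the patch. SketchLsn, SketchStrategy and sketch_target_lsn are hypothetical names standing in for XLogRecPtr, WaitForStrategy and whatever WaitMain does internally.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr (a 64-bit WAL position). */
typedef uint64_t SketchLsn;

/* Hypothetical mirror of the patch's WaitForStrategy enum. */
typedef enum SketchStrategy
{
	SKETCH_WAIT_FOR_ANY = 0,
	SKETCH_WAIT_FOR_ALL
} SketchStrategy;

/*
 * Collapse n LSN events into one target LSN: the largest one for ALL
 * (every event must be reached), the smallest one for ANY (reaching a
 * single event is enough).  The caller must pass at least one LSN.
 */
SketchLsn
sketch_target_lsn(const SketchLsn *lsns, int n, SketchStrategy strategy)
{
	SketchLsn	target;
	int			i;

	assert(n > 0);
	target = lsns[0];

	for (i = 1; i < n; i++)
	{
		if (strategy == SKETCH_WAIT_FOR_ALL)
		{
			/* ALL: keep the furthest position seen so far */
			if (lsns[i] > target)
				target = lsns[i];
		}
		else
		{
			/* ANY: keep the nearest position seen so far */
			if (lsns[i] < target)
				target = lsns[i];
		}
	}

	return target;
}
```

Under that assumption, the ALL test above waits for the equivalent of $lsn7 and the ANY test for the equivalent of $lsn5. That would explain why the ANY test only asserts that the standby got at least as far as $lsn5.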