On 2020-03-27 04:15, Kartyshov Ivan wrote:
Anna, feel free to work on this patch.
Ivan and I worked on this patch a bit more. We fixed the bugs that we
could find and cleaned up the code. For now, we've kept both options:
WAIT as a standalone statement and WAIT as a part of BEGIN. The new
patch is attached.
The syntax looks a bit different now:
- WAIT FOR [ANY | ALL] event [, ...]
- BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ] [ WAIT FOR
[ANY | ALL] event [, ...]]
where event is one of:
LSN value
TIMEOUT number_of_milliseconds
timestamp
Now, one event cannot contain both an LSN and a TIMEOUT. With such
syntax, the behaviour seems to make sense. For the (default) WAIT FOR
ALL strategy, we pick the maximum LSN and maximum allowed timeout, and
wait for the LSN till the timeout is over. If no timeout is specified,
we wait forever. If no LSN is specified, we just wait for the time to
pass. For the WAIT FOR ANY strategy, it's the same but we pick minimum
LSN and timeout.
There are still some questions left:
1) Should we only keep the BEGIN option, or does the standalone command
have potential after all?
2) Should we change the grammar so that WAIT can be in any position of
the BEGIN statement, not necessarily in the end? Ivan and I haven't come
to a consensus about this, so more opinions would be helpful.
3) Since we added the "wait" attribute to TransactionStmt struct, do we
need to add something to _copyTransactionStmt() /
_equalTransactionStmt()?
--
Anna Akenteva
Postgres Professional:
The Russian Postgres Company
http://www.postgrespro.com
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 8d91f3529e6..8697f9807ff 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -187,6 +187,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY wait SYSTEM "wait.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index c23bbfb4e71..558ff668dbd 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_for_event</replaceable>
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>and <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+ WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, <replaceable class="parameter">event</replaceable> ...]
+
+<phrase>where <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN lsn_value
+ TIMEOUT number_of_milliseconds
+ timestamp
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index d6cd1d41779..e5e2e15cdf0 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_for_event</replaceable>
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>and <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+ WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, <replaceable class="parameter">event</replaceable> ...]
+
+<phrase>where <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN lsn_value
+ TIMEOUT number_of_milliseconds
+ timestamp
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/ref/wait.sgml b/doc/src/sgml/ref/wait.sgml
new file mode 100644
index 00000000000..0e5bd2523d6
--- /dev/null
+++ b/doc/src/sgml/ref/wait.sgml
@@ -0,0 +1,146 @@
+<!--
+doc/src/sgml/ref/waitlsn.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-waitlsn">
+ <indexterm zone="sql-waitlsn">
+ <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+ <refentrytitle>WAIT FOR</refentrytitle>
+ <manvolnum>7</manvolnum>
+ <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>WAIT FOR</refname>
+ <refpurpose>wait for the target <acronym>LSN</acronym> to be replayed</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, <replaceable class="parameter">event</replaceable> ...]
+
+<phrase>where <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN value
+ TIMEOUT number_of_milliseconds
+ timestamp
+
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>'
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ALL LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>, TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ANY LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+
+ <para>
+ <command>WAIT FOR</command> provides a simple interprocess communication
+ mechanism to wait for timestamp, timeout or the target log sequence number
+ (<acronym>LSN</acronym>) on standby in <productname>PostgreSQL</productname>
+ databases with master-standby asynchronous replication. When run with the
+ <replaceable>LSN</replaceable> option, the <command>WAIT FOR</command>
+ command waits for the specified <acronym>LSN</acronym> to be replayed.
+ If no timestamp or timeout was specified, wait time is unlimited.
+ Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+ by shutting down the <literal>postgres</literal> server.
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Parameters</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><replaceable class="parameter">LSN</replaceable></term>
+ <listitem>
+ <para>
+ Specify the target log sequence number to wait for.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time interval to wait for the LSN to be replayed.
+ The specified <replaceable>wait_timeout</replaceable> must be an integer
+ and is measured in milliseconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>UNTIL TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time to wait for the LSN to be replayed.
+ The specified <replaceable>wait_time</replaceable> must be timestamp.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ Run <literal>WAIT FOR</literal> from <application>psql</application>,
+ limiting wait time to 10000 milliseconds:
+
+<screen>
+WAIT FOR LSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE: LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+WAIT FOR LSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Limit <acronym>LSN</acronym> wait time to 500000 milliseconds,
+ and then cancel the command if <acronym>LSN</acronym> was not reached:
+<screen>
+WAIT FOR LSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+NOTICE: LSN is not reached. Try to increase wait time.
+ERROR: canceling statement due to user request
+ LSN reached
+-------------
+ f
+(1 row)
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+ <title>Compatibility</title>
+
+ <para>
+ There is no <command>WAIT FOR</command> statement in the SQL
+ standard.
+ </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index cef09dd38b3..588e96aa143 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -215,6 +215,7 @@
&update;
&vacuum;
&values;
+ &wait;
</reference>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8fe92962b0d..b8e884ccc4f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -41,6 +41,7 @@
#include "catalog/pg_database.h"
#include "commands/progress.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/controldata_utils.h"
#include "miscadmin.h"
#include "pg_trace.h"
@@ -7312,6 +7313,15 @@ StartupXLOG(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for,
+ * set latches in shared memory array to notify the waiter.
+ */
+ if (XLogCtl->lastReplayedEndRecPtr >= GetMinWait())
+ {
+ WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d4815d3ce65..9b310926c12 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -57,6 +57,7 @@ OBJS = \
user.o \
vacuum.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..bd4f57630eb
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,410 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT FOR, which allows
+ * waiting for LSN to have been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <float.h>
+#include <math.h>
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlog.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shmem array */
+static void AddEvent(XLogRecPtr trg_lsn);
+static void DeleteEvent(void);
+
+/* Shared memory structures */
+typedef struct
+{
+ XLogRecPtr trg_lsn;
+ /*
+ * Left struct BIDState here for compatibility with
+ * a planned future patch that will allow waiting for XIDs.
+ */
+} BIDState;
+
+typedef struct
+{
+ int backend_maxid;
+ XLogRecPtr min_lsn;
+ slock_t mutex;
+ BIDState event_arr[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static volatile WaitState *state;
+
+/* Add event of the current backend to shmem array */
+static void
+AddEvent(XLogRecPtr trg_lsn)
+{
+ SpinLockAcquire(&state->mutex);
+ if (state->backend_maxid < MyBackendId)
+ state->backend_maxid = MyBackendId;
+
+ state->event_arr[MyBackendId].trg_lsn = trg_lsn;
+
+ if (trg_lsn < state->min_lsn)
+ state->min_lsn = trg_lsn;
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ */
+static void
+DeleteEvent(void)
+{
+ int i;
+ XLogRecPtr trg_lsn = state->event_arr[MyBackendId].trg_lsn;
+
+ state->event_arr[MyBackendId].trg_lsn = InvalidXLogRecPtr;
+
+ SpinLockAcquire(&state->mutex);
+ /* Update state->min_lsn iff it is nessesary for choosing next min_lsn */
+ if (state->min_lsn == trg_lsn)
+ {
+ state->min_lsn = PG_UINT64_MAX;
+ for (i = 2; i <= state->backend_maxid; i++)
+ if (state->event_arr[i].trg_lsn != InvalidXLogRecPtr &&
+ state->event_arr[i].trg_lsn < state->min_lsn)
+ state->min_lsn = state->event_arr[i].trg_lsn;
+ }
+
+ if (state->backend_maxid == MyBackendId)
+ for (i = (MyBackendId); i >= 2; i--)
+ if (state->event_arr[i].trg_lsn != InvalidXLogRecPtr)
+ {
+ state->backend_maxid = i;
+ break;
+ }
+
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitState, event_arr);
+ size = add_size(size, mul_size(MaxBackends + 1, sizeof(BIDState)));
+ return size;
+}
+
+/* Init array of events in shared memory */
+void
+WaitShmemInit(void)
+{
+ bool found;
+ uint32 i;
+
+ state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+ WaitShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&state->mutex);
+
+ for (i = 0; i < (MaxBackends+1); i++)
+ state->event_arr[i].trg_lsn = InvalidXLogRecPtr;
+
+ state->backend_maxid = 0;
+ state->min_lsn = PG_UINT64_MAX;
+ }
+}
+
+/* Set all latches in shared memory to signal that new LSN has been replayed */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+ uint32 i;
+ int backend_maxid;
+ PGPROC *backend;
+
+ SpinLockAcquire(&state->mutex);
+ backend_maxid = state->backend_maxid;
+ SpinLockRelease(&state->mutex);
+
+ for (i = 2; i <= backend_maxid; i++)
+ {
+ backend = BackendIdGetProc(i);
+ if (state->event_arr[i].trg_lsn != 0)
+ {
+ if (backend && state->event_arr[i].trg_lsn <= cur_lsn)
+ SetLatch(&backend->procLatch);
+ }
+ }
+}
+
+/* Get minimal LSN that will be next */
+XLogRecPtr
+GetMinWait(void)
+{
+ return state->min_lsn;
+}
+
+/*
+ * On WAIT use MyLatch to wait till LSN is replayed,
+ * postmaster dies or timeout happens.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs, DestReceiver *dest)
+{
+ XLogRecPtr trg_lsn = lsn;
+ XLogRecPtr cur_lsn = GetXLogReplayRecPtr(NULL);
+ int latch_events;
+ float8 endtime;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ char *value = "f";
+
+ latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+ if (lsn)
+ AddEvent(trg_lsn);
+
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+ endtime = GetNowFloat() + secs;
+
+ for (;;)
+ {
+ int rc;
+ float8 delay = 0;
+ long delay_ms;
+
+ if (secs > 0)
+ delay = endtime - GetNowFloat();
+ else if (secs == 0) /* 1 minute timeout to check for Interupts */
+ delay = 10;
+ else
+ delay = 1;
+
+ if (delay > 0.0)
+ delay_ms = (long) ceil(delay * 1000.0);
+ else
+ break;
+
+ /*
+ * If received an interruption from CHECK_FOR_INTERRUPTS,
+ * then delete the current event from array.
+ */
+ if (InterruptPending)
+ {
+ if (lsn)
+ DeleteEvent();
+ ProcessInterrupts();
+ }
+
+ if (lsn && rc & WL_LATCH_SET)
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ /* If LSN has been replayed */
+ if (lsn && trg_lsn <= cur_lsn)
+ break;
+
+ /* If postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ /* A little hack similar to SnapshotResetXmin to work out of snapshot */
+ MyPgXact->xmin = InvalidTransactionId;
+ rc = WaitLatch(MyLatch, latch_events, delay_ms,
+ WAIT_EVENT_CLIENT_READ);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+
+ DeleteEvent();
+
+ if (lsn && trg_lsn > cur_lsn)
+ elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+ else
+ value = "t";
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+ /* Prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+ /* Send it */
+ do_text_output_oneline(tstate, value);
+ end_tup_output(tstate);
+ return strcmp(value,"t");
+}
+
+void
+WaitTimeUtility(float8 delay)
+{
+ int latch_events;
+
+ if (delay <= 0)
+ return;
+
+ latch_events = WL_TIMEOUT | WL_POSTMASTER_DEATH;
+
+ MyPgXact->xmin = InvalidTransactionId;
+ WaitLatch(MyLatch, latch_events, (long) ceil(delay * 1000.0), WAIT_EVENT_CLIENT_READ);
+ ResetLatch(MyLatch);
+}
+
+/*
+ * Get the amount of seconds left till the specified time.
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+ int ret;
+ float8 val;
+ Oid types[] = { time->consttype };
+ Datum values[] = { time->constvalue };
+ char nulls[] = { " " };
+ Datum result;
+ bool isnull;
+
+ SPI_connect();
+
+ if (time->consttype == 1083)
+ ret = SPI_execute_with_args("select extract (epoch from ($1 - now()::time))",
+ 1, types, values, nulls, true, 0);
+ else if (time->consttype == 1266)
+ ret = SPI_execute_with_args("select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))",
+ 1, types, values, nulls, true, 0);
+ else
+ ret = SPI_execute_with_args("select extract (epoch from ($1 - now()))",
+ 1, types, values, nulls, true, 0);
+
+ Assert(ret >= 0);
+ result = SPI_getbinval(SPI_tuptable->vals[0],
+ SPI_tuptable->tupdesc,
+ 1, &isnull);
+
+ Assert(!isnull);
+ val = DatumGetFloat8(result);
+
+ elog(INFO, "time: %f", val);
+
+ SPI_finish();
+ return val;
+}
+
+/* Implementation of WAIT FOR */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+ ListCell *events;
+ float8 delay = 0;
+ float8 final_delay = 0;
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ XLogRecPtr final_lsn = InvalidXLogRecPtr;
+ bool has_lsn = false;
+ bool wait_forever = true;
+ int res = 1;
+
+ if (stmt->strategy == WAIT_FOR_ANY)
+ {
+ /* Prepare to find minimum LSN and delay */
+ final_delay = DBL_MAX;
+ final_lsn = PG_UINT64_MAX;
+ }
+
+ /* Extract options from the statement node tree */
+ foreach(events, stmt->events)
+ {
+ WaitStmt *event = (WaitStmt *) lfirst(events);
+
+ /* LSN to wait for */
+ if (event->lsn)
+ {
+ has_lsn = true;
+ lsn = DatumGetLSN(
+ DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(event->lsn)));
+
+ /*
+ * When waiting for ALL, select max LSN to wait for.
+ * When waiting for ANY, select min LSN to wait for.
+ */
+ if ((stmt->strategy == WAIT_FOR_ALL && final_lsn <= lsn) ||
+ (stmt->strategy == WAIT_FOR_ANY && final_lsn > lsn))
+ {
+ final_lsn = lsn;
+ }
+ }
+
+ /* Time delay to wait for */
+ if (event->time || event->delay)
+ {
+ wait_forever = false;
+
+ if (event->delay)
+ delay = event->delay / 1000.0;
+
+ if (event->time)
+ {
+ Const *time = (Const *) event->time;
+ delay = WaitTimeResolve(time);
+ }
+
+ if (delay < 0)
+ delay = 0;
+
+ /*
+ * When waiting for ALL, select max delay to wait for.
+ * When waiting for ANY, select min delay to wait for.
+ */
+ if ((stmt->strategy == WAIT_FOR_ALL && final_delay <= delay) ||
+ (stmt->strategy == WAIT_FOR_ANY && final_delay > delay))
+ {
+ final_delay = delay;
+ }
+ }
+ }
+
+ if (!has_lsn)
+ {
+ WaitTimeUtility(final_delay);
+ res = 0;
+ }
+ else
+ res = WaitUtility(final_lsn, wait_forever ? 0 : final_delay, dest);
+
+ return res;
+}
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index e084c3f0699..c980c6d560e 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2760,6 +2760,18 @@ _outDefElem(StringInfo str, const DefElem *node)
WRITE_LOCATION_FIELD(location);
}
+static void
+_outWaitStmt(StringInfo str, const WaitStmt *node)
+{
+ WRITE_NODE_TYPE("WAITSTMT");
+
+ WRITE_STRING_FIELD(lsn);
+ WRITE_INT_FIELD(delay);
+ WRITE_NODE_FIELD(events);
+ WRITE_NODE_FIELD(time);
+ WRITE_ENUM_FIELD(strategy, WaitForStrategy);
+}
+
static void
_outTableLikeClause(StringInfo str, const TableLikeClause *node)
{
@@ -4305,6 +4317,9 @@ outNode(StringInfo str, const void *obj)
case T_PartitionRangeDatum:
_outPartitionRangeDatum(str, obj);
break;
+ case T_WaitStmt:
+ _outWaitStmt(str, obj);
+ break;
default:
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 6676412842b..bc2304a0ba1 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -78,6 +78,7 @@ static Query *transformCreateTableAsStmt(ParseState *pstate,
CreateTableAsStmt *stmt);
static Query *transformCallStmt(ParseState *pstate,
CallStmt *stmt);
+static void transformWaitForStmt(ParseState *pstate, WaitStmt *stmt);
static void transformLockingClause(ParseState *pstate, Query *qry,
LockingClause *lc, bool pushedDown);
#ifdef RAW_EXPRESSION_COVERAGE_TEST
@@ -326,6 +327,19 @@ transformStmt(ParseState *pstate, Node *parseTree)
result = transformCallStmt(pstate,
(CallStmt *) parseTree);
break;
+ case T_WaitStmt:
+ transformWaitForStmt(pstate, (WaitStmt *) parseTree);
+ result = makeNode(Query);
+ result->commandType = CMD_UTILITY;
+ result->utilityStmt = (Node *) parseTree;
+ break;
+ case T_TransactionStmt:
+ {
+ TransactionStmt *stmt = (TransactionStmt *) parseTree;
+ if ((stmt->kind == TRANS_STMT_BEGIN ||
+ stmt->kind == TRANS_STMT_START) && stmt->wait)
+ transformWaitForStmt(pstate, (WaitStmt *) stmt->wait);
+ }
default:
@@ -2981,6 +2995,18 @@ applyLockingClause(Query *qry, Index rtindex,
qry->rowMarks = lappend(qry->rowMarks, rc);
}
+static void
+transformWaitForStmt(ParseState *pstate, WaitStmt *stmt)
+{
+ ListCell *events;
+
+ foreach(events, stmt->events)
+ {
+ WaitStmt *event = (WaitStmt *) lfirst(events);
+ event->time = transformExpr(pstate, event->time, EXPR_KIND_OTHER);
+ }
+}
+
/*
* Coverage testing for raw_expression_tree_walker().
*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7e384f956c8..63e7b7224c3 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -276,7 +276,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -487,7 +487,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> row explicit_row implicit_row type_list array_expr_list
%type <node> case_expr case_arg when_clause case_default
%type <list> when_clause_list
-%type <ival> sub_type opt_materialized
+%type <ival> sub_type wait_strategy opt_materialized
%type <value> NumericOnly
%type <list> NumericOnly_list
%type <alias> alias_clause opt_alias_clause
@@ -591,6 +591,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <partboundspec> PartitionBoundSpec
%type <list> hash_partbound
%type <defelt> hash_partbound_elem
+%type <list> wait_list
+%type <node> WaitEvent wait_for
/*
* Non-keyword token types. These are hard-wired into the "flex" lexer.
@@ -660,7 +662,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -690,7 +692,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
- TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+ TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
TREAT TRIGGER TRIM TRUE_P
TRUNCATE TRUSTED TYPE_P TYPES_P
@@ -700,7 +702,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAIT WHEN WHERE WHITESPACE_P WINDOW
+ WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -954,6 +957,7 @@ stmt :
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -9930,18 +9934,20 @@ TransactionStmt:
n->chain = $3;
$$ = (Node *)n;
}
- | BEGIN_P opt_transaction transaction_mode_list_or_empty
+ | BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_BEGIN;
n->options = $3;
+ n->wait = $4;
$$ = (Node *)n;
}
- | START TRANSACTION transaction_mode_list_or_empty
+ | START TRANSACTION transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_START;
n->options = $3;
+ n->wait = $4;
$$ = (Node *)n;
}
| COMMIT opt_transaction opt_transaction_chain
@@ -14147,6 +14153,74 @@ xml_passing_mech:
| BY VALUE_P
;
+/*****************************************************************************
+ *
+ * QUERY:
+ * WAIT FOR <event> [, <event> ...]
+ * event is one of:
+ * LSN value
+ * TIMEOUT delay
+ * timestamp
+ *
+ *****************************************************************************/
+WaitStmt:
+ WAIT FOR wait_strategy wait_list
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->strategy = $3;
+ n->events = $4;
+ $$ = (Node *)n;
+ }
+ ;
+wait_for:
+ WAIT FOR wait_strategy wait_list
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->strategy = $3;
+ n->events = $4;
+ $$ = (Node *)n;
+ }
+ | /* EMPTY */ { $$ = NULL; };
+
+wait_strategy:
+ ALL { $$ = WAIT_FOR_ALL; }
+ | ANY { $$ = WAIT_FOR_ANY; }
+ | /* EMPTY */ { $$ = WAIT_FOR_ALL; }
+ ;
+
+wait_list:
+ WaitEvent { $$ = list_make1($1); }
+ | wait_list ',' WaitEvent { $$ = lappend($1, $3); }
+ | wait_list WaitEvent { $$ = lappend($1, $2); }
+ ;
+
+WaitEvent:
+ LSN Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = $2;
+ n->delay = 0;
+ n->time = NULL;
+ $$ = (Node *)n;
+ }
+ | TIMEOUT Iconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = NULL;
+ n->delay = $2;
+ n->time = NULL;
+ $$ = (Node *)n;
+ }
+ | ConstDatetime Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = NULL;
+ n->delay = 0;
+ n->time = makeStringConstCast($2, @2, $1);
+ $$ = (Node *)n;
+ }
+ ;
+
/*
* Aggregate decoration clauses
@@ -15291,6 +15365,7 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN
| MAPPING
| MATCH
| MATERIALIZED
@@ -15413,6 +15488,7 @@ unreserved_keyword:
| TEMPORARY
| TEXT_P
| TIES
+ | TIMEOUT
| TRANSACTION
| TRANSFORM
| TRIGGER
@@ -15439,6 +15515,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cde..bb8af349808 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/twophase.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void)
SyncScanShmemInit();
AsyncShmemInit();
+ /*
+ * Init array of Latches in shared memory for WAIT
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index b1f7f6e2d01..daec0551717 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <float.h>
#include "access/htup_details.h"
#include "access/reloptions.h"
@@ -57,6 +58,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -70,6 +72,9 @@
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
/* Hook for plugins to get control in ProcessUtility() */
ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -268,6 +273,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_LoadStmt:
case T_PrepareStmt:
case T_UnlistenStmt:
+ case T_WaitStmt:
case T_VariableSetStmt:
{
/*
@@ -591,6 +597,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case TRANS_STMT_START:
{
ListCell *lc;
+ WaitStmt *waitstmt = (WaitStmt *) stmt->wait;
+
+ /* If we have Wait event, but it not reached */
+ if (stmt->wait && WaitMain(waitstmt, dest) != 0)
+ break;
BeginTransactionBlock();
foreach(lc, stmt->options)
@@ -1062,6 +1073,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ WaitStmt *stmt = (WaitStmt *) parsetree;
+ WaitMain(stmt, dest);
+ break;
+ }
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2718,6 +2736,10 @@ CreateCommandTag(Node *parsetree)
tag = CMDTAG_NOTIFY;
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
case T_ListenStmt:
tag = CMDTAG_LISTEN;
break;
@@ -3357,6 +3379,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 00000000000..11115b9dab6
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 delay, DestReceiver *dest);
+extern void WaitTimeUtility(float8 delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int WaitMain(WaitStmt *stmt, DestReceiver *dest);
+
+#endif /* WAIT_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 8a76afe8ccb..348de76c5f4 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -488,6 +488,7 @@ typedef enum NodeTag
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
+ T_WaitStmt,
T_SQLCmd,
/*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2039b424499..30af2cfd505 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3054,6 +3054,7 @@ typedef struct TransactionStmt
char *savepoint_name; /* for savepoint commands */
char *gid; /* for two-phase-commit related commands */
bool chain; /* AND CHAIN option */
+ Node *wait; /* Wait for event node or NULL */
} TransactionStmt;
/* ----------------------
@@ -3563,4 +3564,25 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+/* ----------------------
+ * Wait Statement
+ * ----------------------
+ */
+
+typedef enum WaitForStrategy
+{
+ WAIT_FOR_ANY = 0,
+ WAIT_FOR_ALL
+} WaitForStrategy;
+
+typedef struct WaitStmt
+{
+ NodeTag type;
+ WaitForStrategy strategy;
+ List *events; /* option */
+ char *lsn; /* Target LSN to wait for */
+ int delay; /* Timeout when waiting for LSN, in msec */
+ Node *time; /* Wait for timestamp */
+} WaitStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index b1184c2d158..dd22e358b9a 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -404,6 +405,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
PG_KEYWORD("to", TO, RESERVED_KEYWORD)
PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -444,6 +446,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 8ef0f55e748..430bb5c7171 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -216,3 +216,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT FOR", false, false, false)
diff --git a/src/test/recovery/t/018_waitfor.pl b/src/test/recovery/t/018_waitfor.pl
new file mode 100644
index 00000000000..42900dd8867
--- /dev/null
+++ b/src/test/recovery/t/018_waitfor.pl
@@ -0,0 +1,106 @@
+# Checks WAIT FOR
+
+# TODO:
+# - test WAIT FOR ALL (many LSNs without time, many LSNs with time)
+# - test WAIT FOR ANY (many LSNs without time, many LSNs with time)
+# - test WAIT FOR TIMESTAMP (with LSN + without LSN)
+# - test WAIT FOR TIMEOUT (with LSN + without LSN)
+
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# And some content
+$node_master->safe_psql('postgres',
+ "CREATE TABLE tab_int AS SELECT generate_series(1, 10) AS a");
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create streaming standby from backup
+my $node_standby = get_new_node('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_master, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf(
+ 'postgresql.conf', qq(
+recovery_min_apply_delay = '${delay}s'
+));
+$node_standby->start;
+
+# Make new content on master and check its presence in standby depending
+# on the delay applied above. Before doing the insertion, get the
+# current timestamp that will be used as a comparison base. Even on slow
+# machines, this allows to have a predictable behavior when comparing the
+# delay between data insertion moment on master and replay time on standby.
+my $master_insert_time = time();
+$node_master->safe_psql('postgres',
+ "INSERT INTO tab_int VALUES (generate_series(11, 20))");
+
+# Now wait for replay to complete on standby. We're done waiting when the
+# standby has replayed up to the previously saved master LSN.
+my $lsn1 =
+ $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that waitlsn is able to setup infinite waiting loop and exit
+# it without timeouts.
+$node_standby->safe_psql('postgres',
+ "WAIT FOR LSN '$lsn1'")
+ or die "standby never caught up";
+
+# This test is successful iff the LSN has been applied with at least
+# the configured apply delay.
+my $time_waited = time() - $master_insert_time;
+ok($time_waited >= $delay, "standby applies WAL only after replication delay");
+
+$node_master->safe_psql('postgres',
+ "INSERT INTO tab_int VALUES (generate_series(21, 30))");
+
+# Check that waitlsn can return result immediately with NOWAIT.
+$node_standby->poll_query_until('postgres',
+ "WAIT FOR LSN '$lsn1' TIMEOUT 1", 't')
+ or die "standby never caught up";
+
+# Check that timeouts work on their own, without an LSN specified
+my $timeout = 1000; # 1 second
+my $start_time = time();
+$node_standby->safe_psql('postgres', "WAIT FOR TIMEOUT $timeout");
+$time_waited = time() - $start_time;
+note("Asked for timeout $timeout ms, got delayed for $time_waited second(s)");
+ok($time_waited >= $timeout / 1000, "WAIT FOR TIMEOUT works without LSN");
+
+# ==============================================================================
+# TODO: Check that isolation level doesn't get broken due to WAIT
+# TODO: Add more tests for WAIT FOR ALL / WAIT FOR ANY / WAIT FOR DATETIME
+# TODO: Cleanup tests
+# ==============================================================================
+$node_master->safe_psql('postgres',
+ "INSERT INTO tab_int VALUES (generate_series(31, 40))");
+
+my $lsn2 =
+ $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+$node_master->safe_psql('postgres',
+ "INSERT INTO tab_int VALUES (generate_series(41, 500000))");
+
+my $standby_max =
+ $node_standby->safe_psql('postgres',
+ "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ WAIT FOR LSN '$lsn2'; SELECT pg_sleep_for('2s'); SELECT max(a) FROM tab_int; COMMIT;");
+
+my $lsn_master =
+ $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $lsn_standby =
+ $node_standby->safe_psql('postgres', "SELECT pg_last_wal_replay_lsn()");
+note("Waited for LSN $lsn2, got LSN $lsn_standby on standby (current master LSN = $lsn_master), standby max = $standby_max");
+
+$node_master->stop;
+$node_standby->stop;