I made some improvements over the old implementation of WAIT FOR.
Synopsis
==========
WAIT FOR [ANY | ALL] event [, event ...]
where event is:
LSN value [options]
TIMESTAMP value
and options are:
TIMEOUT delay
UNTIL TIMESTAMP timestamp
ALL is used by default.
P.S. I am now testing the BEGIN-based WAIT prototype, as discussed earlier.
--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 8d91f3529e..8697f9807f 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -187,6 +187,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY wait SYSTEM "wait.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/wait.sgml b/doc/src/sgml/ref/wait.sgml
new file mode 100644
index 0000000000..9a79524779
--- /dev/null
+++ b/doc/src/sgml/ref/wait.sgml
@@ -0,0 +1,138 @@
+<!--
+doc/src/sgml/ref/wait.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-waitlsn">
+ <indexterm zone="sql-waitlsn">
+ <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+ <refentrytitle>WAIT FOR</refentrytitle>
+ <manvolnum>7</manvolnum>
+ <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>WAIT FOR</refname>
+ <refpurpose>wait for a target <acronym>LSN</acronym> to be replayed or for a point in time to be reached</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>'
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' UNTIL TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ALL LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>, TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ANY LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+
+ <para>
+ <command>WAIT FOR</command> provides a simple
+ interprocess communication mechanism to wait for a timestamp or for a target log sequence
+ number (<acronym>LSN</acronym>) on a standby in <productname>PostgreSQL</productname>
+ databases with master-standby asynchronous replication. When run with the
+ <literal>LSN</literal> option, the <command>WAIT FOR</command> command
+ waits for the specified <acronym>LSN</acronym> to be replayed. By default, the wait
+ time is unlimited. Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+ by shutting down the <literal>postgres</literal> server. You can also limit the wait
+ time using the <option>TIMEOUT</option> option.
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Parameters</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><literal>LSN</literal> '<replaceable class="parameter">lsn_number</replaceable>'</term>
+ <listitem>
+ <para>
+ Specifies the target log sequence number to wait for.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time interval to wait for the LSN to be replayed.
+ The specified <replaceable>wait_timeout</replaceable> must be an integer
+ and is measured in milliseconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>UNTIL TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time to wait for the LSN to be replayed.
+ The specified <replaceable>wait_time</replaceable> must be a timestamp.
+ </para>
+ </listitem>
+ </varlistentry>
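+
+ <varlistentry>
+ <term>TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+ <listitem>
+ <para>
+ Wait until the specified point in time is reached.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>ALL</literal> | <literal>ANY</literal></term>
+ <listitem>
+ <para>
+ Selects how multiple comma-separated events are combined.
+ <literal>ALL</literal> is the default.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>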
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ Run <literal>WAIT FOR</literal> from <application>psql</application>,
+ limiting wait time to 10000 milliseconds:
+
+<screen>
+WAIT FOR LSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE: LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+WAIT FOR LSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Limit <acronym>LSN</acronym> wait time to 500000 milliseconds, and then cancel the command.
+ The cancellation raises an error, so no result row is returned:
+<screen>
+WAIT FOR LSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+ERROR: canceling statement due to user request
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+ <title>Compatibility</title>
+
+ <para>
+ There is no <command>WAIT FOR</command> statement in the SQL
+ standard.
+ </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index cef09dd38b..588e96aa14 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -215,6 +215,7 @@
&update;
&vacuum;
&values;
+ &wait;
</reference>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4361568882..f7f5a76216 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -41,6 +41,7 @@
#include "catalog/pg_database.h"
#include "commands/progress.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/controldata_utils.h"
#include "miscadmin.h"
#include "pg_trace.h"
@@ -7285,6 +7286,15 @@ StartupXLOG(void)
break;
}
+ /*
+ * If lastReplayedEndRecPtr was updated,
+ * set latches in SHMEM array.
+ */
+ if (XLogCtl->lastReplayedEndRecPtr >= GetMinWait())
+ {
+ WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d4815d3ce6..9b310926c1 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -57,6 +57,7 @@ OBJS = \
user.o \
vacuum.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..17a201b31c
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,308 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT - a utility command that allows
+ * waiting for LSN to have been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlog.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shmem array */
+static void AddEvent(XLogRecPtr trg_lsn);
+static void DeleteEvent(void);
+
+/* Shared memory structures */
+
+/*
+ * Per-backend slot in the shared array: the LSN this backend is waiting
+ * for, or InvalidXLogRecPtr when it is not waiting.
+ */
+typedef struct
+{
+ XLogRecPtr trg_lsn;
+ /*
+ * Left struct BIDState here for compatibility with
+ * a planned future patch that will allow waiting for XIDs.
+ */
+} BIDState;
+
+/*
+ * Shared state for WAIT FOR, allocated and initialized by WaitShmemInit().
+ */
+typedef struct
+{
+ /* Upper bound on backend ids with a registered wait; slot scans stop here */
+ int backend_maxid;
+ /*
+ * Smallest target LSN among registered waiters; PG_UINT64_MAX when no
+ * waiter is known. The startup process compares replay progress to it.
+ */
+ XLogRecPtr min_lsn;
+ /* Held while updating backend_maxid and min_lsn */
+ slock_t mutex;
+ /* Indexed by backend id; MaxBackends + 1 entries (see WaitShmemSize) */
+ BIDState event_arr[FLEXIBLE_ARRAY_MEMBER];
+} GlobState;
+
+/* Points into shared memory; assigned by WaitShmemInit() */
+static volatile GlobState *state;
+
+/*
+ * Register the calling backend as waiting for trg_lsn.
+ *
+ * While holding the spinlock: raise backend_maxid to cover our slot, store
+ * the target in our slot, and lower min_lsn if our target is the new minimum.
+ */
+static void
+AddEvent(XLogRecPtr trg_lsn)
+{
+	volatile GlobState *gs = state;
+
+	SpinLockAcquire(&gs->mutex);
+
+	if (MyBackendId > gs->backend_maxid)
+		gs->backend_maxid = MyBackendId;
+
+	gs->event_arr[MyBackendId].trg_lsn = trg_lsn;
+
+	if (gs->min_lsn > trg_lsn)
+		gs->min_lsn = trg_lsn;
+
+	SpinLockRelease(&gs->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared array.
+ *
+ * Clears our slot and, all under the spinlock:
+ *  - recomputes min_lsn if our target was the current minimum
+ *    (PG_UINT64_MAX when no waiter remains);
+ *  - shrinks backend_maxid if we were the highest registered backend,
+ *    down to the highest slot still in use, or 0 when nobody is waiting
+ *    (the same value WaitShmemInit starts with).
+ *
+ * Slot scans start at index 2, as in WaitSetLatch().
+ * NOTE(review): presumably a low backend id is reserved (e.g. for the
+ * startup process in hot standby) -- confirm.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ */
+static void
+DeleteEvent(void)
+{
+	int			i;
+	XLogRecPtr	trg_lsn;
+
+	SpinLockAcquire(&state->mutex);
+
+	/* Clear our own slot under the lock, so recomputations see it cleared */
+	trg_lsn = state->event_arr[MyBackendId].trg_lsn;
+	state->event_arr[MyBackendId].trg_lsn = InvalidXLogRecPtr;
+
+	/* Update state->min_lsn iff it is necessary for choosing next min_lsn */
+	if (state->min_lsn == trg_lsn)
+	{
+		state->min_lsn = PG_UINT64_MAX;
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->event_arr[i].trg_lsn != InvalidXLogRecPtr &&
+				state->event_arr[i].trg_lsn < state->min_lsn)
+				state->min_lsn = state->event_arr[i].trg_lsn;
+	}
+
+	/*
+	 * If we were the highest registered backend, lower backend_maxid to the
+	 * highest slot still in use; reset it to 0 if none is, so WaitSetLatch()
+	 * does not keep scanning idle slots.
+	 */
+	if (state->backend_maxid == MyBackendId)
+	{
+		state->backend_maxid = 0;
+		for (i = MyBackendId - 1; i >= 2; i--)
+		{
+			if (state->event_arr[i].trg_lsn != InvalidXLogRecPtr)
+			{
+				state->backend_maxid = i;
+				break;
+			}
+		}
+	}
+
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Shared memory needed by this module: the fixed GlobState header followed
+ * by one BIDState slot for each of the MaxBackends + 1 array indexes.
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		nslots = MaxBackends + 1;
+
+	return offsetof(GlobState, event_arr) + nslots * sizeof(BIDState);
+}
+
+/*
+ * Attach to the "pg_wait_lsn" shared memory struct, creating it if needed.
+ * When it was just created, initialize the mutex, mark every slot as idle,
+ * and reset the summary fields to their "nobody is waiting" values.
+ */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+	uint32		slot;
+
+	state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (found)
+		return;
+
+	SpinLockInit(&state->mutex);
+
+	for (slot = 0; slot < (uint32) (MaxBackends + 1); slot++)
+		state->event_arr[slot].trg_lsn = InvalidXLogRecPtr;
+
+	state->backend_maxid = 0;
+	state->min_lsn = PG_UINT64_MAX;
+}
+
+/*
+ * Wake every registered waiter whose target LSN has been reached
+ * (trg_lsn <= cur_lsn) by setting its process latch. Called from the
+ * startup process as WAL is replayed.
+ */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	int			i;
+	int			backend_maxid;
+
+	/* Snapshot the range of slots that may be in use */
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	for (i = 2; i <= backend_maxid; i++)
+	{
+		XLogRecPtr	trg_lsn = state->event_arr[i].trg_lsn;
+		PGPROC	   *backend;
+
+		/* Skip idle slots and waiters whose target is still ahead */
+		if (trg_lsn == InvalidXLogRecPtr || trg_lsn > cur_lsn)
+			continue;
+
+		/*
+		 * Look up the PGPROC only for waiters we actually wake. A slot can be
+		 * stale if its backend exited without clearing it; the lookup may
+		 * then yield NULL, which must not be dereferenced.
+		 */
+		backend = BackendIdGetProc(i);
+		if (backend != NULL)
+			SetLatch(&backend->procLatch);
+	}
+}
+
+/*
+ * Return the smallest target LSN currently registered by any waiter
+ * (PG_UINT64_MAX when none is known). The value is read without taking
+ * the spinlock.
+ */
+XLogRecPtr
+GetMinWait(void)
+{
+	XLogRecPtr	min_lsn = state->min_lsn;
+
+	return min_lsn;
+}
+
+/*
+ * On WAIT use MyLatch to wait till LSN is replayed,
+ * postmaster dies or timeout happens.
+ *
+ * lsn:   the target LSN.
+ * delay: timeout in milliseconds; values <= 0 mean no timeout.
+ * dest:  receives a single-row, single-column result named "LSN reached":
+ *        "t" when the target was replayed, "f" otherwise (timeout or
+ *        postmaster death), in which case a NOTICE is emitted as well.
+ */
+void
+WaitUtility(XLogRecPtr lsn, const int delay, DestReceiver *dest)
+{
+	XLogRecPtr	trg_lsn = lsn;
+	XLogRecPtr	cur_lsn;
+	int			latch_events;
+	uint64		tdelay = delay;
+	long		secs;
+	int			microsecs;
+	TimestampTz timer = GetCurrentTimestamp();
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	char	   *value = "f";
+
+	if (delay > 0)
+		latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+	else
+		latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+	/* Register so that WaitSetLatch() in the startup process can wake us */
+	AddEvent(trg_lsn);
+
+	for (;;)
+	{
+		cur_lsn = GetXLogReplayRecPtr(NULL);
+
+		/* If LSN has been replayed */
+		if (trg_lsn <= cur_lsn)
+			break;
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		/* If delay time is over; otherwise compute the remaining time */
+		if (latch_events & WL_TIMEOUT)
+		{
+			TimestampTz now = GetCurrentTimestamp();
+
+			if (TimestampDifferenceExceeds(timer, now, delay))
+				break;
+			TimestampDifference(timer, now, &secs, &microsecs);
+			tdelay = delay - (secs * 1000 + microsecs / 1000);
+		}
+
+		/* A little hack similar to SnapshotResetXmin to work out of snapshot */
+		MyPgXact->xmin = InvalidTransactionId;
+		WaitLatch(MyLatch, latch_events, tdelay, WAIT_EVENT_CLIENT_READ);
+		ResetLatch(MyLatch);
+
+		/*
+		 * Unregister before servicing a pending interrupt, so that an ERROR
+		 * raised by ProcessInterrupts() (e.g. query cancel) does not leave
+		 * our slot behind. ProcessInterrupts() does not necessarily raise an
+		 * error, though: if it returns, we are still waiting and must
+		 * register again, or WaitSetLatch() would no longer wake us.
+		 */
+		if (InterruptPending)
+		{
+			DeleteEvent();
+			ProcessInterrupts();
+			AddEvent(trg_lsn);
+		}
+	}
+
+	DeleteEvent();
+
+	if (trg_lsn > cur_lsn)
+		elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+	else
+		value = "t";
+
+	/* Need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+	/* Prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+	/* Send it */
+	do_text_output_oneline(tstate, value);
+	end_tup_output(tstate);
+}
+
+/*
+ * Sleep for "delay" milliseconds (used for WAIT FOR TIMESTAMP).
+ *
+ * A negative delay returns immediately. The sleep ends early if the
+ * postmaster dies, and it can be cancelled like any other statement
+ * (Ctrl+C / pg_cancel_backend()), because the latch set by the signal
+ * handler wakes us up and CHECK_FOR_INTERRUPTS() then processes the request.
+ */
+void
+WaitTimeUtility(const int delay)
+{
+	TimestampTz start;
+
+	if (delay < 0)
+		return;
+
+	start = GetCurrentTimestamp();
+
+	for (;;)
+	{
+		long		secs;
+		int			microsecs;
+		long		remaining;
+		int			rc;
+
+		/* Recompute the remaining time after every (possibly early) wakeup */
+		TimestampDifference(start, GetCurrentTimestamp(), &secs, &microsecs);
+		remaining = delay - (secs * 1000 + microsecs / 1000);
+		if (remaining <= 0)
+			break;
+
+		/* Same snapshot workaround as in WaitUtility() */
+		MyPgXact->xmin = InvalidTransactionId;
+		rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   remaining, WAIT_EVENT_CLIENT_READ);
+		ResetLatch(MyLatch);
+
+		if (rc & WL_POSTMASTER_DEATH)
+			break;
+
+		CHECK_FOR_INTERRUPTS();
+	}
+}
+
+/*
+ * Resolve a TIME / TIMETZ / TIMESTAMP[TZ] constant into the number of
+ * seconds from now until that moment (negative if it is already in the
+ * past), by evaluating an EXTRACT(EPOCH ...) query through SPI.
+ *
+ * Raises an ERROR if the query cannot be executed or yields no value.
+ *
+ * NOTE(review): this relies on extract() returning double precision; it
+ * returns numeric in later PostgreSQL releases -- confirm the target version.
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+	int			ret;
+	float8		val;
+	Oid			types[] = {time->consttype};
+	Datum		values[] = {time->constvalue};
+	const char *query;
+	Datum		result;
+	bool		isnull;
+
+	if (time->consttype == TIMEOID)
+		query = "select extract (epoch from ($1 - now()::time))";
+	else if (time->consttype == TIMETZOID)
+		query = "select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))";
+	else
+		query = "select extract (epoch from ($1 - now()))";
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	/* nulls == NULL: the single parameter is never null */
+	ret = SPI_execute_with_args(query, 1, types, values, NULL, true, 0);
+	if (ret != SPI_OK_SELECT || SPI_processed != 1)
+		elog(ERROR, "could not resolve wait time: SPI_execute_with_args returned %d",
+			 ret);
+
+	result = SPI_getbinval(SPI_tuptable->vals[0],
+						   SPI_tuptable->tupdesc,
+						   1, &isnull);
+	if (isnull)
+		elog(ERROR, "could not resolve wait time: result is null");
+
+	/* Copy the by-value result out before SPI_finish() releases its memory */
+	val = DatumGetFloat8(result);
+
+	SPI_finish();
+	return val;
+}
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index e084c3f069..cc8d20a91b 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2760,6 +2760,19 @@ _outDefElem(StringInfo str, const DefElem *node)
WRITE_LOCATION_FIELD(location);
}
+/*
+ * _outWaitStmt - write a WaitStmt node (the statement itself, or one of its
+ * per-event entries) in the outfuncs text format.
+ *
+ * NOTE(review): this patch adds no copyfuncs/equalfuncs/readfuncs support
+ * for T_WaitStmt; confirm copyObject()/equal() are never applied to it.
+ */
+static void
+_outWaitStmt(StringInfo str, const WaitStmt *node)
+{
+ WRITE_NODE_TYPE("WAITSTMT");
+
+ WRITE_STRING_FIELD(lsn);
+ WRITE_INT_FIELD(delay);
+ WRITE_NODE_FIELD(events);
+ WRITE_NODE_FIELD(time);
+ WRITE_ENUM_FIELD(wait_type, WaitType);
+ WRITE_ENUM_FIELD(strategy, WaitForStrategy);
+}
+
static void
_outTableLikeClause(StringInfo str, const TableLikeClause *node)
{
@@ -4305,6 +4318,9 @@ outNode(StringInfo str, const void *obj)
case T_PartitionRangeDatum:
_outPartitionRangeDatum(str, obj);
break;
+ case T_WaitStmt:
+ _outWaitStmt(str, obj);
+ break;
default:
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 6676412842..4543fa1b9f 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -78,6 +78,7 @@ static Query *transformCreateTableAsStmt(ParseState *pstate,
CreateTableAsStmt *stmt);
static Query *transformCallStmt(ParseState *pstate,
CallStmt *stmt);
+static Query *transformWaitForStmt(ParseState *pstate, WaitStmt *stmt);
static void transformLockingClause(ParseState *pstate, Query *qry,
LockingClause *lc, bool pushedDown);
#ifdef RAW_EXPRESSION_COVERAGE_TEST
@@ -326,6 +327,9 @@ transformStmt(ParseState *pstate, Node *parseTree)
result = transformCallStmt(pstate,
(CallStmt *) parseTree);
break;
+ case T_WaitStmt:
+ result = transformWaitForStmt(pstate, (WaitStmt *) parseTree);
+ break;
default:
@@ -2981,6 +2985,25 @@ applyLockingClause(Query *qry, Index rtindex,
qry->rowMarks = lappend(qry->rowMarks, rc);
}
+/*
+ * transformWaitForStmt -
+ *	  transform a WAIT FOR statement
+ *
+ * The statement stays a utility command. The only analysis required is to
+ * run each event's raw timestamp expression (NULL for pure LSN events)
+ * through transformExpr, so that execution sees a transformed expression.
+ */
+static Query *
+transformWaitForStmt(ParseState *pstate, WaitStmt *stmt)
+{
+	Query	   *query;
+	ListCell   *lc;
+
+	foreach(lc, stmt->events)
+	{
+		WaitStmt   *event = (WaitStmt *) lfirst(lc);
+		Node	   *raw_time = event->time;
+
+		event->time = transformExpr(pstate, raw_time, EXPR_KIND_OTHER);
+	}
+
+	query = makeNode(Query);
+	query->commandType = CMD_UTILITY;
+	query->utilityStmt = (Node *) stmt;
+	return query;
+}
+
/*
* Coverage testing for raw_expression_tree_walker().
*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 96e7fdbcfe..8b8144d12c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -276,7 +276,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -487,7 +487,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> row explicit_row implicit_row type_list array_expr_list
%type <node> case_expr case_arg when_clause case_default
%type <list> when_clause_list
-%type <ival> sub_type opt_materialized
+%type <ival> sub_type wait_strategy opt_materialized
%type <value> NumericOnly
%type <list> NumericOnly_list
%type <alias> alias_clause opt_alias_clause
@@ -591,6 +591,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <partboundspec> PartitionBoundSpec
%type <list> hash_partbound
%type <defelt> hash_partbound_elem
+%type <list> wait_list
+%type <node> WaitEvent
/*
* Non-keyword token types. These are hard-wired into the "flex" lexer.
@@ -660,7 +662,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -690,7 +692,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
- TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+ TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
TREAT TRIGGER TRIM TRUE_P
TRUNCATE TRUSTED TYPE_P TYPES_P
@@ -700,7 +702,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAIT WHEN WHERE WHITESPACE_P WINDOW
+ WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -953,6 +956,7 @@ stmt :
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -14128,6 +14132,82 @@ xml_passing_mech:
| BY VALUE_P
;
+/*****************************************************************************
+ *
+ * QUERY:
+ *		WAIT FOR [ALL | ANY] <event> [, <event> ...]
+ *
+ *		<event> is one of:
+ *			LSN 'lsn'
+ *			LSN 'lsn' TIMEOUT delay				(delay in milliseconds)
+ *			LSN 'lsn' UNTIL <datetime type> 'value'
+ *			<datetime type> 'value'				(e.g. TIMESTAMP '...')
+ *
+ *		ALL is the default strategy. The statement and each event are
+ *		WaitStmt nodes; the events are collected in the statement's list.
+ *
+ *		NOTE(review): wait_list also accepts events separated only by
+ *		whitespace (no comma); confirm that is intended.
+ *
+ *****************************************************************************/
+WaitStmt:
+ WAIT FOR wait_strategy wait_list
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->wait_type = WAIT_EVENT_NONE;
+ n->strategy = $3;
+ n->events = $4;
+ $$ = (Node *)n;
+ }
+ ;
+
+wait_strategy:
+ ALL { $$ = WAIT_FOR_ALL; }
+ | ANY { $$ = WAIT_FOR_ANY; }
+ | /* EMPTY */ { $$ = WAIT_FOR_ALL; }
+ ;
+
+wait_list:
+ WaitEvent { $$ = list_make1($1); }
+ | wait_list ',' WaitEvent { $$ = lappend($1, $3); }
+ | wait_list WaitEvent { $$ = lappend($1, $2); }
+ ;
+
+WaitEvent:
+ LSN Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->wait_type = WAIT_EVENT_LSN;
+ n->lsn = $2;
+ n->delay = 0;
+ n->time = NULL;
+ $$ = (Node *)n;
+ }

+ | LSN Sconst TIMEOUT Iconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->wait_type = WAIT_EVENT_MIX;
+ n->lsn = $2;
+ n->delay = $4;
+ n->time = NULL;
+ $$ = (Node *)n;
+ }
+ | LSN Sconst UNTIL ConstDatetime Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->wait_type = WAIT_EVENT_MIX;
+ n->lsn = $2;
+ n->delay = 0;
+ n->time = makeStringConstCast($5, @5, $4);
+ $$ = (Node *)n;
+ }
+ | ConstDatetime Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->wait_type = WAIT_EVENT_TIME;
+ n->lsn = NULL;
+ n->delay = 0;
+ n->time = makeStringConstCast($2, @2, $1);
+ $$ = (Node *)n;
+ }
+ ;
+
/*
* Aggregate decoration clauses
@@ -15272,6 +15352,7 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN
| MAPPING
| MATCH
| MATERIALIZED
@@ -15394,6 +15475,7 @@ unreserved_keyword:
| TEMPORARY
| TEXT_P
| TIES
+ | TIMEOUT
| TRANSACTION
| TRANSFORM
| TRIGGER
@@ -15420,6 +15502,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..8c3d196a9a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/twophase.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void)
SyncScanShmemInit();
AsyncShmemInit();
+ /*
+ * Init array of Latches in SHMEM for Wait
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 1b460a2612..b3e6dcf492 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <float.h>
#include "access/htup_details.h"
#include "access/reloptions.h"
@@ -57,6 +58,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -70,6 +72,9 @@
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
/* Hook for plugins to get control in ProcessUtility() */
ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -267,6 +272,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_LoadStmt:
case T_PrepareStmt:
case T_UnlistenStmt:
+ case T_WaitStmt:
case T_VariableSetStmt:
{
/*
@@ -1061,6 +1067,104 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ WaitStmt *stmt = (WaitStmt *) parsetree;
+ float8 time_val = 0;
+ float8 val = 0;
+ ListCell *events;
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ XLogRecPtr trg_lsn = InvalidXLogRecPtr;
+
+ if (stmt->strategy == WAIT_FOR_ANY)
+ {
+ time_val = DBL_MAX;
+ lsn = PG_UINT64_MAX;
+ }
+
+ /* Extract options from the statement node tree */
+ foreach(events, stmt->events)
+ {
+ WaitStmt *event = (WaitStmt *) lfirst(events);
+
+ if (event->lsn)
+ {
+ int32 res;
+ trg_lsn = DatumGetLSN(
+ DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(event->lsn)));
+ res = DatumGetUInt32(
+ DirectFunctionCall2(pg_lsn_cmp,
+ lsn, trg_lsn));
+
+ /* Nice behavour on LSN from past */
+ if (stmt->strategy == WAIT_FOR_ALL)
+ {
+ if (res <= 0)
+ lsn = trg_lsn;
+ }
+ else
+ {
+ if (res > 0)
+ lsn = trg_lsn;
+ }
+
+ if (stmt->wait_type == WAIT_EVENT_TIME)
+ stmt->wait_type = WAIT_EVENT_MIX;
+ else if (stmt->wait_type == WAIT_EVENT_NONE)
+ stmt->wait_type = WAIT_EVENT_LSN;
+
+ if (event->delay)
+ {
+ if (stmt->strategy == WAIT_FOR_ANY)
+ {
+ if (event->delay < time_val)
+ time_val = event->delay / 1000;
+ }
+ else
+ {
+ if (event->delay >= time_val)
+ time_val = event->delay / 1000;
+ }
+ }
+ }
+
+ if (event->time)
+ {
+ Const *time = (Const *) event->time;
+ val = WaitTimeResolve(time);
+
+ if (stmt->wait_type == WAIT_EVENT_LSN)
+ stmt->wait_type = WAIT_EVENT_MIX;
+ else if (stmt->wait_type == WAIT_EVENT_NONE)
+ stmt->wait_type = WAIT_EVENT_TIME;
+
+ /* if val == 0 ?? */
+ if (stmt->strategy == WAIT_FOR_ALL)
+ {
+ if (time_val <= val)
+ time_val = val;
+ }
+ else
+ {
+ if (time_val > val)
+ time_val = val;
+ }
+ }
+
+ }
+
+ /* Time <= 0 iff time event is passed */
+ if (time_val <= 0)
+ time_val = 1;
+
+ if (stmt->wait_type == WAIT_EVENT_TIME)
+ WaitTimeUtility(time_val * 1000);
+ else
+ WaitUtility(lsn, (int)(time_val * 1000), dest);
+ }
+ break;
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2713,6 +2817,10 @@ CreateCommandTag(Node *parsetree)
tag = CMDTAG_NOTIFY;
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
case T_ListenStmt:
tag = CMDTAG_LISTEN;
break;
@@ -3344,6 +3452,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..6aebf67459
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+
+/*
+ * Include what the prototypes below need (XLogRecPtr, Const, DestReceiver)
+ * so that this header is self-contained. postgres.h is deliberately not
+ * included here: by project convention each .c file includes it first.
+ */
+#include "access/xlogdefs.h"
+#include "nodes/primnodes.h"
+#include "tcop/dest.h"
+
+/* Execution of WAIT FOR (called from utility.c) */
+extern void WaitUtility(XLogRecPtr lsn, const int delay, DestReceiver *dest);
+extern void WaitTimeUtility(const int delay);
+extern float8 WaitTimeResolve(Const *time);
+
+/* Shared memory setup (called from ipci.c) */
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+
+/* Used by the startup process during replay (called from xlog.c) */
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+
+#endif							/* WAIT_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index baced7eec0..a3cfa92ab2 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -487,6 +487,7 @@ typedef enum NodeTag
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
+ T_WaitStmt,
T_SQLCmd,
/*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index da0706add5..b7a4fa8bcc 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3554,4 +3554,34 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+/* ----------------------
+ * Wait Statement
+ *
+ * The grammar builds one WaitStmt for the statement itself (wait_type,
+ * strategy, events) and one WaitStmt per event in "events" (lsn, delay,
+ * time, plus the event's own wait_type).
+ * ----------------------
+ */
+
+/* Which kinds of waits a statement or a single event involves */
+typedef enum WaitType
+{
+ WAIT_EVENT_NONE = 0, /* statement node before events are examined */
+ WAIT_EVENT_LSN, /* LSN only */
+ WAIT_EVENT_TIME, /* timestamp only */
+ WAIT_EVENT_MIX /* LSN combined with a timeout or timestamp */
+} WaitType;
+
+/* How the events of one statement are combined; the grammar defaults to ALL */
+typedef enum WaitForStrategy
+{
+ WAIT_FOR_ANY = 0,
+ WAIT_FOR_ALL
+} WaitForStrategy;
+
+typedef struct WaitStmt
+{
+ NodeTag type;
+ WaitType wait_type;
+ WaitForStrategy strategy;
+ List *events; /* list of per-event WaitStmt nodes */
+ char *lsn; /* Target LSN to wait for, as text (NULL if none) */
+ int delay; /* Timeout when waiting for LSN, in msec (0 = none given) */
+ Node *time; /* Timestamp expression to wait for (NULL if none) */
+} WaitStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index b1184c2d15..dd22e358b9 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -404,6 +405,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
PG_KEYWORD("to", TO, RESERVED_KEYWORD)
PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -444,6 +446,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index d28145a50d..0e36f1049e 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -216,3 +216,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT FOR", false, false, false)
diff --git a/src/test/recovery/t/018_waitfor.pl b/src/test/recovery/t/018_waitfor.pl
new file mode 100644
index 0000000000..6817431e9c
--- /dev/null
+++ b/src/test/recovery/t/018_waitfor.pl
@@ -0,0 +1,64 @@
+# Checks WAIT FOR LSN on a streaming standby that applies WAL with a delay.
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# And some content
+$node_master->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1, 10) AS a");
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create streaming standby from backup; it replays WAL only after $delay s.
+my $node_standby = get_new_node('standby');
+my $delay        = 4;
+$node_standby->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf(
+	'postgresql.conf', qq(
+recovery_min_apply_delay = '${delay}s'
+));
+$node_standby->start;
+
+# Make new content on master and check its presence in standby depending
+# on the delay applied above. Before doing the insertion, get the
+# current timestamp that will be used as a comparison base. Even on slow
+# machines, this allows to have a predictable behavior when comparing the
+# delay between data insertion moment on master and replay time on standby.
+my $master_insert_time = time();
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(11, 20))");
+
+# Now wait for replay to complete on standby. We're done waiting when the
+# standby has replayed up to the previously saved master LSN.
+my $until_lsn =
+  $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+$node_master->safe_psql('postgres',
+	"INSERT INTO tab_int VALUES (generate_series(21, 30))");
+
+# Check that WAIT FOR LSN without TIMEOUT runs to completion. safe_psql()
+# already dies if the statement fails, and its optional trailing arguments
+# form a key/value hash, so no expected value is passed to it.
+# NOTE(review): the returned 't'/'f' is only reported, not asserted:
+# utility.c currently turns a missing timeout into a 1-second one, so this
+# may return 'f' before the delayed replay. Assert 't' once that is fixed.
+my $reached =
+  $node_standby->safe_psql('postgres', "WAIT FOR LSN '$until_lsn'");
+note("WAIT FOR LSN without TIMEOUT returned '$reached'");
+
+# Check that WAIT FOR LSN with a 1 ms TIMEOUT reports 't' once the standby
+# has replayed $until_lsn (retrying until it does).
+$node_standby->poll_query_until('postgres',
+	"WAIT FOR LSN '$until_lsn' TIMEOUT 1", 't')
+  or die "standby never caught up";
+
+# This test is successful if and only if the LSN has been applied with at least
+# the configured apply delay.
+my $time_waited = time() - $master_insert_time;
+ok($time_waited >= $delay, "standby applies WAL only after replication delay");