Add some fixes and rebase.
--
Ivan Kartyshov
Postgres Professional: www.postgrespro.com
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 4a42999b18..657a217e27 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY wait SYSTEM "wait.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b021487..a2794763b1 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+ AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN <replaceable class="parameter">number_of_milliseconds</replaceable> ]
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e345..46a3bcf1a8 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+ AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN <replaceable class="parameter">number_of_milliseconds</replaceable> ]
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index aa94f6adf6..04e17620dd 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -216,6 +216,7 @@
&update;
&vacuum;
&values;
+ &wait;
</reference>
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 1b48d7171a..8ab86a83b5 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1784,6 +1785,15 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for,
+ * set the latches of the waiting backends to wake them up.
+ */
+ if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWait())
+ {
+ WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 6dd00a4abd..3f06dc5341 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
'vacuumparallel.c',
'variable.c',
'view.c',
+ 'wait.c',
)
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..880f141cf5
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,338 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements AFTER (also as a clause of BEGIN / START TRANSACTION), which
+ * waits, optionally with a timeout, until an LSN is replayed on a standby.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include <float.h>
+#include <math.h>
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "executor/spi.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Register / unregister the current backend's wait in the shared array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory state for LSN waits; allocated by WaitShmemInit() */
+typedef struct
+{
+ int backend_maxid; /* highest BackendId that may have a wait registered */
+ pg_atomic_uint64 min_lsn; /* smallest registered waited_lsn, PG_UINT64_MAX if none */
+ slock_t mutex; /* serializes AddEvent()/DeleteEvent() updates */
+ XLogRecPtr waited_lsn[FLEXIBLE_ARRAY_MEMBER]; /* per-BackendId target, InvalidXLogRecPtr if not waiting */
+} WaitState;
+
+static volatile WaitState *state; /* points into shared memory; set by WaitShmemInit() */
+
+/* Register the current backend as waiting for lsn_to_wait */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	state->waited_lsn[MyBackendId] = lsn_to_wait;
+	if (MyBackendId > state->backend_maxid)
+		state->backend_maxid = MyBackendId;	/* widen the scanned range */
+
+	/* Lower the global minimum so the replay loop will call WaitSetLatch() */
+	if (pg_atomic_read_u64(&state->min_lsn) > lsn_to_wait)
+		pg_atomic_write_u64(&state->min_lsn, lsn_to_wait);
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ * Check:
+ * 1) normal|smart|fast|immediate stop
+ * 2) SIGKILL and SIGTERM
+ */
+static void
+DeleteEvent(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete;
+
+	/* Read and clear our own slot under the lock, as AddEvent() sets it */
+	SpinLockAcquire(&state->mutex);
+	lsn_to_delete = state->waited_lsn[MyBackendId];
+	state->waited_lsn[MyBackendId] = InvalidXLogRecPtr;
+
+	/* If we were the minimum, recompute min_lsn (BackendIds start at 1) */
+	if (pg_atomic_read_u64(&state->min_lsn) == lsn_to_delete)
+	{
+		pg_atomic_write_u64(&state->min_lsn, PG_UINT64_MAX);
+		for (i = 1; i <= state->backend_maxid; i++)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+				state->waited_lsn[i] < pg_atomic_read_u64(&state->min_lsn))
+				pg_atomic_write_u64(&state->min_lsn, state->waited_lsn[i]);
+	}
+
+	/* Shrink backend_maxid to the highest slot still in use (0 if none) */
+	if (state->backend_maxid == MyBackendId)
+	{
+		while (state->backend_maxid > 0 &&
+			   state->waited_lsn[state->backend_maxid] == InvalidXLogRecPtr)
+			state->backend_maxid--;
+	}
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Number of per-backend slots in WaitState.waited_lsn, indexed by BackendId.
+ * NOTE(review): sinval may also hand out ids to auxiliary processes (e.g. the
+ * startup process in hot standby), so leave room for them -- confirm bound.
+ */
+#define WAIT_LSN_SLOTS	(MaxBackends + NUM_AUXILIARY_PROCS + 1)
+
+/* Report amount of shared memory space needed for WaitState */
+Size
+WaitShmemSize(void)
+{
+	return add_size(offsetof(WaitState, waited_lsn),
+					mul_size(WAIT_LSN_SLOTS, sizeof(XLogRecPtr)));
+}
+
+/* Init array of events in shared memory */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+	int			i;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+		for (i = 0; i < WAIT_LSN_SLOTS; i++)
+			state->waited_lsn[i] = InvalidXLogRecPtr;
+		state->backend_maxid = 0;
+		pg_atomic_init_u64(&state->min_lsn, PG_UINT64_MAX);
+	}
+}
+
+/* Set the latch of every backend whose awaited LSN is <= cur_lsn */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	int			i;
+	int			backend_maxid;
+	PGPROC	   *backend;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	/* Scan from 1 (the first valid BackendId); only look up real waiters */
+	for (i = 1; i <= backend_maxid; i++)
+	{
+		XLogRecPtr	waited = state->waited_lsn[i];
+		if (waited == InvalidXLogRecPtr || waited > cur_lsn)
+			continue;
+		if ((backend = BackendIdGetProc(i)) != NULL)
+			SetLatch(&backend->procLatch);
+	}
+}
+
+/* Smallest LSN any backend is waiting for (PG_UINT64_MAX when none is) */
+XLogRecPtr
+GetMinWait(void)
+{
+	return (XLogRecPtr) pg_atomic_read_u64(&state->min_lsn);
+}
+
+/*
+ * Wait until "lsn" is replayed, "secs" seconds pass, or the postmaster dies,
+ * sleeping on MyLatch (set by WaitSetLatch() once the LSN is replayed).
+ * lsn == InvalidXLogRecPtr means just sleep for "secs"; secs == 0 means no
+ * timeout; secs < 0 means don't wait at all.
+ * Returns 1 if the LSN was reached (or none was requested), 0 otherwise.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	/* Always wake on the latch so that query cancel is handled promptly */
+	const int	latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+	bool		registered = false;
+	TimestampTz endtime;
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("recovery is not in progress"),
+				 errhint("Waiting for an LSN is only possible on a standby.")));
+
+	if (lsn != InvalidXLogRecPtr)
+	{
+		/* Already replayed, or asked not to wait: answer immediately */
+		if (lsn <= cur_lsn || secs < 0)
+			return (lsn <= cur_lsn);
+		AddEvent(lsn);
+		registered = true;
+	}
+	else if (secs == 0)
+		return 1;
+
+	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+										  (int64) (secs * 1000.0));
+
+	for (;;)
+	{
+		long		delay_ms;
+		int			rc;
+
+		/*
+		 * Unregister while processing interrupts: ProcessInterrupts() may
+		 * throw, and nothing else would remove our entry.  If it returns
+		 * normally, register again so that we keep being woken up.
+		 */
+		if (InterruptPending)
+		{
+			if (registered)
+				DeleteEvent();
+			ProcessInterrupts();
+			if (registered)
+				AddEvent(lsn);
+		}
+
+		/*
+		 * Re-check the replay position on every iteration, after registering:
+		 * if the LSN was replayed before AddEvent(), no latch will be set.
+		 */
+		if (registered)
+		{
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+			if (lsn <= cur_lsn)
+				break;
+		}
+
+		if (secs > 0)
+			delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
+													   endtime);
+		else if (secs == 0)
+			delay_ms = 60 * 1000;	/* no timeout: still wake up every minute */
+		else
+			delay_ms = 0;
+		if (delay_ms <= 0)
+			break;
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+		rc = WaitLatch(MyLatch, latch_events, delay_ms,
+					   WAIT_EVENT_CLIENT_READ);
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	if (registered)
+		DeleteEvent();
+
+	if (lsn == InvalidXLogRecPtr || lsn <= cur_lsn)
+		return 1;
+
+	ereport(NOTICE,
+			(errmsg("LSN is not reached"),
+			 errhint("Try to increase wait time.")));
+	return 0;
+}
+
+/*
+ * Get the number of seconds left until the given time, timetz or timestamp
+ * constant (negative if it is already in the past).  The arithmetic is done
+ * in SQL via SPI so that the server's own datetime operators are used.
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+	const char *query;
+	Oid			types[1] = {time->consttype};
+	Datum		values[1] = {time->constvalue};
+	char		nulls[1] = {time->constisnull ? 'n' : ' '};
+	Datum		result;
+	bool		isnull;
+	float8		val;
+
+	/* extract() returns numeric, so cast to float8 to match DatumGetFloat8 */
+	if (time->consttype == TIMEOID)
+		query = "select extract (epoch from ($1 - now()::time))::float8";
+	else if (time->consttype == TIMETZOID)
+		query = "select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))::float8";
+	else
+		query = "select extract (epoch from ($1 - now()))::float8";
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+	if (SPI_execute_with_args(query, 1, types, values, nulls, true, 0) != SPI_OK_SELECT ||
+		SPI_processed != 1)
+		elog(ERROR, "could not compute wait time");
+
+	result = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc,
+						   1, &isnull);
+	if (isnull)
+		elog(ERROR, "could not compute wait time");
+	val = DatumGetFloat8(result);
+
+	SPI_finish();
+	return val;
+}
+
+/*
+ * AFTER command / AFTER clause of BEGIN: wait, send a "t"/"f" row, return 1/0.
+ */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+	TupleDesc	tupdesc;
+	TupOutputState *tstate;
+	XLogRecPtr	lsn;
+	int			res;
+
+	lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+										  CStringGetDatum(stmt->lsn)));
+	/* WITHIN is documented in milliseconds; WaitUtility() expects seconds */
+	res = WaitUtility(lsn, stmt->delay / 1000.0);
+
+	/* Need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+	do_text_output_oneline(tstate, res ? "t" : "f");
+	end_tup_output(tstate);
+	return res;
+}
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 06fc8ce98b..f1d86ff435 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -405,7 +405,6 @@ transformStmt(ParseState *pstate, Node *parseTree)
result = transformCallStmt(pstate,
(CallStmt *) parseTree);
break;
-
default:
/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3460fea56b..4587d5a23d 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -312,7 +312,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -645,6 +645,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <partboundspec> PartitionBoundSpec
%type <list> hash_partbound
%type <defelt> hash_partbound_elem
+%type <ival> wait_time
+%type <node> wait_for
%type <node> json_format_clause
json_format_clause_opt
@@ -1103,6 +1105,7 @@ stmt:
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -10927,12 +10930,13 @@ TransactionStmt:
n->location = -1;
$$ = (Node *) n;
}
- | START TRANSACTION transaction_mode_list_or_empty
+ | START TRANSACTION transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_START;
n->options = $3;
+ n->wait = $4;
n->location = -1;
$$ = (Node *) n;
}
@@ -11031,12 +11035,13 @@ TransactionStmt:
;
TransactionStmtLegacy:
- BEGIN_P opt_transaction transaction_mode_list_or_empty
+ BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_BEGIN;
n->options = $3;
+ n->wait = $4;
n->location = -1;
$$ = (Node *) n;
}
@@ -15868,6 +15873,37 @@ xml_passing_mech:
| BY VALUE_P
;
+/*****************************************************************************
+ *
+ * QUERY:
+ *		AFTER lsn_value [ WITHIN number_of_milliseconds ]
+ *
+ * Also usable as a trailing clause of BEGIN / START TRANSACTION (wait_for).
+ *
+ *****************************************************************************/
+WaitStmt:
+			AFTER Sconst wait_time
+				{
+					WaitStmt   *n = makeNode(WaitStmt);
+
+					n->lsn = $2;
+					n->delay = $3;
+					$$ = (Node *) n;
+				}
+		;
+
+/* Optional AFTER clause of BEGIN / START TRANSACTION; reuses WaitStmt */
+wait_for:
+			WaitStmt							{ $$ = $1; }
+			| /* EMPTY */						{ $$ = NULL; }
+		;
+
+/* Optional WITHIN timeout; 0 (no clause) means wait without a timeout */
+wait_time:
+			WITHIN Iconst						{ $$ = $2; }
+			| /* EMPTY */						{ $$ = 0; }
+		;
+
/*
* Aggregate decoration clauses
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index e5119ed55d..ff6a3db6a6 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -26,6 +26,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -149,6 +150,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, AsyncShmemSize());
size = add_size(size, StatsShmemSize());
size = add_size(size, WaitEventExtensionShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -241,6 +243,11 @@ CreateSharedMemoryAndSemaphores(void)
/* Initialize subsystems */
CreateOrAttachShmemStructs();
+ /*
+ * Init the shared array of awaited LSNs (commands/wait.c).  XXX: confirm EXEC_BACKEND children attach to it too.
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 8de821f960..e884828011 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <float.h>
#include "access/htup_details.h"
#include "access/reloptions.h"
@@ -59,6 +60,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -72,6 +74,9 @@
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
/* Hook for plugins to get control in ProcessUtility() */
ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -272,6 +277,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_LoadStmt:
case T_PrepareStmt:
case T_UnlistenStmt:
+ case T_WaitStmt:
case T_VariableSetStmt:
{
/*
@@ -612,6 +618,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case TRANS_STMT_START:
{
ListCell *lc;
+							WaitStmt   *waitstmt = (WaitStmt *) stmt->wait;
+
+							/* Never report BEGIN as successful if AFTER's target was not reached */
+							if (waitstmt != NULL && WaitMain(waitstmt, dest) == 0)
+								ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not start transaction because the target LSN was not replayed in time")));
BeginTransactionBlock();
foreach(lc, stmt->options)
@@ -1069,6 +1080,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ WaitStmt *stmt = (WaitStmt *) parsetree;
+ WaitMain(stmt, dest);
+ break;
+ }
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2847,6 +2865,10 @@ CreateCommandTag(Node *parsetree)
tag = CMDTAG_NOTIFY;
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
case T_ListenStmt:
tag = CMDTAG_LISTEN;
break;
@@ -3495,6 +3517,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..0270160d44
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "access/xlogdefs.h"	/* XLogRecPtr */
+#include "nodes/parsenodes.h"	/* WaitStmt, Const */
+#include "tcop/dest.h"			/* DestReceiver */
+
+extern int	WaitUtility(XLogRecPtr lsn, const float8 secs);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int	WaitMain(WaitStmt *stmt, DestReceiver *dest);
+#endif							/* WAIT_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b3181f34ae..98379ae97e 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3525,6 +3525,7 @@ typedef struct TransactionStmt
/* for two-phase-commit related commands */
char *gid pg_node_attr(query_jumble_ignore);
bool chain; /* AND CHAIN option */
+ Node *wait; /* wait lsn clause */
/* token location, or -1 if unknown */
int location pg_node_attr(query_jumble_location);
} TransactionStmt;
@@ -4076,4 +4077,16 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+/* ----------------------
+ * AFTER statement; also used for the AFTER clause of BEGIN / START TRANSACTION
+ * ----------------------
+ */
+
+typedef struct WaitStmt
+{
+ NodeTag type;
+ char *lsn; /* target LSN as written in the query (parsed with pg_lsn_in) */
+ int delay; /* WITHIN value; 0 when no WITHIN clause was given */
+} WaitStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 7fdcec6dd9..567139963a 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "AFTER", false, false, false)
diff --git a/src/test/recovery/t/040_begin_after.pl b/src/test/recovery/t/040_begin_after.pl
new file mode 100644
index 0000000000..b22e63603c
--- /dev/null
+++ b/src/test/recovery/t/040_begin_after.pl
@@ -0,0 +1,86 @@
+# Checks waiting for an LSN on a standby with BEGIN ... AFTER
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second apply delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+# Make sure that BEGIN AFTER works: add new content to primary and memorize
+# primary's new LSN, then wait for that LSN on standby.  Since no WITHIN
+# timeout is given, the wait must only end once the LSN has been replayed.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN AFTER '$lsn1'");
+
+# The standby must have replayed at least up to primary's LSN (>=, not ==:
+# background WAL may already have been replayed beyond $lsn1).
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $reached = $node_standby->safe_psql('postgres',
+	"SELECT '$lsn_standby'::pg_lsn >= '$lsn1'::pg_lsn");
+is($reached, 't', "standby reached primary's LSN after BEGIN AFTER");
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "AFTER" command
+#===============================================================================
+# We need to check that AFTER works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting the transaction, AFTER ensures a max value of 40; inside it,
+# AFTER ensures a max value of 50, which REPEATABLE READ must NOT expose.
+# NOTE(review): $lsn4 replays only ms after $lsn3, so the first SELECT can race.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ AFTER '$lsn3';
+	SELECT max(a) FROM wait_test;
+	BEGIN AFTER '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+my @rows = split /\n/, $standby_results;
+
+# Make sure that we indeed reach primary's last LSN inside the transaction:
+# the row returned by pg_last_wal_replay_lsn() must be at least $lsn4.
+my ($replayed) = grep { m{\A[0-9A-F]+/[0-9A-F]+\z} } @rows;
+my $reached4 = $node_standby->safe_psql('postgres',
+	"SELECT '$replayed'::pg_lsn >= '$lsn4'::pg_lsn");
+is($reached4, 't', "AFTER works inside a transaction");
+
+# Check that the transaction keeps its snapshot after AFTER: the older max
+# value must appear as a whole row twice (a bare /40/ would also match LSNs).
+my $count = grep { $_ eq '40' } @rows;
+is($count, 2, "transaction isolation level doesn't get broken due to AFTER");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/test/recovery/t/041_after.pl b/src/test/recovery/t/041_after.pl
new file mode 100644
index 0000000000..480ba6b33b
--- /dev/null
+++ b/src/test/recovery/t/041_after.pl
@@ -0,0 +1,99 @@
+# Checks waiting for an LSN on a standby with the standalone AFTER command
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 2 second apply delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 2;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+# Make sure that AFTER works: add new content to primary and memorize
+# primary's new LSN, then wait for that LSN on standby.  Since no WITHIN
+# timeout is given, AFTER must only return once the LSN has been replayed.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+is($node_standby->safe_psql('postgres', "AFTER '$lsn1'"),
+	't', "AFTER without a timeout reports the LSN as reached");
+
+# The standby must have replayed at least up to primary's LSN (>=, not ==:
+# background WAL may already have been replayed beyond $lsn1).
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $reached = $node_standby->safe_psql('postgres',
+	"SELECT '$lsn_standby'::pg_lsn >= '$lsn1'::pg_lsn");
+is($reached, 't', "standby reached primary's LSN after AFTER");
+
+
+# Now, check that timeouts work as expected when waiting for LSN: the new
+# LSN is held back by recovery_min_apply_delay, so a WITHIN 1 timeout is too
+# short for it to be replayed.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"AFTER '$lsn2' WITHIN 1");
+is($reached_lsn, 'f', "AFTER doesn't reach LSN if given too little wait time");
+
+
+# We need to check that AFTER works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting the transaction, AFTER ensures a max value of 40; inside it,
+# AFTER ensures a max value of 50, which REPEATABLE READ must NOT expose.
+# NOTE(review): $lsn4 replays only ms after $lsn3, so the first SELECT can race.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	AFTER '$lsn3';
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+	SELECT max(a) FROM wait_test;
+	AFTER '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+# One output row per row-returning statement: 't' for each AFTER, then the
+# max(a) and replay-LSN values.
+my @rows = split /\n/, $standby_results;
+
+# Make sure that we indeed reach primary's last LSN inside the transaction:
+# the row returned by pg_last_wal_replay_lsn() must be at least $lsn4.
+my ($replayed) = grep { m{\A[0-9A-F]+/[0-9A-F]+\z} } @rows;
+my $reached4 = $node_standby->safe_psql('postgres',
+	"SELECT '$replayed'::pg_lsn >= '$lsn4'::pg_lsn");
+is($reached4, 't', "AFTER works inside a transaction");
+
+# Check that the transaction keeps its snapshot after AFTER: the older max
+# value must appear as a whole row twice (a bare /40/ would also match LSNs).
+my $count = grep { $_ eq '40' } @rows;
+is($count, 2, "transaction isolation level doesn't get broken due to AFTER");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 4a42999b18..657a217e27 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY wait SYSTEM "wait.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b021487..b3af16c09f 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_for_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+ WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN lsn_value
+ TIMEOUT number_of_milliseconds
+ timestamp
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e345..1b54ed2084 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_for_event</replaceable> ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
READ WRITE | READ ONLY
[ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+ WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN lsn_value
+ TIMEOUT number_of_milliseconds
+ timestamp
</synopsis>
</refsynopsisdiv>
diff --git a/doc/src/sgml/ref/wait.sgml b/doc/src/sgml/ref/wait.sgml
new file mode 100644
index 0000000000..26cae3ad85
--- /dev/null
+++ b/doc/src/sgml/ref/wait.sgml
@@ -0,0 +1,146 @@
+<!--
+doc/src/sgml/ref/wait.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-wait">
+ <indexterm zone="sql-wait">
+ <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+ <refentrytitle>WAIT FOR</refentrytitle>
+ <manvolnum>7</manvolnum>
+ <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>WAIT FOR</refname>
+ <refpurpose>wait for the target <acronym>LSN</acronym> to be replayed or for specified time to pass</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>where <replaceable class="parameter">event</replaceable> is one of:</phrase>
+ LSN lsn_value
+ TIMEOUT number_of_milliseconds
+ TIMESTAMP timestamp_value
+
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>'
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ALL LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>, TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ANY LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+
+ <para>
+ <command>WAIT FOR</command> provides a simple interprocess communication
+ mechanism to wait until a timestamp is reached, a timeout expires, or the
+ target log sequence number (<acronym>LSN</acronym>) is replayed on a standby
+ in a <productname>PostgreSQL</productname> primary-standby asynchronous
+ replication setup.  When run with the <literal>LSN</literal> option, the
+ <command>WAIT FOR</command> command waits for the specified
+ <acronym>LSN</acronym> to be replayed.  If no timestamp or timeout is
+ specified, the wait time is unlimited.  Waiting can be interrupted using
+ <literal>Ctrl+C</literal>, or by shutting down the <literal>postgres</literal> server.
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Parameters</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><literal>LSN</literal> '<replaceable class="parameter">lsn_number</replaceable>'</term>
+ <listitem>
+ <para>
+ Specify the target log sequence number to wait for.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time interval to wait for the LSN to be replayed.
+ The specified <replaceable>wait_timeout</replaceable> must be an integer
+ and is measured in milliseconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+ <listitem>
+ <para>
+ Limit the time to wait for the LSN to be replayed to the given point in time.
+ The specified <replaceable>wait_time</replaceable> must be a timestamp or time-of-day value.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ Run <literal>WAIT FOR</literal> from <application>psql</application>,
+ limiting wait time to 10000 milliseconds:
+
+<screen>
+WAIT FOR LSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE: LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+WAIT FOR LSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+ </para>
+
+ <para>
+ Limit <acronym>LSN</acronym> wait time to 500000 milliseconds,
+ and then cancel the command with <literal>Ctrl+C</literal> before the
+ <acronym>LSN</acronym> is reached.  The wait is interrupted with an
+ error, so no <literal>NOTICE</literal> and no result row are
+ produced.  To give up waiting without an error, specify a shorter
+ <literal>TIMEOUT</literal> instead.  Cancelling the command looks
+ like this:
+<screen>
+WAIT FOR LSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+ERROR: canceling statement due to user request
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+ <title>Compatibility</title>
+
+ <para>
+ There is no <command>WAIT FOR</command> statement in the SQL
+ standard.
+ </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index aa94f6adf6..04e17620dd 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -216,6 +216,7 @@
&update;
&vacuum;
&values;
+ &wait;
</reference>
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 1b48d7171a..d27105b7a3 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1784,6 +1785,13 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for,
+ * set latches in shared memory array to notify the waiter.
+ */
+ if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWait())
+ WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 6dd00a4abd..3f06dc5341 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
'vacuumparallel.c',
'variable.c',
'view.c',
+ 'wait.c',
)
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..83f069b8dc
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,403 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT FOR, which allows waiting for events such as
+ * time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include <float.h>
+#include <math.h>
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "executor/spi.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Add to / delete from shared memory array (forward declarations) */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure, created or attached by WaitShmemInit() */
+typedef struct
+{
+ int backend_maxid; /* upper bound on BackendIds that registered a wait */
+ pg_atomic_uint64 min_lsn; /* smallest waited LSN, PG_UINT64_MAX if none; GetMinWait() reads it without the mutex */
+ slock_t mutex; /* guards backend_maxid and updates of min_lsn */
+ XLogRecPtr waited_lsn[FLEXIBLE_ARRAY_MEMBER]; /* indexed by BackendId (MaxBackends + 1 slots); InvalidXLogRecPtr = not waiting */
+} WaitState;
+
+static volatile WaitState *state;
+
+/*
+ * Publish the current backend's target LSN in shared memory, widening
+ * backend_maxid and lowering min_lsn when necessary.
+ */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	state->waited_lsn[MyBackendId] = lsn_to_wait;
+	state->backend_maxid = Max(state->backend_maxid, MyBackendId);
+	if (pg_atomic_read_u64(&state->min_lsn) > lsn_to_wait)
+		pg_atomic_write_u64(&state->min_lsn, lsn_to_wait);
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ * Check:
+ * 1) normal|smart|fast|immediate stop
+ * 2) SIGKILL and SIGTERM
+ */
+static void
+DeleteEvent(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete;
+	XLogRecPtr	new_min = PG_UINT64_MAX;
+
+	SpinLockAcquire(&state->mutex);
+	lsn_to_delete = state->waited_lsn[MyBackendId];
+	state->waited_lsn[MyBackendId] = InvalidXLogRecPtr;
+	/*
+	 * Publish the next min_lsn with one write: GetMinWait() reads it without
+	 * the mutex, and a transient PG_UINT64_MAX would suppress wakeups.
+	 */
+	if (pg_atomic_read_u64(&state->min_lsn) == lsn_to_delete)
+	{
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+				state->waited_lsn[i] < new_min)
+				new_min = state->waited_lsn[i];
+		pg_atomic_write_u64(&state->min_lsn, new_min);
+	}
+
+	/* Shrink backend_maxid to the highest slot still in use (0 if none) */
+	i = state->backend_maxid;
+	while (i >= 2 && state->waited_lsn[i] == InvalidXLogRecPtr)
+		i--;
+	state->backend_maxid = (i >= 2) ? i : 0;
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState: the fixed
+ * header plus one XLogRecPtr slot per BackendId (0..MaxBackends).
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		nslots = (Size) MaxBackends + 1;
+
+	return add_size(offsetof(WaitState, waited_lsn),
+					mul_size(nslots, sizeof(XLogRecPtr)));
+}
+
+/*
+ * Create or attach to the WaitState struct in shared memory.  On first
+ * creation every slot is cleared and min_lsn is set to "nothing waited".
+ */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (found)
+		return;
+
+	SpinLockInit(&state->mutex);
+	state->backend_maxid = 0;
+	pg_atomic_init_u64(&state->min_lsn, PG_UINT64_MAX);
+	for (int slot = 0; slot <= MaxBackends; slot++)
+		state->waited_lsn[slot] = InvalidXLogRecPtr;
+}
+
+/* Set latches of backends whose awaited LSN (<= cur_lsn) has been replayed */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	int			backend_maxid;
+	PGPROC	   *backend;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	/* Read each slot once; look up the PGPROC only for slots that are due */
+	for (int i = 2; i <= backend_maxid; i++)
+	{
+		XLogRecPtr	waited = state->waited_lsn[i];
+		if (waited == InvalidXLogRecPtr || waited > cur_lsn)
+			continue;
+		backend = BackendIdGetProc(i);
+		if (backend)
+			SetLatch(&backend->procLatch);
+	}
+}
+
+/* Return the smallest LSN any backend waits for (PG_UINT64_MAX if none) */
+XLogRecPtr
+GetMinWait(void)
+{
+	return (XLogRecPtr) pg_atomic_read_u64(&state->min_lsn);
+}
+
+/*
+ * On WAIT use MyLatch to wait till LSN is replayed, postmaster dies or
+ * timeout happens.  lsn may be InvalidXLogRecPtr to wait only for time;
+ * secs == 0 means no time limit, secs < 0 means the deadline has passed.
+ * Returns 1 if the LSN was reached (or none was requested), else 0.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	float8		endtime;
+	int			res = 0;
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("Work only in standby mode")));
+
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+	endtime = GetNowFloat() + secs;
+
+	if (lsn != InvalidXLogRecPtr)
+	{
+		/* Already replayed (equality counts), or no time left: just report */
+		if (lsn <= cur_lsn || secs < 0)
+			return (lsn <= cur_lsn);
+		AddEvent(lsn);
+	}
+	else if (!secs)
+		return 1;
+
+	for (;;)
+	{
+		int			rc;
+		float8		delay = 0;
+		long		delay_ms;
+
+		/*
+		 * Withdraw our event while servicing interrupts, so an ERROR leaves
+		 * no stale entry; if control comes back, register it again.
+		 */
+		if (InterruptPending)
+		{
+			if (lsn != InvalidXLogRecPtr)
+				DeleteEvent();
+			ProcessInterrupts();
+			if (lsn != InvalidXLogRecPtr)
+				AddEvent(lsn);
+		}
+
+		/*
+		 * Re-read the replay position now that our event is registered, so
+		 * a replay racing with (re-)registration or a timeout is not lost.
+		 */
+		if (lsn != InvalidXLogRecPtr)
+		{
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+			if (lsn <= cur_lsn)
+				break;
+		}
+
+		if (secs > 0)
+			delay = endtime - GetNowFloat();
+		else if (secs == 0)
+			delay = 60;		/* no time limit: still wake once a minute */
+
+		if (delay > 0.0)
+			delay_ms = (long) ceil(delay * 1000.0);
+		else
+			break;
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		/*
+		 * Always include WL_LATCH_SET: signals such as query cancel set our
+		 * latch, and without it a time-only wait could not be interrupted.
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   delay_ms, WAIT_EVENT_CLIENT_READ);
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	if (lsn != InvalidXLogRecPtr)
+		DeleteEvent();
+
+	if (lsn != InvalidXLogRecPtr && lsn > cur_lsn)
+		elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+	else
+		res = 1;
+
+	return res;
+}
+
+/*
+ * Get the amount of seconds left till the specified time (negative if it has
+ * already passed).  time is a Const of a time, timetz or timestamp type; the
+ * difference to now() is computed with SPI.  EXTRACT returns numeric (since
+ * PostgreSQL 14), so the result is cast to float8 before DatumGetFloat8().
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+	const char *query;
+	Oid			types[] = {time->consttype};
+	Datum		values[] = {time->constvalue};
+	bool		isnull;
+	float8		val;
+
+	/* time and timetz values are compared with the current time of day */
+	if (time->consttype == TIMEOID)
+		query = "select extract (epoch from ($1 - now()::time))::float8";
+	else if (time->consttype == TIMETZOID)
+		query = "select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))::float8";
+	else
+		query = "select extract (epoch from ($1 - now()))::float8";
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+	if (SPI_execute_with_args(query, 1, types, values, " ", true, 0)
+		!= SPI_OK_SELECT || SPI_processed != 1)
+		elog(ERROR, "could not compute time to wait for");
+
+	val = DatumGetFloat8(SPI_getbinval(SPI_tuptable->vals[0],
+									   SPI_tuptable->tupdesc,
+									   1, &isnull));
+	if (isnull)
+		elog(ERROR, "could not compute time to wait for");
+	elog(DEBUG1, "time: %f", val);
+
+	SPI_finish();
+	return val;
+}
+
+/* Implementation of WAIT FOR; also runs the WAIT FOR clause of BEGIN / START TRANSACTION */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+ ListCell *events;
+ TupleDesc tupdesc;
+ TupOutputState *tstate;
+ float8 delay = 0;
+ float8 final_delay = 0;
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ XLogRecPtr final_lsn = InvalidXLogRecPtr;
+ bool has_lsn = false;
+ bool wait_forever = true;
+ int res = 0;
+
+ if (stmt->strategy == WAIT_FOR_ANY)
+ {
+ /* Prepare to find minimum LSN and delay (ALL starts from 0 / InvalidXLogRecPtr) */
+ final_delay = DBL_MAX;
+ final_lsn = PG_UINT64_MAX;
+ }
+
+ /* Extract options from the statement node tree */
+ foreach(events, stmt->events)
+ {
+ WaitStmt *event = (WaitStmt *) lfirst(events);
+
+ /* LSN to wait for, parsed with pg_lsn_in */
+ if (event->lsn)
+ {
+ has_lsn = true;
+ lsn = DatumGetLSN(
+ DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(event->lsn)));
+
+ /*
+ * When waiting for ALL, select max LSN to wait for.
+ * When waiting for ANY, select min LSN to wait for.
+ */
+ if ((stmt->strategy == WAIT_FOR_ALL && final_lsn <= lsn) ||
+ (stmt->strategy == WAIT_FOR_ANY && final_lsn > lsn))
+ {
+ final_lsn = lsn;
+ }
+ }
+
+ /* Time delay to wait for, in seconds; NOTE(review): TIMEOUT 0 is skipped here */
+ if (event->time || event->delay)
+ {
+ wait_forever = false;
+
+ if (event->delay)
+ delay = event->delay / 1000.0; /* milliseconds -> seconds */
+
+ if (event->time)
+ {
+ Const *time = (Const *) event->time;
+ delay = WaitTimeResolve(time); /* seconds until that time; negative if already past */
+ }
+
+ /*
+ * ALL selects the max delay, ANY the min.  NOTE(review): for ALL a past
+ * time (delay < 0) never beats the initial 0 = "no time limit"; confirm.
+ */
+ if ((stmt->strategy == WAIT_FOR_ALL && final_delay <= delay) ||
+ (stmt->strategy == WAIT_FOR_ANY && final_delay > delay))
+ {
+ final_delay = delay;
+ }
+ }
+ }
+ if (wait_forever) /* no time event: 0 means "no time limit" to WaitUtility */
+ final_delay = 0;
+ if (!has_lsn) /* time-only wait */
+ final_lsn = InvalidXLogRecPtr;
+
+ res = WaitUtility(final_lsn, final_delay);
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+ /* Prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+ /* Send one row: "t" if WaitUtility succeeded, "f" otherwise */
+ do_text_output_oneline(tstate, res?"t":"f");
+ end_tup_output(tstate);
+ return res; /* 1 on success, 0 otherwise; BEGIN ... WAIT FOR skips BeginTransactionBlock on 0 */
+}
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 06fc8ce98b..51de487431 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -85,6 +85,7 @@ static Query *transformCreateTableAsStmt(ParseState *pstate,
CreateTableAsStmt *stmt);
static Query *transformCallStmt(ParseState *pstate,
CallStmt *stmt);
+static void transformWaitForStmt(ParseState *pstate, WaitStmt *stmt);
static void transformLockingClause(ParseState *pstate, Query *qry,
LockingClause *lc, bool pushedDown);
#ifdef RAW_EXPRESSION_COVERAGE_TEST
@@ -405,7 +406,21 @@ transformStmt(ParseState *pstate, Node *parseTree)
result = transformCallStmt(pstate,
(CallStmt *) parseTree);
break;
-
+ case T_WaitStmt:
+ transformWaitForStmt(pstate, (WaitStmt *) parseTree);
+ result = makeNode(Query);
+ result->commandType = CMD_UTILITY;
+ result->utilityStmt = (Node *) parseTree;
+ break;
+ case T_TransactionStmt:
+ {
+ TransactionStmt *stmt = (TransactionStmt *) parseTree;
+ if ((stmt->kind == TRANS_STMT_BEGIN ||
+ stmt->kind == TRANS_STMT_START) && stmt->wait)
+ transformWaitForStmt(pstate, (WaitStmt *) stmt->wait);
+ }
+ /* no break here - we want to fall through to the default */
+ /* FALLTHROUGH */
default:
/*
@@ -3559,6 +3574,23 @@ applyLockingClause(Query *qry, Index rtindex,
qry->rowMarks = lappend(qry->rowMarks, rc);
}
+/*
+ * transformWaitForStmt -
+ *	  transform the time expressions of WAIT FOR events, for both the WAIT
+ *	  FOR statement and the BEGIN clause (TODO: drop whichever is not kept)
+ */
+static void
+transformWaitForStmt(ParseState *pstate, WaitStmt *stmt)
+{
+	ListCell   *lc;
+
+	foreach(lc, stmt->events)
+	{
+		WaitStmt   *ev = (WaitStmt *) lfirst(lc);
+		ev->time = transformExpr(pstate, ev->time, EXPR_KIND_OTHER);	/* NULL stays NULL */
+	}
+}
+
/*
* Coverage testing for raw_expression_tree_walker().
*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3460fea56b..343e5ba856 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -312,7 +312,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -537,7 +537,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <node> case_expr case_arg when_clause case_default
%type <list> when_clause_list
%type <node> opt_search_clause opt_cycle_clause
-%type <ival> sub_type opt_materialized
+%type <ival> sub_type wait_strategy opt_materialized
%type <node> NumericOnly
%type <list> NumericOnly_list
%type <alias> alias_clause opt_alias_clause opt_alias_clause_for_join_using
@@ -645,6 +645,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <partboundspec> PartitionBoundSpec
%type <list> hash_partbound
%type <defelt> hash_partbound_elem
+%type <list> wait_list
+%type <node> WaitEvent wait_for
%type <node> json_format_clause
json_format_clause_opt
@@ -730,7 +732,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE METHOD
MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -764,7 +766,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P SYSTEM_USER
TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
- TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+ TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
TREAT TRIGGER TRIM TRUE_P
TRUNCATE TRUSTED TYPE_P TYPES_P
@@ -774,7 +776,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAIT WHEN WHERE WHITESPACE_P WINDOW
+ WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -1103,6 +1106,7 @@ stmt:
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -10927,12 +10931,13 @@ TransactionStmt:
n->location = -1;
$$ = (Node *) n;
}
- | START TRANSACTION transaction_mode_list_or_empty
+ | START TRANSACTION transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_START;
n->options = $3;
+ n->wait = $4;
n->location = -1;
$$ = (Node *) n;
}
@@ -11031,12 +11036,13 @@ TransactionStmt:
;
TransactionStmtLegacy:
- BEGIN_P opt_transaction transaction_mode_list_or_empty
+ BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_BEGIN;
n->options = $3;
+ n->wait = $4;
n->location = -1;
$$ = (Node *) n;
}
@@ -15868,6 +15874,74 @@ xml_passing_mech:
| BY VALUE_P
;
+/*****************************************************************************
+ *
+ * QUERY:
+ * WAIT FOR [ ANY | ALL ] <event> [ [,] <event> ... ] (ALL if omitted)
+ * event is one of:
+ * LSN 'lsn_value'
+ * TIMEOUT milliseconds (integer constant)
+ * <datetime type name> 'value', e.g. TIMESTAMP '...' or TIME '...'
+ *
+ *****************************************************************************/
+WaitStmt: /* standalone WAIT FOR statement */
+ WAIT FOR wait_strategy wait_list
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->strategy = $3;
+ n->events = $4;
+ $$ = (Node *)n;
+ }
+ ;
+wait_for: /* optional WAIT FOR clause of BEGIN / START TRANSACTION */
+ WAIT FOR wait_strategy wait_list
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->strategy = $3;
+ n->events = $4;
+ $$ = (Node *)n;
+ }
+ | /* EMPTY */ { $$ = NULL; };
+
+wait_strategy:
+ ALL { $$ = WAIT_FOR_ALL; }
+ | ANY { $$ = WAIT_FOR_ANY; }
+ | /* EMPTY */ { $$ = WAIT_FOR_ALL; }
+ ;
+
+wait_list: /* events separated by commas or just whitespace */
+ WaitEvent { $$ = list_make1($1); }
+ | wait_list ',' WaitEvent { $$ = lappend($1, $3); }
+ | wait_list WaitEvent { $$ = lappend($1, $2); }
+ ;
+
+WaitEvent: /* each event is a WaitStmt with one of lsn/delay/time filled in */
+ LSN Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = $2;
+ n->delay = 0;
+ n->time = NULL;
+ $$ = (Node *)n;
+ }
+ | TIMEOUT Iconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = NULL;
+ n->delay = $2;
+ n->time = NULL;
+ $$ = (Node *)n;
+ }
+ | ConstDatetime Sconst
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn = NULL;
+ n->delay = 0;
+ n->time = makeStringConstCast($2, @2, $1);
+ $$ = (Node *)n;
+ }
+ ;
+
/*
* Aggregate decoration clauses
@@ -17272,6 +17346,7 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN
| MAPPING
| MATCH
| MATCHED
@@ -17404,6 +17479,7 @@ unreserved_keyword:
| TEMPORARY
| TEXT_P
| TIES
+ | TIMEOUT
| TRANSACTION
| TRANSFORM
| TRIGGER
@@ -17430,6 +17506,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT
@@ -17862,6 +17939,7 @@ bare_label_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN
| MAPPING
| MATCH
| MATCHED
@@ -18024,6 +18102,7 @@ bare_label_keyword:
| THEN
| TIES
| TIME
+ | TIMEOUT
| TIMESTAMP
| TRAILING
| TRANSACTION
@@ -18061,6 +18140,7 @@ bare_label_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAIT
| WHEN
| WHITESPACE_P
| WORK
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index e5119ed55d..06d165feea 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -26,6 +26,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -149,6 +150,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, AsyncShmemSize());
size = add_size(size, StatsShmemSize());
size = add_size(size, WaitEventExtensionShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -351,6 +353,11 @@ CreateOrAttachShmemStructs(void)
AsyncShmemInit();
StatsShmemInit();
WaitEventExtensionShmemInit();
+
+ /*
+ * Init array of Latches in shared memory for WAIT
+ */
+ WaitShmemInit();
}
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 8de821f960..e884828011 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <float.h>
#include "access/htup_details.h"
#include "access/reloptions.h"
@@ -59,6 +60,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -72,6 +74,9 @@
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
/* Hook for plugins to get control in ProcessUtility() */
ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -272,6 +277,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_LoadStmt:
case T_PrepareStmt:
case T_UnlistenStmt:
+ case T_WaitStmt:
case T_VariableSetStmt:
{
/*
@@ -612,6 +618,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case TRANS_STMT_START:
{
ListCell *lc;
+ WaitStmt *waitstmt = (WaitStmt *) stmt->wait;
+
+ /* If needed to WAIT FOR something but failed */
+ if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+ break;
BeginTransactionBlock();
foreach(lc, stmt->options)
@@ -1069,6 +1080,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ WaitStmt *stmt = (WaitStmt *) parsetree;
+ WaitMain(stmt, dest);
+ break;
+ }
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2847,6 +2865,10 @@ CreateCommandTag(Node *parsetree)
tag = CMDTAG_NOTIFY;
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
case T_ListenStmt:
tag = CMDTAG_LISTEN;
break;
@@ -3495,6 +3517,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..0270160d44
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "access/xlogdefs.h"	/* XLogRecPtr; headers must not include postgres.h */
+#include "nodes/parsenodes.h"	/* WaitStmt, and Const via primnodes.h */
+#include "tcop/dest.h"
+
+extern int	WaitUtility(XLogRecPtr lsn, const float8 secs);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int	WaitMain(WaitStmt *stmt, DestReceiver *dest);
+#endif							/* WAIT_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b3181f34ae..67ef9eb8ad 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3525,6 +3525,7 @@ typedef struct TransactionStmt
/* for two-phase-commit related commands */
char *gid pg_node_attr(query_jumble_ignore);
bool chain; /* AND CHAIN option */
+ Node *wait; /* WAIT clause: list of events to wait for */
/* token location, or -1 if unknown */
int location pg_node_attr(query_jumble_location);
} TransactionStmt;
@@ -4076,4 +4077,26 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+/* ----------------------
+ * WAIT FOR Statement + WAIT FOR clause of BEGIN / START TRANSACTION
+ * TODO: if we only pick one, remove the other
+ * ----------------------
+ */
+
+typedef enum WaitForStrategy
+{
+ WAIT_FOR_ANY = 0, /* events combined by min LSN / min delay */
+ WAIT_FOR_ALL /* events combined by max LSN / max delay; parser default */
+} WaitForStrategy;
+
+typedef struct WaitStmt
+{
+ NodeTag type;
+ WaitForStrategy strategy; /* statement level: how events are combined */
+ List *events; /* statement level: list of WaitStmt event nodes */
+ char *lsn; /* event: WAIT FOR LSN string, or NULL */
+ int delay; /* event: WAIT FOR TIMEOUT in milliseconds, 0 if unset */
+ Node *time; /* event: WAIT FOR TIMESTAMP or TIME expression, or NULL */
+} WaitStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 2331acac09..ae7b65526b 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -260,6 +260,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD, BARE_LABEL)
@@ -433,6 +434,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("then", THEN, RESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("time", TIME, COL_NAME_KEYWORD, BARE_LABEL)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD, BARE_LABEL)
PG_KEYWORD("to", TO, RESERVED_KEYWORD, AS_LABEL)
PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD, BARE_LABEL)
@@ -473,6 +475,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD, AS_LABEL)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 7fdcec6dd9..295cd6ff3a 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT FOR", false, false, false)
diff --git a/src/test/recovery/t/040_begin_wait.pl b/src/test/recovery/t/040_begin_wait.pl
new file mode 100644
index 0000000000..f1e5b5b23d
--- /dev/null
+++ b/src/test/recovery/t/040_begin_wait.pl
@@ -0,0 +1,154 @@
+# Checks BEGIN ... WAIT FOR LSN
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(time);
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that WAIT is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN WAIT FOR LSN '$lsn1'");
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+is($compare_lsns, 0, "standby reached the same LSN as primary after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s).
+# time() is Time::HiRes::time(), so an early return is not hidden by rounding
+# to whole seconds.
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000;    # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+	"WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000;    # convert to milliseconds
+cmp_ok($time_waited, '>=', $one_second,
+	"WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+# NOTE(review): a plain BEGIN prints no row; this check assumes BEGIN ... WAIT
+# FOR reports the wait result as a boolean. Confirm against the implementation.
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn2' TIMEOUT 1");
+is($reached_lsn, 'f', "WAIT doesn't reach LSN if given too little wait time");
+
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "WAIT FOR" command
+#===============================================================================
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ WAIT FOR LSN '$lsn3';
+	SELECT max(a) FROM wait_test;
+	BEGIN WAIT FOR LSN '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that pg_last_wal_replay_lsn printed exactly that LSN on its
+# own line; the anchors stop a longer LSN with the same prefix from matching.
+like($standby_results, qr/^\Q$lsn4\E$/m,
+	"WAIT FOR LSN works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after WAIT.
+# For that, make sure that the older max value is repeated twice in the results.
+# Match whole lines only: the LSN printed above may itself contain "40".
+my $count = () = $standby_results =~ /^40$/mg;
+is($count, 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(70001, 800000))");
+my $lsn7 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+cmp_ok($compare_lsns, '<', 0,
+	"WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+is($compare_lsns, 0,
+	"WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/test/recovery/t/041_wait.pl b/src/test/recovery/t/041_wait.pl
new file mode 100644
index 0000000000..6f9d549416
--- /dev/null
+++ b/src/test/recovery/t/041_wait.pl
@@ -0,0 +1,151 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(time);
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that WAIT is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '$lsn1'");
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+is($compare_lsns, 0, "standby reached the same LSN as primary after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s).
+# time() is Time::HiRes::time(), so an early return is not hidden by rounding
+# to whole seconds.
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000;    # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+	"WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000;    # convert to milliseconds
+cmp_ok($time_waited, '>=', $one_second,
+	"WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"WAIT FOR LSN '$lsn2' TIMEOUT 1");
+is($reached_lsn, 'f', "WAIT doesn't reach LSN if given too little wait time");
+
+
+
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '$lsn3';
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+	SELECT max(a) FROM wait_test;
+	WAIT FOR LSN '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that pg_last_wal_replay_lsn printed exactly that LSN on its
+# own line; the anchors stop a longer LSN with the same prefix from matching.
+like($standby_results, qr/^\Q$lsn4\E$/m,
+	"WAIT FOR LSN works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after WAIT.
+# For that, make sure that the older max value is repeated twice in the results.
+# Match whole lines only: the LSN printed above may itself contain "40".
+my $count = () = $standby_results =~ /^40$/mg;
+is($count, 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(70001, 800000))");
+my $lsn7 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+	"WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+cmp_ok($compare_lsns, '<', 0,
+	"WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine
+$node_standby->safe_psql('postgres',
+	"WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+is($compare_lsns, 0,
+	"WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();