On 2020-04-07 00:58, Kartyshov Ivan wrote:
Ok, here is a new version of patch with single LSN and TIMEOUT.
I had a look at the code and did some more code cleanup, with Ivan's
permission.
This is what I did:
- Removed "WAIT FOR" command tag from cmdtaglist.h and renamed WaitStmt
to WaitClause (since there's no standalone WAIT FOR command anymore)
- Added _copyWaitClause() and _equalWaitClause()
- Removed unused #include-s from utility.c
- Adjusted tests and documentation
- Fixed/added some code comments
I have a couple of questions about WaitUtility() though:
- When waiting forever (due to not specifying a timeout), isn't 60
seconds too long of an interval to check for interrupts?
- If we did specify a timeout, it might be a very long one. In this
case, shouldn't we also make sure to wake up sometimes to check for
interrupts?
- Is it OK that specifying timeout = 0 (BEGIN WAIT FOR LSN ... TIMEOUT
0) is the same as not specifying timeout at all?
--
Anna Akenteva
Postgres Professional:
The Russian Postgres Company
http://www.postgrespro.com
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index c23bbfb4e71..19a33d7d8fb 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ WAIT FOR LSN <replaceable class="parameter">lsn_value</replaceable> [TIMEOUT <replaceable class="parameter">number_of_milliseconds</replaceable> ] ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
@@ -63,6 +63,17 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
<xref linkend="sql-set-transaction"/>
was executed.
</para>
+
+ <para>
+ The <literal>WAIT FOR</literal> clause allows to wait for the target log
+ sequence number (<acronym>LSN</acronym>) to be replayed on standby before
+ starting the transaction in <productname>PostgreSQL</productname> databases
+ with master-standby asynchronous replication. Wait time can be limited by
+ specifying a timeout, which is measured in milliseconds and must be a positive
+ integer. If <acronym>LSN</acronym> was not reached before timeout, transaction
+ doesn't begin. Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+ by shutting down the <literal>postgres</literal> server.
+ </para>
</refsect1>
<refsect1>
@@ -146,6 +157,10 @@ BEGIN;
different purpose in embedded SQL. You are advised to be careful
about the transaction semantics when porting database applications.
</para>
+
+ <para>
+ There is no <command>WAIT FOR</command> clause in the SQL standard.
+ </para>
</refsect1>
<refsect1>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index d6cd1d41779..c9f70d2709a 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ WAIT FOR LSN <replaceable class="parameter">lsn_value</replaceable> [TIMEOUT <replaceable class="parameter">number_of_milliseconds</replaceable> ] ]
<phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
@@ -40,6 +40,17 @@ START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable
characteristics, as if <xref linkend="sql-set-transaction"/> was executed. This is the same
as the <xref linkend="sql-begin"/> command.
</para>
+
+ <para>
+ The <literal>WAIT FOR</literal> clause allows to wait for the target log
+ sequence number (<acronym>LSN</acronym>) to be replayed on standby before
+ starting the transaction in <productname>PostgreSQL</productname> databases
+ with master-standby asynchronous replication. Wait time can be limited by
+ specifying a timeout, which is measured in milliseconds and must be a positive
+ integer. If <acronym>LSN</acronym> was not reached before timeout, transaction
+ doesn't begin. Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+ by shutting down the <literal>postgres</literal> server.
+ </para>
</refsect1>
<refsect1>
@@ -78,6 +89,10 @@ START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable
omitted.
</para>
+ <para>
+ There is no <command>WAIT FOR</command> clause in the SQL standard.
+ </para>
+
<para>
See also the compatibility section of <xref linkend="sql-set-transaction"/>.
</para>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index abf954ba392..d2856c88943 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
#include "catalog/pg_database.h"
#include "commands/progress.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/controldata_utils.h"
#include "executor/instrument.h"
#include "miscadmin.h"
@@ -7332,6 +7333,15 @@ StartupXLOG(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for,
+ * set latches in shared memory array to notify the waiter.
+ */
+ if (XLogCtl->lastReplayedEndRecPtr >= GetMinWait())
+ {
+ WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d4815d3ce65..9b310926c12 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -57,6 +57,7 @@ OBJS = \
user.o \
vacuum.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..d7dbf3b725f
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,291 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT FOR, which allows waiting for events such as
+ * LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <float.h>
+#include <math.h>
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlogdefs.h"
+#include "access/xlog.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure */
+typedef struct
+{
+ int backend_maxid;
+ XLogRecPtr min_lsn;
+ slock_t mutex;
+ /* LSNs that different backends are waiting */
+ XLogRecPtr lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static volatile WaitState *state;
+
+/*
+ * Add the wait event of the current backend to shared memory array
+ */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+ SpinLockAcquire(&state->mutex);
+ if (state->backend_maxid < MyBackendId)
+ state->backend_maxid = MyBackendId;
+
+ state->lsn[MyBackendId] = lsn_to_wait;
+
+ if (lsn_to_wait < state->min_lsn)
+ state->min_lsn = lsn_to_wait;
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete wait event of the current backend from the shared memory array.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ * Check:
+ * 1) nomal|smart|fast|immediate stop
+ * 2) SIGKILL and SIGTERM
+ */
+static void
+DeleteEvent(void)
+{
+ int i;
+ XLogRecPtr lsn_to_delete = state->lsn[MyBackendId];
+
+ state->lsn[MyBackendId] = InvalidXLogRecPtr;
+
+ SpinLockAcquire(&state->mutex);
+
+ /* If we are deleting the minimal LSN, then choose the next min_lsn */
+ if (state->min_lsn == lsn_to_delete)
+ {
+ state->min_lsn = PG_UINT64_MAX;
+ for (i = 2; i <= state->backend_maxid; i++)
+ if (state->lsn[i] != InvalidXLogRecPtr &&
+ state->lsn[i] < state->min_lsn)
+ state->min_lsn = state->lsn[i];
+ }
+
+ /* If deleting from the end of the array, shorten the array's used part */
+ if (state->backend_maxid == MyBackendId)
+ for (i = (MyBackendId); i >= 2; i--)
+ if (state->lsn[i] != InvalidXLogRecPtr)
+ {
+ state->backend_maxid = i;
+ break;
+ }
+
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitState, lsn);
+ size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+ return size;
+}
+
+/*
+ * Initialize an array of events to wait for in shared memory
+ */
+void
+WaitShmemInit(void)
+{
+ bool found;
+ uint32 i;
+
+ state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+ WaitShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&state->mutex);
+
+ for (i = 0; i < (MaxBackends + 1); i++)
+ state->lsn[i] = InvalidXLogRecPtr;
+
+ state->backend_maxid = 0;
+ state->min_lsn = PG_UINT64_MAX;
+ }
+}
+
+/*
+ * Set latches in shared memory to signal that new LSN has been replayed
+ */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+ uint32 i;
+ int backend_maxid;
+ PGPROC *backend;
+
+ SpinLockAcquire(&state->mutex);
+ backend_maxid = state->backend_maxid;
+ SpinLockRelease(&state->mutex);
+
+ for (i = 2; i <= backend_maxid; i++)
+ {
+ backend = BackendIdGetProc(i);
+
+ if (backend && state->lsn[i] != 0 &&
+ state->lsn[i] <= cur_lsn)
+ {
+ SetLatch(&backend->procLatch);
+ }
+ }
+}
+
+/*
+ * Get minimal LSN that someone waits for
+ */
+XLogRecPtr
+GetMinWait(void)
+{
+ return state->min_lsn;
+}
+
+/*
+ * On WAIT use a latch to wait till LSN is replayed,
+ * postmaster dies or timeout happens.
+ * Returns 1 if LSN was reached and 0 otherwise.
+ */
+int
+WaitUtility(XLogRecPtr target_lsn, const float8 secs)
+{
+ XLogRecPtr cur_lsn = GetXLogReplayRecPtr(NULL);
+ int latch_events;
+ float8 endtime;
+ int res = 0;
+
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+ endtime = GetNowFloat() + secs;
+
+ latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+ /* Check if we already reached the needed LSN */
+ if (cur_lsn >= target_lsn)
+ return 1;
+
+ AddEvent(target_lsn);
+
+ for (;;)
+ {
+ int rc;
+ float8 delay = 0;
+ long delay_ms;
+
+ if (secs > 0.0)
+ delay = endtime - GetNowFloat();
+ else
+ /* If we wait forever, use 1 min timeout to check for interrupts */
+ delay = 60;
+
+ if (delay > 0.0)
+ delay_ms = (long) ceil(delay * 1000.0);
+ else
+ break;
+
+ /*
+ * If received an interruption from CHECK_FOR_INTERRUPTS,
+ * then delete the current event from array.
+ */
+ if (InterruptPending)
+ {
+ DeleteEvent();
+ ProcessInterrupts();
+ }
+
+ /* If postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ rc = WaitLatch(MyLatch, latch_events, delay_ms,
+ WAIT_EVENT_CLIENT_READ);
+
+ ResetLatch(MyLatch);
+
+ if (rc & WL_LATCH_SET)
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ /* If LSN has been replayed */
+ if (target_lsn <= cur_lsn)
+ break;
+ }
+
+ DeleteEvent();
+
+ if (cur_lsn < target_lsn)
+ elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+ else
+ res = 1;
+
+ return res;
+}
+
+/*
+ * Implementation of WAIT FOR
+ */
+int
+WaitMain(WaitClause *stmt, DestReceiver *dest)
+{
+ TupleDesc tupdesc;
+ TupOutputState *tstate;
+ int res = 0;
+
+ res = WaitUtility(DatumGetLSN(
+ DirectFunctionCall1(pg_lsn_in,CStringGetDatum(stmt->lsn))),
+ stmt->timeout / 1000.0);
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+
+ /* Prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+ /* Send the result */
+ do_text_output_oneline(tstate, res ? "t" : "f");
+ end_tup_output(tstate);
+ return res;
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index f9d86859ee7..b00c772b71e 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3741,10 +3741,22 @@ _copyTransactionStmt(const TransactionStmt *from)
COPY_STRING_FIELD(savepoint_name);
COPY_STRING_FIELD(gid);
COPY_SCALAR_FIELD(chain);
+ COPY_NODE_FIELD(wait);
return newnode;
}
+static WaitClause *
+_copyWaitClause(const WaitClause *from)
+{
+ WaitClause *newnode = makeNode(WaitClause);
+
+ COPY_STRING_FIELD(lsn);
+ COPY_SCALAR_FIELD(timeout);
+
+ return newnode;
+};
+
static CompositeTypeStmt *
_copyCompositeTypeStmt(const CompositeTypeStmt *from)
{
@@ -5332,6 +5344,9 @@ copyObjectImpl(const void *from)
case T_TransactionStmt:
retval = _copyTransactionStmt(from);
break;
+ case T_WaitClause:
+ retval = _copyWaitClause(from);
+ break;
case T_CompositeTypeStmt:
retval = _copyCompositeTypeStmt(from);
break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index e8e781834a5..c1b622ea301 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1539,6 +1539,16 @@ _equalTransactionStmt(const TransactionStmt *a, const TransactionStmt *b)
COMPARE_STRING_FIELD(savepoint_name);
COMPARE_STRING_FIELD(gid);
COMPARE_SCALAR_FIELD(chain);
+ COMPARE_NODE_FIELD(wait);
+
+ return true;
+}
+
+static bool
+_equalWaitClause(const WaitClause *a, const WaitClause *b)
+{
+ COMPARE_STRING_FIELD(lsn);
+ COMPARE_SCALAR_FIELD(timeout);
return true;
}
@@ -3389,6 +3399,9 @@ equal(const void *a, const void *b)
case T_TransactionStmt:
retval = _equalTransactionStmt(a, b);
break;
+ case T_WaitClause:
+ retval = _equalWaitClause(a, b);
+ break;
case T_CompositeTypeStmt:
retval = _equalCompositeTypeStmt(a, b);
break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 35ed8c0d538..8eb54d4ab05 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2778,6 +2778,28 @@ _outDefElem(StringInfo str, const DefElem *node)
WRITE_LOCATION_FIELD(location);
}
+static void
+_outTransactionStmt(StringInfo str, const TransactionStmt *node)
+{
+ WRITE_NODE_TYPE("TRANSACTIONSTMT");
+
+ WRITE_STRING_FIELD(savepoint_name);
+ WRITE_STRING_FIELD(gid);
+ WRITE_NODE_FIELD(options);
+ WRITE_BOOL_FIELD(chain);
+ WRITE_ENUM_FIELD(kind, TransactionStmtKind);
+ WRITE_NODE_FIELD(wait);
+}
+
+static void
+_outWaitClause(StringInfo str, const WaitClause *node)
+{
+ WRITE_NODE_TYPE("WAITCLAUSE");
+
+ WRITE_STRING_FIELD(lsn);
+ WRITE_UINT_FIELD(timeout);
+}
+
static void
_outTableLikeClause(StringInfo str, const TableLikeClause *node)
{
@@ -4327,6 +4349,12 @@ outNode(StringInfo str, const void *obj)
case T_PartitionRangeDatum:
_outPartitionRangeDatum(str, obj);
break;
+ case T_TransactionStmt:
+ _outTransactionStmt(str, obj);
+ break;
+ case T_WaitClause:
+ _outWaitClause(str, obj);
+ break;
default:
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3449c26bd11..a2f40285c6e 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -592,6 +592,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <partboundspec> PartitionBoundSpec
%type <list> hash_partbound
%type <defelt> hash_partbound_elem
+%type <ival> wait_time
+%type <node> wait_for
/*
* Non-keyword token types. These are hard-wired into the "flex" lexer.
@@ -661,7 +663,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -692,7 +694,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
- TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+ TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
TREAT TRIGGER TRIM TRUE_P
TRUNCATE TRUSTED TYPE_P TYPES_P
@@ -702,7 +704,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAIT WHEN WHERE WHITESPACE_P WINDOW
+ WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -9946,18 +9949,20 @@ TransactionStmt:
n->chain = $3;
$$ = (Node *)n;
}
- | BEGIN_P opt_transaction transaction_mode_list_or_empty
+ | BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_BEGIN;
n->options = $3;
+ n->wait = $4;
$$ = (Node *)n;
}
- | START TRANSACTION transaction_mode_list_or_empty
+ | START TRANSACTION transaction_mode_list_or_empty wait_for
{
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_START;
n->options = $3;
+ n->wait = $4;
$$ = (Node *)n;
}
| COMMIT opt_transaction opt_transaction_chain
@@ -14187,6 +14192,25 @@ xml_passing_mech:
| BY VALUE_P
;
+/*
+ * WAIT FOR clause of BEGIN and START TRANSACTION statements
+ */
+wait_for:
+ WAIT FOR LSN Sconst wait_time
+ {
+ WaitClause *n = makeNode(WaitClause);
+ n->lsn = $4;
+ n->timeout = $5;
+ $$ = (Node *)n;
+ }
+ | /* EMPTY */ { $$ = NULL; }
+ ;
+
+wait_time:
+ TIMEOUT Iconst { $$ = $2; }
+ | /* EMPTY */ { $$ = 0; }
+ ;
+
/*
* Aggregate decoration clauses
@@ -15338,6 +15362,7 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN
| MAPPING
| MATCH
| MATERIALIZED
@@ -15465,6 +15490,7 @@ unreserved_keyword:
| TEMPORARY
| TEXT_P
| TIES
+ | TIMEOUT
| TRANSACTION
| TRANSFORM
| TRIGGER
@@ -15491,6 +15517,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cde..bb8af349808 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/twophase.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -264,6 +266,11 @@ CreateSharedMemoryAndSemaphores(void)
SyncScanShmemInit();
AsyncShmemInit();
+ /*
+ * Init array of Latches in shared memory for WAIT
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index b1f7f6e2d01..e0ca38106d4 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -57,6 +57,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -591,6 +592,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case TRANS_STMT_START:
{
ListCell *lc;
+ WaitClause *waitstmt = (WaitClause *) stmt->wait;
+
+ /* If needed to WAIT FOR something but failed */
+ if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+ break;
BeginTransactionBlock();
foreach(lc, stmt->options)
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 00000000000..4df5ed5638f
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern int WaitMain(WaitClause *stmt, DestReceiver *dest);
+
+#endif /* WAIT_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 50b1ba51863..c37663a28bd 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -492,6 +492,7 @@ typedef enum NodeTag
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
T_SQLCmd,
+ T_WaitClause,
/*
* TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index cd6f1be6435..30f5d7c0916 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1430,6 +1430,17 @@ typedef struct OnConflictClause
int location; /* token location, or -1 if unknown */
} OnConflictClause;
+/*
+ * WaitClause -
+ * representation of WAIT FOR clause for BEGIN and START TRANSACTION.
+ */
+typedef struct WaitClause
+{
+ NodeTag type;
+ char *lsn; /* LSN to wait for */
+ int timeout; /* Number of milliseconds to limit wait time */
+} WaitClause;
+
/*
* CommonTableExpr -
* representation of WITH list element
@@ -3058,6 +3069,7 @@ typedef struct TransactionStmt
char *savepoint_name; /* for savepoint commands */
char *gid; /* for two-phase-commit related commands */
bool chain; /* AND CHAIN option */
+ Node *wait; /* WAIT FOR clause */
} TransactionStmt;
/* ----------------------
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 08f22ce211d..6e1848fe4cc 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -410,6 +411,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
PG_KEYWORD("to", TO, RESERVED_KEYWORD)
PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -450,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/test/recovery/t/020_begin_wait.pl b/src/test/recovery/t/020_begin_wait.pl
new file mode 100644
index 00000000000..da18ac3761c
--- /dev/null
+++ b/src/test/recovery/t/020_begin_wait.pl
@@ -0,0 +1,71 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+
+# And some content and take a backup
+$node_master->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Using the backup, create a streaming standby with a 1 second delay
+my $node_standby = get_new_node('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_master, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Check that timeouts make us wait for the specified time (1s here)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+$node_standby->safe_psql('postgres',
+ "BEGIN WAIT FOR LSN '0/FFFFFFFF' TIMEOUT $one_second");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+
+# Check that timeouts let us stop waiting right away, before reaching target LSN
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+ "BEGIN WAIT FOR LSN '$lsn1' TIMEOUT 1");
+ok($reached_lsn eq "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+# Check that WAIT FOR works fine and reaches target LSN if given no timeout
+
+# Add data on master, memorize master's last LSN
+$node_master->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Wait for it to appear on replica, memorize replica's last LSN
+$node_standby->safe_psql('postgres',
+ "BEGIN WAIT FOR LSN '$lsn2'");
+$reached_lsn = $node_standby->safe_psql('postgres',
+ "SELECT pg_last_wal_replay_lsn()");
+
+# Make sure that master's and replica's LSNs are the same after WAIT
+my $compare_lsns = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)");
+ok($compare_lsns eq 0,
+ "standby reached the same LSN as master before starting transaction");
+
+
+$node_standby->stop;
+$node_master->stop;