Attached please find a new version of the AS OF patch, which allows you to specify a time travel period.
Older versions outside this period may be reclaimed by autovacuum.
This behavior is controlled by the "time_travel_period" parameter.

A zero value of this parameter disables time travel, and Postgres behaves in the standard way. You can actually still use the AS OF construct, but there is no guarantee that the requested versions have not been reclaimed and that the query result actually corresponds to the specified point in time.

A value of -1 means infinite history: versions are never reclaimed and autovacuum is disabled.

A positive value of this parameter specifies the maximal time travel period in seconds. As in the case of disabled time travel, you can still specify an AS OF timestamp older than this period,
but there is no guarantee that the requested versions still exist.

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index eb5bbb5..7acaf30 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -78,6 +78,7 @@ BitmapHeapNext(BitmapHeapScanState *node)
 	ExprContext *econtext;
 	HeapScanDesc scan;
 	TIDBitmap  *tbm;
+	EState	   *estate;
 	TBMIterator *tbmiterator = NULL;
 	TBMSharedIterator *shared_tbmiterator = NULL;
 	TBMIterateResult *tbmres;
@@ -85,11 +86,13 @@ BitmapHeapNext(BitmapHeapScanState *node)
 	TupleTableSlot *slot;
 	ParallelBitmapHeapState *pstate = node->pstate;
 	dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
+	TimestampTz     outerAsofTimestamp = 0;
 
 	/*
 	 * extract necessary information from index scan node
 	 */
 	econtext = node->ss.ps.ps_ExprContext;
+	estate = node->ss.ps.state;
 	slot = node->ss.ss_ScanTupleSlot;
 	scan = node->ss.ss_currentScanDesc;
 	tbm = node->tbm;
@@ -99,6 +102,25 @@ BitmapHeapNext(BitmapHeapScanState *node)
 		shared_tbmiterator = node->shared_tbmiterator;
 	tbmres = node->tbmres;
 
+	outerAsofTimestamp = estate->es_snapshot->asofTimestamp;
+
+	if (node->ss.asofExpr)
+	{
+		if (!node->ss.asofTimestampSet)
+		{
+			Datum		val;
+			bool		isNull;
+
+			val = ExecEvalExprSwitchContext(node->ss.asofExpr,
+											node->ss.ps.ps_ExprContext,
+											&isNull);
+			/* Interpret NULL timestamp as no timestamp */
+			node->ss.asofTimestamp = isNull ? 0 : DatumGetInt64(val);
+			node->ss.asofTimestampSet = true;
+		}
+		estate->es_snapshot->asofTimestamp = node->ss.asofTimestamp;
+	}
+
 	/*
 	 * If we haven't yet performed the underlying index scan, do it, and begin
 	 * the iteration over the bitmap.
@@ -364,11 +386,21 @@ BitmapHeapNext(BitmapHeapScanState *node)
 			}
 		}
 
-		/* OK to return this tuple */
+		/*
+		 * Restore ASOF timestamp for the current snapshot
+		 */
+		estate->es_snapshot->asofTimestamp = outerAsofTimestamp;
+
+		/* OK to return this tuple */
 		return slot;
 	}
 
 	/*
+	 * Restore ASOF timestamp for the current snapshot
+	 */
+	estate->es_snapshot->asofTimestamp = outerAsofTimestamp;
+
+	/*
 	 * if we get here it means we are at the end of the scan..
 	 */
 	return ExecClearTuple(slot);
@@ -746,6 +778,8 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
 {
 	PlanState  *outerPlan = outerPlanState(node);
 
+	node->ss.asofTimestampSet = false;
+
 	/* rescan to release any page pin */
 	heap_rescan(node->ss.ss_currentScanDesc, NULL);
 
@@ -902,7 +936,8 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
 	 * most cases it's probably not worth working harder than that.
 	 */
 	scanstate->can_skip_fetch = (node->scan.plan.qual == NIL &&
-								 node->scan.plan.targetlist == NIL);
+								 node->scan.plan.targetlist == NIL &&
+								 node->scan.asofTimestamp == NULL);
 
 	/*
 	 * Miscellaneous initialization
@@ -920,6 +955,18 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
 		ExecInitQual(node->bitmapqualorig, (PlanState *) scanstate);
 
 	/*
+	 * Initialize AS OF expression, if any
+	 */
+	if (node->scan.asofTimestamp)
+	{
+		scanstate->ss.asofExpr = ExecInitExpr((Expr *) node->scan.asofTimestamp,
+										   &scanstate->ss.ps);
+		scanstate->ss.asofTimestampSet = false;
+	}
+	else
+		scanstate->ss.asofExpr = NULL;
+
+	/*
 	 * tuple table initialization
 	 */
 	ExecInitResultTupleSlot(estate, &scanstate->ss.ps);
@@ -1052,11 +1099,27 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
 	ParallelBitmapHeapState *pstate;
 	EState	   *estate = node->ss.ps.state;
 	dsa_area   *dsa = node->ss.ps.state->es_query_dsa;
+	TimestampTz     outerAsofTimestamp = estate->es_snapshot->asofTimestamp;
+	Scan* scan = (Scan*)node->ss.ps.plan;
 
 	/* If there's no DSA, there are no workers; initialize nothing. */
 	if (dsa == NULL)
 		return;
 
+	if (scan->asofTimestamp)
+	{
+		Datum		val;
+		bool		isNull;
+
+		ExprState* asofExpr = ExecInitExpr((Expr *) scan->asofTimestamp,
+									  &node->ss.ps);
+		val = ExecEvalExprSwitchContext(asofExpr,
+										node->ss.ps.ps_ExprContext,
+										&isNull);
+		/* Interpret NULL timestamp as no timestamp */
+		estate->es_snapshot->asofTimestamp = isNull ? 0 : DatumGetInt64(val);
+	}
+
 	pstate = shm_toc_allocate(pcxt->toc, node->pscan_len);
 
 	pstate->tbmiterator = 0;
@@ -1071,6 +1134,8 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
 	ConditionVariableInit(&pstate->cv);
 	SerializeSnapshot(estate->es_snapshot, pstate->phs_snapshot_data);
 
+	estate->es_snapshot->asofTimestamp = outerAsofTimestamp;
+
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pstate);
 	node->pstate = pstate;
 }
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 2ffef23..a0b505c 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -86,7 +86,7 @@ IndexNext(IndexScanState *node)
 	IndexScanDesc scandesc;
 	HeapTuple	tuple;
 	TupleTableSlot *slot;
-
+	TimestampTz outerAsofTimestamp;
 	/*
 	 * extract necessary information from index scan node
 	 */
@@ -104,6 +104,30 @@ IndexNext(IndexScanState *node)
 	econtext = node->ss.ps.ps_ExprContext;
 	slot = node->ss.ss_ScanTupleSlot;
 
+	outerAsofTimestamp = estate->es_snapshot->asofTimestamp;
+
+	if (node->ss.asofExpr)
+	{
+		if (!node->ss.asofTimestampSet)
+		{
+			Datum		val;
+			bool		isNull;
+
+			val = ExecEvalExprSwitchContext(node->ss.asofExpr,
+											node->ss.ps.ps_ExprContext,
+											&isNull);
+			/* Interpret NULL timestamp as no timestamp */
+			if (isNull)
+				node->ss.asofTimestamp = 0;
+			else
+			{
+				node->ss.asofTimestamp = DatumGetInt64(val);
+			}
+			node->ss.asofTimestampSet = true;
+		}
+		estate->es_snapshot->asofTimestamp = node->ss.asofTimestamp;
+	}
+
 	if (scandesc == NULL)
 	{
 		/*
@@ -160,9 +184,17 @@ IndexNext(IndexScanState *node)
 				continue;
 			}
 		}
+		/*
+		 * Restore ASOF timestamp for the current snapshot
+		 */
+		estate->es_snapshot->asofTimestamp = outerAsofTimestamp;
 
 		return slot;
 	}
+	/*
+	 * Restore ASOF timestamp for the current snapshot
+	 */
+	estate->es_snapshot->asofTimestamp = outerAsofTimestamp;
 
 	/*
 	 * if we get here it means the index scan failed so we are at the end of
@@ -578,6 +610,8 @@ ExecIndexScan(PlanState *pstate)
 void
 ExecReScanIndexScan(IndexScanState *node)
 {
+	node->ss.asofTimestampSet = false;
+
 	/*
 	 * If we are doing runtime key calculations (ie, any of the index key
 	 * values weren't simple Consts), compute the new key values.  But first,
@@ -918,6 +952,18 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
 		ExecInitExprList(node->indexorderbyorig, (PlanState *) indexstate);
 
 	/*
+	 * Initialize AS OF expression, if any
+	 */
+	if (node->scan.asofTimestamp)
+	{
+		indexstate->ss.asofExpr = ExecInitExpr((Expr *) node->scan.asofTimestamp,
+											&indexstate->ss.ps);
+		indexstate->ss.asofTimestampSet = false;
+	}
+	else
+		indexstate->ss.asofExpr = NULL;
+
+	/*
 	 * tuple table initialization
 	 */
 	ExecInitResultTupleSlot(estate, &indexstate->ss.ps);
@@ -1672,12 +1718,30 @@ ExecIndexScanInitializeDSM(IndexScanState *node,
 {
 	EState	   *estate = node->ss.ps.state;
 	ParallelIndexScanDesc piscan;
+	TimestampTz outerAsofTimestamp = estate->es_snapshot->asofTimestamp;
+	Scan* scan = (Scan*)node->ss.ps.plan;
+
+	if (scan->asofTimestamp)
+	{
+		Datum		val;
+		bool		isNull;
+
+		node->ss.asofExpr = ExecInitExpr((Expr *) scan->asofTimestamp,
+									  &node->ss.ps);
+		val = ExecEvalExprSwitchContext(node->ss.asofExpr,
+										node->ss.ps.ps_ExprContext,
+										&isNull);
+		/* Interpret NULL timestamp as no timestamp */
+		estate->es_snapshot->asofTimestamp = isNull ? 0 : DatumGetInt64(val);
+	}
 
 	piscan = shm_toc_allocate(pcxt->toc, node->iss_PscanLen);
 	index_parallelscan_initialize(node->ss.ss_currentRelation,
 								  node->iss_RelationDesc,
 								  estate->es_snapshot,
 								  piscan);
+	estate->es_snapshot->asofTimestamp = outerAsofTimestamp;
+
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
 	node->iss_ScanDesc =
 		index_beginscan_parallel(node->ss.ss_currentRelation,
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index a5bd60e..d19d210 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -54,6 +54,7 @@ SeqNext(SeqScanState *node)
 	EState	   *estate;
 	ScanDirection direction;
 	TupleTableSlot *slot;
+	TimestampTz     outerAsofTimestamp;
 
 	/*
 	 * get information from the estate and scan state
@@ -63,6 +64,25 @@ SeqNext(SeqScanState *node)
 	direction = estate->es_direction;
 	slot = node->ss.ss_ScanTupleSlot;
 
+	outerAsofTimestamp = estate->es_snapshot->asofTimestamp;
+
+	if (node->ss.asofExpr)
+	{
+		if (!node->ss.asofTimestampSet)
+		{
+			Datum		val;
+			bool		isNull;
+
+			val = ExecEvalExprSwitchContext(node->ss.asofExpr,
+											node->ss.ps.ps_ExprContext,
+											&isNull);
+			/* Interpret NULL timestamp as no timestamp */
+			node->ss.asofTimestamp = isNull ? 0 : DatumGetInt64(val);
+			node->ss.asofTimestampSet = true;
+		}
+		estate->es_snapshot->asofTimestamp = node->ss.asofTimestamp;
+	}
+
 	if (scandesc == NULL)
 	{
 		/*
@@ -81,6 +101,11 @@ SeqNext(SeqScanState *node)
 	tuple = heap_getnext(scandesc, direction);
 
 	/*
+	 * Restore ASOF timestamp for the current snapshot
+	 */
+	estate->es_snapshot->asofTimestamp = outerAsofTimestamp;
+
+	/*
 	 * save the tuple and the buffer returned to us by the access methods in
 	 * our scan tuple slot and return the slot.  Note: we pass 'false' because
 	 * tuples returned by heap_getnext() are pointers onto disk pages and were
@@ -196,6 +221,19 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 		ExecInitQual(node->plan.qual, (PlanState *) scanstate);
 
 	/*
+	 * Initialize AS OF expression, if any
+	 */
+	if (node->asofTimestamp)
+	{
+		scanstate->ss.asofExpr = ExecInitExpr((Expr *) node->asofTimestamp,
+											&scanstate->ss.ps);
+		scanstate->ss.asofTimestampSet = false;
+	}
+	else
+		scanstate->ss.asofExpr = NULL;
+
+
+	/*
 	 * tuple table initialization
 	 */
 	ExecInitResultTupleSlot(estate, &scanstate->ss.ps);
@@ -273,6 +311,7 @@ ExecReScanSeqScan(SeqScanState *node)
 	HeapScanDesc scan;
 
 	scan = node->ss.ss_currentScanDesc;
+	node->ss.asofTimestampSet = false;
 
 	if (scan != NULL)
 		heap_rescan(scan,		/* scan desc */
@@ -316,11 +355,30 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 {
 	EState	   *estate = node->ss.ps.state;
 	ParallelHeapScanDesc pscan;
+	TimestampTz     outerAsofTimestamp = estate->es_snapshot->asofTimestamp;
+	Scan* scan = (Scan*)node->ss.ps.plan;
+
+	if (scan->asofTimestamp)
+	{
+		Datum		val;
+		bool		isNull;
+
+		node->ss.asofExpr = ExecInitExpr((Expr *) scan->asofTimestamp,
+										 &node->ss.ps);
+		val = ExecEvalExprSwitchContext(node->ss.asofExpr,
+										node->ss.ps.ps_ExprContext,
+										&isNull);
+		/* Interpret NULL timestamp as no timestamp */
+		estate->es_snapshot->asofTimestamp = isNull ? 0 : DatumGetInt64(val);
+	}
 
 	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
 	heap_parallelscan_initialize(pscan,
 								 node->ss.ss_currentRelation,
 								 estate->es_snapshot);
+
+	estate->es_snapshot->asofTimestamp = outerAsofTimestamp;
+
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
@@ -337,8 +395,24 @@ ExecSeqScanReInitializeDSM(SeqScanState *node,
 						   ParallelContext *pcxt)
 {
 	HeapScanDesc scan = node->ss.ss_currentScanDesc;
+	EState	   *estate = node->ss.ps.state;
+	TimestampTz  outerAsofTimestamp = estate->es_snapshot->asofTimestamp;
+
+	if (node->ss.asofExpr)
+	{
+		Datum		val;
+		bool		isNull;
+
+		val = ExecEvalExprSwitchContext(node->ss.asofExpr,
+										node->ss.ps.ps_ExprContext,
+										&isNull);
+		/* Interpret NULL timestamp as no timestamp */
+		estate->es_snapshot->asofTimestamp = isNull ? 0 : DatumGetInt64(val);
+	}
 
 	heap_parallelscan_reinitialize(scan->rs_parallel);
+
+	estate->es_snapshot->asofTimestamp = outerAsofTimestamp;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 84d7171..259d991 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -410,6 +410,7 @@ CopyScanFields(const Scan *from, Scan *newnode)
 	CopyPlanFields((const Plan *) from, (Plan *) newnode);
 
 	COPY_SCALAR_FIELD(scanrelid);
+	COPY_NODE_FIELD(asofTimestamp);
 }
 
 /*
@@ -1216,6 +1217,7 @@ _copyRangeVar(const RangeVar *from)
 	COPY_SCALAR_FIELD(relpersistence);
 	COPY_NODE_FIELD(alias);
 	COPY_LOCATION_FIELD(location);
+	COPY_NODE_FIELD(asofTimestamp);
 
 	return newnode;
 }
@@ -2326,6 +2328,7 @@ _copyRangeTblEntry(const RangeTblEntry *from)
 	COPY_BITMAPSET_FIELD(insertedCols);
 	COPY_BITMAPSET_FIELD(updatedCols);
 	COPY_NODE_FIELD(securityQuals);
+	COPY_NODE_FIELD(asofTimestamp);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 2e869a9..8ee4228 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -112,6 +112,7 @@ _equalRangeVar(const RangeVar *a, const RangeVar *b)
 	COMPARE_SCALAR_FIELD(relpersistence);
 	COMPARE_NODE_FIELD(alias);
 	COMPARE_LOCATION_FIELD(location);
+	COMPARE_NODE_FIELD(asofTimestamp);
 
 	return true;
 }
@@ -2661,6 +2662,7 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b)
 	COMPARE_BITMAPSET_FIELD(insertedCols);
 	COMPARE_BITMAPSET_FIELD(updatedCols);
 	COMPARE_NODE_FIELD(securityQuals);
+	COMPARE_NODE_FIELD(asofTimestamp);
 
 	return true;
 }
diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c
index c2a93b2..0ace44d 100644
--- a/src/backend/nodes/nodeFuncs.c
+++ b/src/backend/nodes/nodeFuncs.c
@@ -2338,6 +2338,10 @@ range_table_walker(List *rtable,
 
 		if (walker(rte->securityQuals, context))
 			return true;
+
+		if (walker(rte->asofTimestamp, context))
+			return true;
+
 	}
 	return false;
 }
@@ -3161,6 +3165,7 @@ range_table_mutator(List *rtable,
 				break;
 		}
 		MUTATE(newrte->securityQuals, rte->securityQuals, List *);
+		MUTATE(newrte->asofTimestamp, rte->asofTimestamp, Node *);
 		newrt = lappend(newrt, newrte);
 	}
 	return newrt;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index e468d7c..3ee00f3 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -3105,6 +3105,7 @@ _outRangeTblEntry(StringInfo str, const RangeTblEntry *node)
 	WRITE_BITMAPSET_FIELD(insertedCols);
 	WRITE_BITMAPSET_FIELD(updatedCols);
 	WRITE_NODE_FIELD(securityQuals);
+	WRITE_NODE_FIELD(asofTimestamp);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 1133c70..cf7c637 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1399,6 +1399,7 @@ _readRangeTblEntry(void)
 	READ_BITMAPSET_FIELD(insertedCols);
 	READ_BITMAPSET_FIELD(updatedCols);
 	READ_NODE_FIELD(securityQuals);
+	READ_NODE_FIELD(asofTimestamp);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 1a9fd82..713f9b3 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -168,10 +168,10 @@ static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid);
 static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid,
 				TableSampleClause *tsc);
 static IndexScan *make_indexscan(List *qptlist, List *qpqual, Index scanrelid,
-			   Oid indexid, List *indexqual, List *indexqualorig,
-			   List *indexorderby, List *indexorderbyorig,
-			   List *indexorderbyops,
-			   ScanDirection indexscandir);
+								 Oid indexid, List *indexqual, List *indexqualorig,
+								 List *indexorderby, List *indexorderbyorig,
+								 List *indexorderbyops,
+								 ScanDirection indexscandir);
 static IndexOnlyScan *make_indexonlyscan(List *qptlist, List *qpqual,
 				   Index scanrelid, Oid indexid,
 				   List *indexqual, List *indexorderby,
@@ -509,6 +509,7 @@ create_scan_plan(PlannerInfo *root, Path *best_path, int flags)
 	List	   *gating_clauses;
 	List	   *tlist;
 	Plan	   *plan;
+	RangeTblEntry *rte;
 
 	/*
 	 * Extract the relevant restriction clauses from the parent relation. The
@@ -709,6 +710,12 @@ create_scan_plan(PlannerInfo *root, Path *best_path, int flags)
 			break;
 	}
 
+	if (plan != NULL)
+	{
+		rte = planner_rt_fetch(rel->relid, root);
+		((Scan*)plan)->asofTimestamp = rte->asofTimestamp;
+	}
+
 	/*
 	 * If there are any pseudoconstant clauses attached to this node, insert a
 	 * gating Result node that evaluates the pseudoconstants as one-time
@@ -2434,7 +2441,7 @@ create_seqscan_plan(PlannerInfo *root, Path *best_path,
 	Assert(scan_relid > 0);
 	Assert(best_path->parent->rtekind == RTE_RELATION);
 
-	/* Sort clauses into best execution order */
+	/* Sort clauses into best execution order */
 	scan_clauses = order_qual_clauses(root, scan_clauses);
 
 	/* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 382791f..ceb6542 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -84,6 +84,7 @@ create_upper_paths_hook_type create_upper_paths_hook = NULL;
 #define EXPRKIND_ARBITER_ELEM		10
 #define EXPRKIND_TABLEFUNC			11
 #define EXPRKIND_TABLEFUNC_LATERAL	12
+#define EXPRKIND_ASOF	            13
 
 /* Passthrough data for standard_qp_callback */
 typedef struct
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index ebfc94f..a642e28 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -449,7 +449,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 %type <node>	fetch_args limit_clause select_limit_value
 				offset_clause select_offset_value
-				select_offset_value2 opt_select_fetch_first_value
+				select_offset_value2 opt_select_fetch_first_value opt_asof_clause
 %type <ival>	row_or_rows first_or_next
 
 %type <list>	OptSeqOptList SeqOptList OptParenthesizedSeqOptList
@@ -704,7 +704,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
  * as NOT, at least with respect to their left-hand subexpression.
  * NULLS_LA and WITH_LA are needed to make the grammar LALR(1).
  */
-%token		NOT_LA NULLS_LA WITH_LA
+%token		NOT_LA NULLS_LA WITH_LA AS_LA
 
 
 /* Precedence: lowest to highest */
@@ -11720,9 +11720,10 @@ from_list:
 /*
  * table_ref is where an alias clause can be attached.
  */
-table_ref:	relation_expr opt_alias_clause
+table_ref:	relation_expr opt_alias_clause opt_asof_clause
 				{
 					$1->alias = $2;
+					$1->asofTimestamp = $3;
 					$$ = (Node *) $1;
 				}
 			| relation_expr opt_alias_clause tablesample_clause
@@ -11948,6 +11949,10 @@ opt_alias_clause: alias_clause						{ $$ = $1; }
 			| /*EMPTY*/								{ $$ = NULL; }
 		;
 
+opt_asof_clause: AS_LA OF a_expr                    { $$ = $3; }	/* AS OF <expr>: yields the expression node */
+			| /*EMPTY*/								{ $$ = NULL; }
+		;
+
 /*
  * func_alias_clause can include both an Alias and a coldeflist, so we make it
  * return a 2-element list that gets disassembled by calling production.
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 4c4f4cd..6c3e506 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -439,6 +439,7 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr)
 			break;
 		case EXPR_KIND_LIMIT:
 		case EXPR_KIND_OFFSET:
+		case EXPR_KIND_ASOF:
 			errkind = true;
 			break;
 		case EXPR_KIND_RETURNING:
@@ -856,6 +857,7 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
 			break;
 		case EXPR_KIND_LIMIT:
 		case EXPR_KIND_OFFSET:
+		case EXPR_KIND_ASOF:
 			errkind = true;
 			break;
 		case EXPR_KIND_RETURNING:
diff --git a/src/backend/parser/parse_clause.c b/src/backend/parser/parse_clause.c
index 2828bbf..a23f3d8 100644
--- a/src/backend/parser/parse_clause.c
+++ b/src/backend/parser/parse_clause.c
@@ -426,7 +426,11 @@ transformTableEntry(ParseState *pstate, RangeVar *r)
 
 	/* We need only build a range table entry */
 	rte = addRangeTableEntry(pstate, r, r->alias, r->inh, true);
-
+	if (r->asofTimestamp)
+	{
+		Node* asof = transformExpr(pstate, r->asofTimestamp, EXPR_KIND_ASOF);
+		rte->asofTimestamp = coerce_to_specific_type(pstate, asof, TIMESTAMPTZOID, "ASOF");
+	}
 	return rte;
 }
 
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index 29f9da7..cd83fc3 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -1818,6 +1818,7 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
 		case EXPR_KIND_VALUES:
 		case EXPR_KIND_VALUES_SINGLE:
 		case EXPR_KIND_CALL:
+		case EXPR_KIND_ASOF:
 			/* okay */
 			break;
 		case EXPR_KIND_CHECK_CONSTRAINT:
@@ -3470,6 +3471,8 @@ ParseExprKindName(ParseExprKind exprKind)
 			return "PARTITION BY";
 		case EXPR_KIND_CALL:
 			return "CALL";
+		case EXPR_KIND_ASOF:
+			return "ASOF";
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index e6b0856..a6bcfc7 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -2250,6 +2250,7 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location)
 			break;
 		case EXPR_KIND_LIMIT:
 		case EXPR_KIND_OFFSET:
+		case EXPR_KIND_ASOF:
 			errkind = true;
 			break;
 		case EXPR_KIND_RETURNING:
diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
index 58bdb23..ddf6af4 100644
--- a/src/backend/parser/parse_relation.c
+++ b/src/backend/parser/parse_relation.c
@@ -1206,6 +1206,7 @@ addRangeTableEntry(ParseState *pstate,
 
 	rte->rtekind = RTE_RELATION;
 	rte->alias = alias;
+	rte->asofTimestamp = relation->asofTimestamp;
 
 	/*
 	 * Get the rel's OID.  This access also ensures that we have an up-to-date
diff --git a/src/backend/parser/parser.c b/src/backend/parser/parser.c
index 245b4cd..a3845b5 100644
--- a/src/backend/parser/parser.c
+++ b/src/backend/parser/parser.c
@@ -108,6 +108,9 @@ base_yylex(YYSTYPE *lvalp, YYLTYPE *llocp, core_yyscan_t yyscanner)
 	 */
 	switch (cur_token)
 	{
+		case AS:
+			cur_token_length = 2;
+			break;
 		case NOT:
 			cur_token_length = 3;
 			break;
@@ -155,6 +158,10 @@ base_yylex(YYSTYPE *lvalp, YYLTYPE *llocp, core_yyscan_t yyscanner)
 	/* Replace cur_token if needed, based on lookahead */
 	switch (cur_token)
 	{
+		case AS:
+		    if (next_token == OF)
+			    cur_token = AS_LA;
+		    break;
 		case NOT:
 			/* Replace NOT by NOT_LA if it's followed by BETWEEN, IN, etc */
 			switch (next_token)
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index d87799c..c3b3790 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -51,6 +51,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "access/commit_ts.h"
 #include "catalog/catalog.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -91,6 +92,9 @@ typedef struct ProcArrayStruct
 	/* oldest catalog xmin of any replication slot */
 	TransactionId replication_slot_catalog_xmin;
 
+	TransactionId time_travel_xmin;
+	TimestampTz   time_travel_horizon;
+
 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
 	int			pgprocnos[FLEXIBLE_ARRAY_MEMBER];
 } ProcArrayStruct;
@@ -1256,6 +1260,87 @@ TransactionIdIsActive(TransactionId xid)
 	return result;
 }
 
+/*
+ * Get the minimal XID that belongs to the time travel period and return oldestXmin moved back to it when necessary.
+ * This function also tries to advance the current time travel horizon stored in procArray.
+ * It uses the commit_ts SLRU to map XIDs to timestamps. Because the order of XIDs doesn't match the order of timestamps,
+ * this function may produce not quite correct results in the presence of long-living transactions.
+ * So the time travel period specification is not exact and should take maximal transaction duration into account.
+ *
+ * The passed time_travel_xmin and time_travel_horizon are expected to be taken from procArray under lock by the caller.
+ */
+static TransactionId
+GetTimeTravelXmin(TransactionId oldestXmin, TransactionId time_travel_xmin, TimestampTz time_travel_horizon)
+{
+	if (time_travel_period < 0)
+	{
+		/* Infinite history: move oldestXmin back by MaxTimeTravelPeriod */
+		oldestXmin -= MaxTimeTravelPeriod;
+	}
+	else
+	{
+		/* Limited history: compute the timestamp that is time_travel_period seconds before now */
+		TimestampTz new_horizon = GetCurrentTimestamp()	- (TimestampTz)time_travel_period*USECS_PER_SEC;
+		TransactionId old_xmin = time_travel_xmin;
+
+		if (time_travel_xmin != InvalidTransactionId)
+		{
+			/* The time travel horizon has already been determined: check whether it needs to be moved forward */
+			TimestampTz old_horizon = time_travel_horizon;
+			TransactionId xid = old_xmin;
+
+			while (timestamptz_cmp_internal(old_horizon, new_horizon) < 0)
+			{
+				/* Candidate is still older than the new horizon: accept it and look at the next XID */
+				time_travel_xmin  = xid;
+				time_travel_horizon = old_horizon;
+				do {
+					TransactionIdAdvance(xid);
+					/* Stop once we reach oldestXmin */
+					if (TransactionIdFollowsOrEquals(xid, oldestXmin))
+						goto EndScan;
+				} while (!TransactionIdGetCommitTsData(xid, &old_horizon, NULL));
+			}
+		}
+		else
+		{
+			/* No horizon known yet: walk backwards from oldestXmin to find it */
+			TransactionId xid = oldestXmin;
+
+			do {
+				TransactionIdRetreat(xid);
+				/*
+				 * No commit timestamp for this XID in the SLRU means that we reached a nonexistent or untracked transaction,
+				 * so stop the traversal. NOTE(review): the failed lookup was handed &time_travel_horizon and may have overwritten it; confirm.
+				 */
+				if (!TransactionIdGetCommitTsData(xid, &time_travel_horizon, NULL))
+					goto EndScan;
+				time_travel_xmin = xid;
+			} while (timestamptz_cmp_internal(time_travel_horizon, new_horizon) > 0);
+		}
+	  EndScan:
+		if (old_xmin != time_travel_xmin)
+		{
+			/* Horizon moved: publish the new value in procArray */
+			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+			/* Recheck under lock that the shared xmin really advances, then store it */
+			if (TransactionIdPrecedes(procArray->time_travel_xmin, time_travel_xmin))
+			{
+				procArray->time_travel_xmin = time_travel_xmin;
+				procArray->time_travel_horizon = time_travel_horizon;
+			}
+			LWLockRelease(ProcArrayLock);
+		}
+		/* Move oldestXmin into the past if that is required for time travel */
+		if (TransactionIdPrecedes(time_travel_xmin, oldestXmin))
+			oldestXmin = time_travel_xmin;
+	}
+
+	if (!TransactionIdIsNormal(oldestXmin))	/* never return a non-normal XID */
+		oldestXmin = FirstNormalTransactionId;
+
+	return oldestXmin;
+}
 
 /*
  * GetOldestXmin -- returns oldest transaction that was running
@@ -1321,6 +1406,8 @@ GetOldestXmin(Relation rel, int flags)
 
 	volatile TransactionId replication_slot_xmin = InvalidTransactionId;
 	volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+	volatile TransactionId time_travel_xmin;
+	TimestampTz time_travel_horizon;
 
 	/*
 	 * If we're not computing a relation specific limit, or if a shared
@@ -1383,6 +1470,9 @@ GetOldestXmin(Relation rel, int flags)
 	replication_slot_xmin = procArray->replication_slot_xmin;
 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
+	time_travel_xmin = procArray->time_travel_xmin;
+	time_travel_horizon = procArray->time_travel_horizon;
+
 	if (RecoveryInProgress())
 	{
 		/*
@@ -1423,6 +1513,9 @@ GetOldestXmin(Relation rel, int flags)
 			result = FirstNormalTransactionId;
 	}
 
+	if (time_travel_period != 0)
+		result = GetTimeTravelXmin(result, time_travel_xmin, time_travel_horizon);
+
 	/*
 	 * Check whether there are replication slots requiring an older xmin.
 	 */
@@ -1469,6 +1562,7 @@ GetMaxSnapshotSubxidCount(void)
 	return TOTAL_MAX_CACHED_SUBXIDS;
 }
 
+
 /*
  * GetSnapshotData -- returns information about running transactions.
  *
@@ -1518,6 +1612,8 @@ GetSnapshotData(Snapshot snapshot)
 	bool		suboverflowed = false;
 	volatile TransactionId replication_slot_xmin = InvalidTransactionId;
 	volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+	volatile TransactionId time_travel_xmin;
+	TimestampTz time_travel_horizon;
 
 	Assert(snapshot != NULL);
 
@@ -1707,6 +1803,9 @@ GetSnapshotData(Snapshot snapshot)
 	replication_slot_xmin = procArray->replication_slot_xmin;
 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
+	time_travel_xmin = procArray->time_travel_xmin;
+	time_travel_horizon = procArray->time_travel_horizon;
+
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
 
@@ -1730,6 +1829,9 @@ GetSnapshotData(Snapshot snapshot)
 		NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
 		RecentGlobalXmin = replication_slot_xmin;
 
+	if (time_travel_period != 0)
+		RecentGlobalXmin = GetTimeTravelXmin(RecentGlobalXmin, time_travel_xmin, time_travel_horizon);
+
 	/* Non-catalog tables can be vacuumed if older than this xid */
 	RecentGlobalDataXmin = RecentGlobalXmin;
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e32901d..7d25060 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -191,6 +191,7 @@ static void assign_application_name(const char *newval, void *extra);
 static bool check_cluster_name(char **newval, void **extra, GucSource source);
 static const char *show_unix_socket_permissions(void);
 static const char *show_log_file_mode(void);
+static void assign_time_travel_period_hook(int newval, void *extra);
 
 /* Private functions in guc-file.l that need to be called from guc.c */
 static ConfigVariable *ProcessConfigFileInternal(GucContext context,
@@ -1713,6 +1714,15 @@ static struct config_bool ConfigureNamesBool[] =
 static struct config_int ConfigureNamesInt[] =
 {
 	{
+		{"time_travel_period", PGC_POSTMASTER, AUTOVACUUM,
+			gettext_noop("Specifies time travel period in seconds: 0 disables, -1 infinite"),
+			NULL
+		},
+		&time_travel_period,
+		0, -1, MaxTimeTravelPeriod,
+		NULL, assign_time_travel_period_hook, NULL
+	},
+	{
 		{"archive_timeout", PGC_SIGHUP, WAL_ARCHIVING,
 			gettext_noop("Forces a switch to the next WAL file if a "
 						 "new file has not been started within N seconds."),
@@ -10530,4 +10540,14 @@ show_log_file_mode(void)
 	return buf;
 }
 
+static void assign_time_travel_period_hook(int newval, void *extra)	/* GUC assign hook for time_travel_period */
+{
+	if (newval != 0)	/* non-zero: time travel is enabled */
+	{
+		track_commit_timestamp = true;	/* force commit timestamp tracking on */
+		if (newval < 0)	/* negative: infinite history */
+			autovacuum_start_daemon = false;	/* NOTE(review): sets other GUC variables from an assign hook -- confirm this is safe */
+	}
+}
+
 #include "guc-file.c"
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 0b03290..d0bacd4 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -73,6 +73,7 @@
  * GUC parameters
  */
 int			old_snapshot_threshold; /* number of minutes, -1 disables */
+int         time_travel_period;     /* number of seconds, 0 disables, -1 infinite */
 
 /*
  * Structure for dealing with old_snapshot_threshold implementation.
@@ -244,6 +245,7 @@ typedef struct SerializedSnapshotData
 	bool		takenDuringRecovery;
 	CommandId	curcid;
 	TimestampTz whenTaken;
+	TimestampTz asofTimestamp;
 	XLogRecPtr	lsn;
 } SerializedSnapshotData;
 
@@ -2080,6 +2082,7 @@ SerializeSnapshot(Snapshot snapshot, char *start_address)
 	serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
 	serialized_snapshot.curcid = snapshot->curcid;
 	serialized_snapshot.whenTaken = snapshot->whenTaken;
+	serialized_snapshot.asofTimestamp = snapshot->asofTimestamp;
 	serialized_snapshot.lsn = snapshot->lsn;
 
 	/*
@@ -2154,6 +2157,7 @@ RestoreSnapshot(char *start_address)
 	snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
 	snapshot->curcid = serialized_snapshot.curcid;
 	snapshot->whenTaken = serialized_snapshot.whenTaken;
+	snapshot->asofTimestamp = serialized_snapshot.asofTimestamp;
 	snapshot->lsn = serialized_snapshot.lsn;
 
 	/* Copy XIDs, if present. */
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index 2b218e0..09e067f 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -69,6 +69,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "access/commit_ts.h"
 #include "storage/bufmgr.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
@@ -1476,6 +1477,16 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 {
 	uint32		i;
 
+	if (snapshot->asofTimestamp != 0)
+	{
+		TimestampTz ts;
+		if (TransactionIdGetCommitTsData(xid, &ts, NULL))
+		{
+			return timestamptz_cmp_internal(snapshot->asofTimestamp, ts) < 0;
+		}
+	}
+
+
 	/*
 	 * Make a quick range check to eliminate most XIDs without looking at the
 	 * xip arrays.  Note that this is OK even if we convert a subxact XID to
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 86076de..e46f4c6 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -33,6 +33,7 @@
 #define FrozenTransactionId			((TransactionId) 2)
 #define FirstNormalTransactionId	((TransactionId) 3)
 #define MaxTransactionId			((TransactionId) 0xFFFFFFFF)
+#define MaxTimeTravelPeriod         ((TransactionId) 0x3FFFFFFF) /* upper bound of the time_travel_period GUC (seconds); NOTE(review): TransactionId cast on a time value looks unintended -- confirm */
 
 /* ----------------
  *		transaction ID manipulation macros
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index c9a5279..ed923ab 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1120,6 +1120,9 @@ typedef struct ScanState
 	Relation	ss_currentRelation;
 	HeapScanDesc ss_currentScanDesc;
 	TupleTableSlot *ss_ScanTupleSlot;
+	ExprState  *asofExpr;	      /* AS OF expression */
+	bool        asofTimestampSet; /* AS OF timestamp evaluated */
+	TimestampTz asofTimestamp;    /* AS OF timestamp or 0 if not set */
 } ScanState;
 
 /* ----------------
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2eaa6b2..b78c8e2 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1062,6 +1062,7 @@ typedef struct RangeTblEntry
 	Bitmapset  *insertedCols;	/* columns needing INSERT permission */
 	Bitmapset  *updatedCols;	/* columns needing UPDATE permission */
 	List	   *securityQuals;	/* security barrier quals to apply, if any */
+	Node       *asofTimestamp;  /* AS OF timestamp */
 } RangeTblEntry;
 
 /*
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index d763da6..083dc90 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -327,7 +327,8 @@ typedef struct BitmapOr
 typedef struct Scan
 {
 	Plan		plan;
-	Index		scanrelid;		/* relid is index into the range table */
+	Index		scanrelid;	   /* relid is index into the range table */
+	Node       *asofTimestamp; /* AS OF timestamp */
 } Scan;
 
 /* ----------------
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index 074ae0a..11e1a0c 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -70,6 +70,7 @@ typedef struct RangeVar
 								 * on children? */
 	char		relpersistence; /* see RELPERSISTENCE_* in pg_class.h */
 	Alias	   *alias;			/* table alias & optional column aliases */
+	Node       *asofTimestamp;  /* expression with AS OF timestamp */
 	int			location;		/* token location, or -1 if unknown */
 } RangeVar;
 
diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h
index 565bb3d..b1efb5c 100644
--- a/src/include/parser/parse_node.h
+++ b/src/include/parser/parse_node.h
@@ -68,7 +68,8 @@ typedef enum ParseExprKind
 	EXPR_KIND_TRIGGER_WHEN,		/* WHEN condition in CREATE TRIGGER */
 	EXPR_KIND_POLICY,			/* USING or WITH CHECK expr in policy */
 	EXPR_KIND_PARTITION_EXPRESSION,	/* PARTITION BY expression */
-	EXPR_KIND_CALL				/* CALL argument */
+	EXPR_KIND_CALL,				/* CALL argument */
+	EXPR_KIND_ASOF              /* AS OF */
 } ParseExprKind;
 
 
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 8585194..1e1414a 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -47,7 +47,7 @@
 
 /* GUC variables */
 extern PGDLLIMPORT int old_snapshot_threshold;
-
+extern PGDLLIMPORT int time_travel_period;
 
 extern Size SnapMgrShmemSize(void);
 extern void SnapMgrInit(void);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index bf51977..a00f0d9 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -111,6 +111,7 @@ typedef struct SnapshotData
 	pairingheap_node ph_node;	/* link in the RegisteredSnapshots heap */
 
 	TimestampTz whenTaken;		/* timestamp when snapshot was taken */
+	TimestampTz asofTimestamp;	/* select AS OF timestamp */
 	XLogRecPtr	lsn;			/* position in the WAL stream when taken */
 } SnapshotData;
 

Reply via email to