diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 91c13d4..8e57bae 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -93,7 +93,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 					TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup,
-				HeapTuple newtup, HeapTuple old_key_tup,
+				HeapTuple newtup, OffsetNumber root_offnum,
+				HeapTuple old_key_tup,
 				bool all_visible_cleared, bool new_all_visible_cleared);
 static Bitmapset *HeapDetermineModifiedColumns(Relation relation,
 							 Bitmapset *interesting_cols,
@@ -2247,13 +2248,13 @@ heap_get_latest_tid(Relation relation,
 		 */
 		if ((tp.t_data->t_infomask & HEAP_XMAX_INVALID) ||
 			HeapTupleHeaderIsOnlyLocked(tp.t_data) ||
-			ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid))
+			HeapTupleHeaderIsHeapLatest(tp.t_data, &ctid))
 		{
 			UnlockReleaseBuffer(buffer);
 			break;
 		}
 
-		ctid = tp.t_data->t_ctid;
+		HeapTupleHeaderGetNextTid(tp.t_data, &ctid);
 		priorXmax = HeapTupleHeaderGetUpdateXid(tp.t_data);
 		UnlockReleaseBuffer(buffer);
 	}							/* end of loop */
@@ -2373,6 +2374,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 	Buffer		buffer;
 	Buffer		vmbuffer = InvalidBuffer;
 	bool		all_visible_cleared = false;
+	OffsetNumber	root_offnum;
 
 	/*
 	 * Fill in tuple header fields, assign an OID, and toast the tuple if
@@ -2411,8 +2413,14 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
 
+	root_offnum = InvalidOffsetNumber;
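+	/*
+	 * InvalidOffsetNumber asks RelationPutHeapTuple to set the root offset
+	 * to wherever the tuple is placed; a freshly inserted tuple is the root
+	 * of its own chain.
+	 */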
 	RelationPutHeapTuple(relation, buffer, heaptup,
-						 (options & HEAP_INSERT_SPECULATIVE) != 0);
+						 (options & HEAP_INSERT_SPECULATIVE) != 0,
+						 &root_offnum);
+
+	/* We must not overwrite the speculative insertion token */
+	if ((options & HEAP_INSERT_SPECULATIVE) == 0)
+		HeapTupleHeaderSetHeapLatest(heaptup->t_data, root_offnum);
 
 	if (PageIsAllVisible(BufferGetPage(buffer)))
 	{
@@ -2640,6 +2648,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	Size		saveFreeSpace;
 	bool		need_tuple_data = RelationIsLogicallyLogged(relation);
 	bool		need_cids = RelationIsAccessibleInLogicalDecoding(relation);
+	OffsetNumber	root_offnum;
 
 	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
@@ -2710,7 +2719,13 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 		 * RelationGetBufferForTuple has ensured that the first tuple fits.
 		 * Put that on the page, and then as many other tuples as fit.
 		 */
-		RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false);
+		root_offnum = InvalidOffsetNumber;
+		RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false,
+				&root_offnum);
+
+		/* Mark this tuple as the latest and also set root offset */
+		HeapTupleHeaderSetHeapLatest(heaptuples[ndone]->t_data, root_offnum);
+
 		for (nthispage = 1; ndone + nthispage < ntuples; nthispage++)
 		{
 			HeapTuple	heaptup = heaptuples[ndone + nthispage];
@@ -2718,7 +2733,11 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len) + saveFreeSpace)
 				break;
 
-			RelationPutHeapTuple(relation, buffer, heaptup, false);
+			root_offnum = InvalidOffsetNumber;
+			RelationPutHeapTuple(relation, buffer, heaptup, false,
+					&root_offnum);
+			/* Mark each tuple as the latest and also set root offset */
+			HeapTupleHeaderSetHeapLatest(heaptup->t_data, root_offnum);
 
 			/*
 			 * We don't use heap_multi_insert for catalog tuples yet, but
@@ -2990,6 +3009,7 @@ heap_delete(Relation relation, ItemPointer tid,
 	HeapTupleData tp;
 	Page		page;
 	BlockNumber block;
+	OffsetNumber	offnum;
 	Buffer		buffer;
 	Buffer		vmbuffer = InvalidBuffer;
 	TransactionId new_xmax;
@@ -3000,6 +3020,7 @@ heap_delete(Relation relation, ItemPointer tid,
 	bool		all_visible_cleared = false;
 	HeapTuple	old_key_tuple = NULL;	/* replica identity of the tuple */
 	bool		old_key_copied = false;
+	OffsetNumber	root_offnum;
 
 	Assert(ItemPointerIsValid(tid));
 
@@ -3041,7 +3062,8 @@ heap_delete(Relation relation, ItemPointer tid,
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 	}
 
-	lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
+	offnum = ItemPointerGetOffsetNumber(tid);
+	lp = PageGetItemId(page, offnum);
 	Assert(ItemIdIsNormal(lp));
 
 	tp.t_tableOid = RelationGetRelid(relation);
@@ -3171,7 +3193,17 @@ l1:
 			   result == HeapTupleUpdated ||
 			   result == HeapTupleBeingUpdated);
 		Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID));
-		hufd->ctid = tp.t_data->t_ctid;
+
+		/*
+		 * If we're at the end of the chain, return the tuple's own TID. The
+		 * caller uses that as a hint to detect that it has reached the end
+		 * of the chain.
+		 */
+		if (!HeapTupleHeaderIsHeapLatest(tp.t_data, &tp.t_self))
+			HeapTupleHeaderGetNextTid(tp.t_data, &hufd->ctid);
+		else
+			ItemPointerCopy(&tp.t_self, &hufd->ctid);
+
 		hufd->xmax = HeapTupleHeaderGetUpdateXid(tp.t_data);
 		if (result == HeapTupleSelfUpdated)
 			hufd->cmax = HeapTupleHeaderGetCmax(tp.t_data);
@@ -3220,6 +3252,23 @@ l1:
 							  xid, LockTupleExclusive, true,
 							  &new_xmax, &new_infomask, &new_infomask2);
 
+	/*
+	 * heap_get_root_tuple_one() may call palloc, which is disallowed once we
+	 * enter the critical section. So check if the root offset is cached in
+	 * the tuple and, if not, fetch that information the hard way before
+	 * entering the critical section.
+	 *
+	 * Unless we are dealing with a pg_upgraded cluster, the root offset
+	 * information should usually be cached, so the overhead of fetching it
+	 * is small. Moreover, once a tuple is updated, the information is copied
+	 * to the new version, so we do not pay this price forever.
+	 */
+	if (!HeapTupleHeaderHasRootOffset(tp.t_data))
+		heap_get_root_tuple_one(page,
+				ItemPointerGetOffsetNumber(&tp.t_self),
+				&root_offnum);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -3247,8 +3296,10 @@ l1:
 	HeapTupleHeaderClearHotUpdated(tp.t_data);
 	HeapTupleHeaderSetXmax(tp.t_data, new_xmax);
 	HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
-	/* Make sure there is no forward chain link in t_ctid */
-	tp.t_data->t_ctid = tp.t_self;
+
+	/*
+	 * Mark this tuple as the latest tuple in the update chain. If the root
+	 * offset is already cached in the tuple, the flag is already set and
+	 * t_ctid must be left alone.
+	 */
+	if (!HeapTupleHeaderHasRootOffset(tp.t_data))
+		HeapTupleHeaderSetHeapLatest(tp.t_data, root_offnum);
 
 	MarkBufferDirty(buffer);
 
@@ -3449,6 +3500,8 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	bool		old_key_copied = false;
 	Page		page;
 	BlockNumber block;
+	OffsetNumber	offnum;
+	OffsetNumber	root_offnum;
 	MultiXactStatus mxact_status;
 	Buffer		buffer,
 				newbuf,
@@ -3511,6 +3564,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 
 
 	block = ItemPointerGetBlockNumber(otid);
+	offnum = ItemPointerGetOffsetNumber(otid);
 	buffer = ReadBuffer(relation, block);
 	page = BufferGetPage(buffer);
 
@@ -3795,7 +3849,12 @@ l2:
 			   result == HeapTupleUpdated ||
 			   result == HeapTupleBeingUpdated);
 		Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
-		hufd->ctid = oldtup.t_data->t_ctid;
+
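+		/*
+		 * Return the next TID in the chain, or the same TID if this tuple
+		 * is the latest version; as in heap_delete, the caller uses that to
+		 * detect the end of the update chain.
+		 */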
+		if (!HeapTupleHeaderIsHeapLatest(oldtup.t_data, &oldtup.t_self))
+			HeapTupleHeaderGetNextTid(oldtup.t_data, &hufd->ctid);
+		else
+			ItemPointerCopy(&oldtup.t_self, &hufd->ctid);
+
 		hufd->xmax = HeapTupleHeaderGetUpdateXid(oldtup.t_data);
 		if (result == HeapTupleSelfUpdated)
 			hufd->cmax = HeapTupleHeaderGetCmax(oldtup.t_data);
@@ -3935,6 +3994,7 @@ l2:
 		uint16		infomask_lock_old_tuple,
 					infomask2_lock_old_tuple;
 		bool		cleared_all_frozen = false;
+		OffsetNumber	root_offnum;
 
 		/*
 		 * To prevent concurrent sessions from updating the tuple, we have to
@@ -3962,6 +4022,15 @@ l2:
 
 		Assert(HEAP_XMAX_IS_LOCKED_ONLY(infomask_lock_old_tuple));
 
+		/*
+		 * Fetch the root offset before entering the critical section, but
+		 * only if the information is not already cached in the tuple.
+		 */
+		if (!HeapTupleHeaderHasRootOffset(oldtup.t_data))
+			heap_get_root_tuple_one(page,
+					ItemPointerGetOffsetNumber(&oldtup.t_self),
+					&root_offnum);
+
 		START_CRIT_SECTION();
 
 		/* Clear obsolete visibility flags ... */
@@ -3976,7 +4045,8 @@ l2:
 		HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
 
 		/* temporarily make it look not-updated, but locked */
-		oldtup.t_data->t_ctid = oldtup.t_self;
+		if (!HeapTupleHeaderHasRootOffset(oldtup.t_data))
+			HeapTupleHeaderSetHeapLatest(oldtup.t_data, root_offnum);
 
 		/*
 		 * Clear all-frozen bit on visibility map if needed. We could
@@ -4134,6 +4204,11 @@ l2:
 										   bms_overlap(modified_attrs, id_attrs),
 										   &old_key_copied);
 
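+	/*
+	 * If the old tuple does not carry its root offset, compute it before we
+	 * enter the critical section; see the comment in heap_delete.
+	 */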
+	if (!HeapTupleHeaderHasRootOffset(oldtup.t_data))
+		heap_get_root_tuple_one(page,
+				ItemPointerGetOffsetNumber(&(oldtup.t_self)),
+				&root_offnum);
+
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
 
@@ -4159,6 +4234,17 @@ l2:
 		HeapTupleSetHeapOnly(heaptup);
 		/* Mark the caller's copy too, in case different from heaptup */
 		HeapTupleSetHeapOnly(newtup);
+		/*
+		 * For HOT (or WARM) updated tuples, we store the offset of the root
+		 * line pointer of this chain in the ip_posid field of the new tuple.
+		 * Usually this information is available in the corresponding field
+		 * of the old tuple. But for aborted updates or pg_upgraded
+		 * databases, we might be looking at an old-style CTID chain, in
+		 * which case the information must be obtained the hard way (we
+		 * should have done that before entering the critical section above).
+		 */
+		if (HeapTupleHeaderHasRootOffset(oldtup.t_data))
+			root_offnum = HeapTupleHeaderGetRootOffset(oldtup.t_data);
 	}
 	else
 	{
@@ -4166,10 +4252,21 @@ l2:
 		HeapTupleClearHotUpdated(&oldtup);
 		HeapTupleClearHeapOnly(heaptup);
 		HeapTupleClearHeapOnly(newtup);
+		root_offnum = InvalidOffsetNumber;
 	}
 
-	RelationPutHeapTuple(relation, newbuf, heaptup, false);		/* insert new tuple */
-
+	/* insert new tuple */
+	RelationPutHeapTuple(relation, newbuf, heaptup, false, &root_offnum);
+
+	/*
+	 * Also mark both copies as latest and set the root offset information.
+	 * If we're doing a HOT/WARM update, we just copy the information from
+	 * the old tuple, as cached there or computed above. For regular updates,
+	 * RelationPutHeapTuple has returned the actual offset number where the
+	 * new version was inserted, and we store that value since the update
+	 * starts a new HOT chain.
+	 */
+	HeapTupleHeaderSetHeapLatest(heaptup->t_data, root_offnum);
+	HeapTupleHeaderSetHeapLatest(newtup->t_data, root_offnum);
 
 	/* Clear obsolete visibility flags, possibly set by ourselves above... */
 	oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
@@ -4182,7 +4279,7 @@ l2:
 	HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
 
 	/* record address of new tuple in t_ctid of old one */
-	oldtup.t_data->t_ctid = heaptup->t_self;
+	HeapTupleHeaderSetNextTid(oldtup.t_data, &(heaptup->t_self));
 
 	/* clear PD_ALL_VISIBLE flags, reset all visibilitymap bits */
 	if (PageIsAllVisible(BufferGetPage(buffer)))
@@ -4221,6 +4318,7 @@ l2:
 
 		recptr = log_heap_update(relation, buffer,
 								 newbuf, &oldtup, heaptup,
+								 root_offnum,
 								 old_key_tuple,
 								 all_visible_cleared,
 								 all_visible_cleared_new);
@@ -4501,7 +4599,8 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 	ItemId		lp;
 	Page		page;
 	Buffer		vmbuffer = InvalidBuffer;
-	BlockNumber block;
+	BlockNumber	block;
+	OffsetNumber	offnum;
 	TransactionId xid,
 				xmax;
 	uint16		old_infomask,
@@ -4510,9 +4609,11 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 	bool		first_time = true;
 	bool		have_tuple_lock = false;
 	bool		cleared_all_frozen = false;
+	OffsetNumber	root_offnum;
 
 	*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
 	block = ItemPointerGetBlockNumber(tid);
+	offnum = ItemPointerGetOffsetNumber(tid);
 
 	/*
 	 * Before locking the buffer, pin the visibility map page if it appears to
@@ -4532,6 +4633,7 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
 	tuple->t_data = (HeapTupleHeader) PageGetItem(page, lp);
 	tuple->t_len = ItemIdGetLength(lp);
 	tuple->t_tableOid = RelationGetRelid(relation);
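+	/* Set t_self; the chain-following code below relies on the tuple's TID */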
+	tuple->t_self = *tid;
 
 l3:
 	result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer);
@@ -4559,7 +4661,11 @@ l3:
 		xwait = HeapTupleHeaderGetRawXmax(tuple->t_data);
 		infomask = tuple->t_data->t_infomask;
 		infomask2 = tuple->t_data->t_infomask2;
-		ItemPointerCopy(&tuple->t_data->t_ctid, &t_ctid);
+
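+		/*
+		 * Fetch the next TID only when this is not the chain tail; for the
+		 * latest tuple t_ctid may hold the root offset, not a forward link.
+		 */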
+		if (!HeapTupleHeaderIsHeapLatest(tuple->t_data, tid))
+			HeapTupleHeaderGetNextTid(tuple->t_data, &t_ctid);
+		else
+			ItemPointerCopy(tid, &t_ctid);
 
 		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
 
@@ -4997,7 +5103,12 @@ failed:
 		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated ||
 			   result == HeapTupleWouldBlock);
 		Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID));
-		hufd->ctid = tuple->t_data->t_ctid;
+
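+		/*
+		 * As above, report the next TID in the chain, or the tuple's own
+		 * TID when it is the latest version.
+		 */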
+		if (!HeapTupleHeaderIsHeapLatest(tuple->t_data, tid))
+			HeapTupleHeaderGetNextTid(tuple->t_data, &hufd->ctid);
+		else
+			ItemPointerCopy(tid, &hufd->ctid);
+
 		hufd->xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
 		if (result == HeapTupleSelfUpdated)
 			hufd->cmax = HeapTupleHeaderGetCmax(tuple->t_data);
@@ -5045,6 +5156,11 @@ failed:
 							  GetCurrentTransactionId(), mode, false,
 							  &xid, &new_infomask, &new_infomask2);
 
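+	/*
+	 * Fetch the root offset the hard way before entering the critical
+	 * section, if it is not already cached in the tuple.
+	 */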
+	if (!HeapTupleHeaderHasRootOffset(tuple->t_data))
+		heap_get_root_tuple_one(page,
+				ItemPointerGetOffsetNumber(&tuple->t_self),
+				&root_offnum);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -5073,7 +5189,10 @@ failed:
 	 * the tuple as well.
 	 */
 	if (HEAP_XMAX_IS_LOCKED_ONLY(new_infomask))
-		tuple->t_data->t_ctid = *tid;
+	{
+		if (!HeapTupleHeaderHasRootOffset(tuple->t_data))
+			HeapTupleHeaderSetHeapLatest(tuple->t_data, root_offnum);
+	}
 
 	/* Clear only the all-frozen bit on visibility map if needed */
 	if (PageIsAllVisible(page) &&
@@ -5587,6 +5706,7 @@ heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid,
 	bool		cleared_all_frozen = false;
 	Buffer		vmbuffer = InvalidBuffer;
 	BlockNumber block;
+	OffsetNumber offnum;
 
 	ItemPointerCopy(tid, &tupid);
 
@@ -5595,6 +5715,8 @@ heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid,
 		new_infomask = 0;
 		new_xmax = InvalidTransactionId;
 		block = ItemPointerGetBlockNumber(&tupid);
+		offnum = ItemPointerGetOffsetNumber(&tupid);
+
 		ItemPointerCopy(&tupid, &(mytup.t_self));
 
 		if (!heap_fetch(rel, SnapshotAny, &mytup, &buf, false, NULL))
@@ -5824,7 +5946,7 @@ l4:
 
 		/* if we find the end of update chain, we're done. */
 		if (mytup.t_data->t_infomask & HEAP_XMAX_INVALID ||
-			ItemPointerEquals(&mytup.t_self, &mytup.t_data->t_ctid) ||
+			HeapTupleHeaderIsHeapLatest(mytup.t_data, &mytup.t_self) ||
 			HeapTupleHeaderIsOnlyLocked(mytup.t_data))
 		{
 			result = HeapTupleMayBeUpdated;
@@ -5833,7 +5955,7 @@ l4:
 
 		/* tail recursion */
 		priorXmax = HeapTupleHeaderGetUpdateXid(mytup.t_data);
-		ItemPointerCopy(&(mytup.t_data->t_ctid), &tupid);
+		HeapTupleHeaderGetNextTid(mytup.t_data, &tupid);
 		UnlockReleaseBuffer(buf);
 		if (vmbuffer != InvalidBuffer)
 			ReleaseBuffer(vmbuffer);
@@ -5950,7 +6072,7 @@ heap_finish_speculative(Relation relation, HeapTuple tuple)
 	 * Replace the speculative insertion token with a real t_ctid, pointing to
 	 * itself like it does on regular tuples.
 	 */
-	htup->t_ctid = tuple->t_self;
+	HeapTupleHeaderSetHeapLatest(htup, offnum);
 
 	/* XLOG stuff */
 	if (RelationNeedsWAL(relation))
@@ -6076,8 +6198,7 @@ heap_abort_speculative(Relation relation, HeapTuple tuple)
 	HeapTupleHeaderSetXmin(tp.t_data, InvalidTransactionId);
 
 	/* Clear the speculative insertion token too */
-	tp.t_data->t_ctid = tp.t_self;
-
+	HeapTupleHeaderSetHeapLatest(tp.t_data, ItemPointerGetOffsetNumber(tid));
 	MarkBufferDirty(buffer);
 
 	/*
@@ -7425,6 +7546,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
 static XLogRecPtr
 log_heap_update(Relation reln, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
+				OffsetNumber root_offnum,
 				HeapTuple old_key_tuple,
 				bool all_visible_cleared, bool new_all_visible_cleared)
 {
@@ -7545,6 +7667,9 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	xlrec.new_offnum = ItemPointerGetOffsetNumber(&newtup->t_self);
 	xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
 
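+	/*
+	 * By now the root offset must be known, either copied from the old
+	 * tuple or returned by RelationPutHeapTuple.
+	 */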
+	Assert(OffsetNumberIsValid(root_offnum));
+	xlrec.root_offnum = root_offnum;
+
 	bufflags = REGBUF_STANDARD;
 	if (init)
 		bufflags |= REGBUF_WILL_INIT;
@@ -8199,7 +8324,13 @@ heap_xlog_delete(XLogReaderState *record)
 			PageClearAllVisible(page);
 
 		/* Make sure there is no forward chain link in t_ctid */
-		htup->t_ctid = target_tid;
+		if (!HeapTupleHeaderHasRootOffset(htup))
+		{
+			OffsetNumber	root_offnum;
+			heap_get_root_tuple_one(page, xlrec->offnum, &root_offnum);
+			HeapTupleHeaderSetHeapLatest(htup, root_offnum);
+		}
+
 		PageSetLSN(page, lsn);
 		MarkBufferDirty(buffer);
 	}
@@ -8289,7 +8420,8 @@ heap_xlog_insert(XLogReaderState *record)
 		htup->t_hoff = xlhdr.t_hoff;
 		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
 		HeapTupleHeaderSetCmin(htup, FirstCommandId);
-		htup->t_ctid = target_tid;
+
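+		/* A newly inserted tuple is the latest version and its own root */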
+		HeapTupleHeaderSetHeapLatest(htup, xlrec->offnum);
 
 		if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
 						true, true) == InvalidOffsetNumber)
@@ -8424,8 +8556,8 @@ heap_xlog_multi_insert(XLogReaderState *record)
 			htup->t_hoff = xlhdr->t_hoff;
 			HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
 			HeapTupleHeaderSetCmin(htup, FirstCommandId);
-			ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
-			ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
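+			/* As in heap_xlog_insert, each new tuple is its own root */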
+			HeapTupleHeaderSetHeapLatest(htup, offnum);
 
 			offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
 			if (offnum == InvalidOffsetNumber)
@@ -8561,7 +8693,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
 		HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
 		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
 		/* Set forward chain link in t_ctid */
-		htup->t_ctid = newtid;
+		HeapTupleHeaderSetNextTid(htup, &newtid);
 
 		/* Mark the page as a candidate for pruning */
 		PageSetPrunable(page, XLogRecGetXid(record));
@@ -8694,13 +8826,17 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
 		HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
 		HeapTupleHeaderSetCmin(htup, FirstCommandId);
 		HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
-		/* Make sure there is no forward chain link in t_ctid */
-		htup->t_ctid = newtid;
 
 		offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
 		if (offnum == InvalidOffsetNumber)
 			elog(PANIC, "failed to add tuple");
 
+		/*
+		 * Make sure the tuple is marked as the latest and the root offset
+		 * information is restored.
+		 */
+		HeapTupleHeaderSetHeapLatest(htup, xlrec->root_offnum);
+
 		if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
 			PageClearAllVisible(page);
 
@@ -8763,6 +8899,9 @@ heap_xlog_confirm(XLogReaderState *record)
 		 */
 		ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
 
+		/* For newly inserted tuple, set root offset to itself */
+		HeapTupleHeaderSetHeapLatest(htup, offnum);
+
 		PageSetLSN(page, lsn);
 		MarkBufferDirty(buffer);
 	}
@@ -8826,11 +8965,18 @@ heap_xlog_lock(XLogReaderState *record)
 		 */
 		if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
 		{
+			ItemPointerData	target_tid;
+
+			ItemPointerSet(&target_tid, BufferGetBlockNumber(buffer), offnum);
 			HeapTupleHeaderClearHotUpdated(htup);
 			/* Make sure there is no forward chain link in t_ctid */
-			ItemPointerSet(&htup->t_ctid,
-						   BufferGetBlockNumber(buffer),
-						   offnum);
+			if (!HeapTupleHeaderHasRootOffset(htup))
+			{
+				OffsetNumber	root_offnum;
+				heap_get_root_tuple_one(page,
+						offnum, &root_offnum);
+				HeapTupleHeaderSetHeapLatest(htup, root_offnum);
+			}
 		}
 		HeapTupleHeaderSetXmax(htup, xlrec->locking_xid);
 		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index 6529fe3..14ed263 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -31,12 +31,18 @@
  * !!! EREPORT(ERROR) IS DISALLOWED HERE !!!  Must PANIC on failure!!!
  *
  * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
+ *
+ * The caller can optionally tell us to set the root offset to the given
+ * value. Otherwise, the root offset is set to the offset of the new
+ * location once it's known. The former is used while updating an existing
+ * tuple, while the latter is used when inserting a new row.
  */
 void
 RelationPutHeapTuple(Relation relation,
 					 Buffer buffer,
 					 HeapTuple tuple,
-					 bool token)
+					 bool token,
+					 OffsetNumber *root_offnum)
 {
 	Page		pageHeader;
 	OffsetNumber offnum;
@@ -60,16 +66,21 @@ RelationPutHeapTuple(Relation relation,
 	ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
 
 	/*
-	 * Insert the correct position into CTID of the stored tuple, too (unless
-	 * this is a speculative insertion, in which case the token is held in
-	 * CTID field instead)
+	 * Set block number and the root offset into CTID of the stored tuple, too
+	 * (unless this is a speculative insertion, in which case the token is held
+	 * in CTID field instead)
 	 */
 	if (!token)
 	{
 		ItemId		itemId = PageGetItemId(pageHeader, offnum);
 		Item		item = PageGetItem(pageHeader, itemId);
 
+		/* Copy t_ctid to set the correct block number */
 		((HeapTupleHeader) item)->t_ctid = tuple->t_self;
+
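+		/*
+		 * If the caller did not supply a root offset (i.e. this is a plain
+		 * insert), the tuple becomes the root of its own chain.
+		 */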
+		if (!OffsetNumberIsValid(*root_offnum))
+			*root_offnum = offnum;
+		HeapTupleHeaderSetHeapLatest((HeapTupleHeader) item, *root_offnum);
 	}
 }
 
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index d69a266..2406e77 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -55,6 +55,8 @@ static void heap_prune_record_redirect(PruneState *prstate,
 static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum);
 static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum);
 
+static void heap_get_root_tuples_internal(Page page,
+				OffsetNumber target_offnum, OffsetNumber *root_offsets);
 
 /*
  * Optionally prune and repair fragmentation in the specified page.
@@ -553,6 +555,17 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		if (!HeapTupleHeaderIsHotUpdated(htup))
 			break;
 
+		/*
+		 * If the tuple was HOT-updated and the update was later aborted,
+		 * someone could mark this tuple as the last tuple in the chain
+		 * without clearing the HOT-updated flag. So we must check whether
+		 * this is the last tuple in the chain and stop following the CTID,
+		 * else we risk an infinite loop (though prstate->marked[] currently
+		 * protects against that).
+		 */
+		if (HeapTupleHeaderHasRootOffset(htup))
+			break;
 		/*
 		 * Advance to next chain member.
 		 */
@@ -726,27 +739,48 @@ heap_page_prune_execute(Buffer buffer,
 
 
 /*
- * For all items in this page, find their respective root line pointers.
- * If item k is part of a HOT-chain with root at item j, then we set
- * root_offsets[k - 1] = j.
+ * Find the root line pointers either for all items on this page or for just
+ * the given item.
+ *
+ * When target_offnum is a valid offset number, the caller is interested in
+ * just one item. In that case, the root line pointer is returned in
+ * root_offsets.
  *
- * The passed-in root_offsets array must have MaxHeapTuplesPerPage entries.
- * We zero out all unused entries.
+ * When target_offnum is InvalidOffsetNumber, the caller wants to know
+ * the root line pointers of all the items in this page. The root_offsets array
+ * must have MaxHeapTuplesPerPage entries in that case. If item k is part of a
+ * HOT-chain with root at item j, then we set root_offsets[k - 1] = j. We zero
+ * out all unused entries.
  *
  * The function must be called with at least share lock on the buffer, to
  * prevent concurrent prune operations.
  *
+ * This is not a cheap function since it must scan through all line
+ * pointers and tuples on the page in order to find the root line pointers.
+ * To minimize the cost, we break early if target_offnum is specified and
+ * the root line pointer for target_offnum is found.
+ *
  * Note: The information collected here is valid only as long as the caller
  * holds a pin on the buffer. Once pin is released, a tuple might be pruned
  * and reused by a completely unrelated tuple.
+ *
+ * Note: This function must not be called inside a critical section because
+ * it internally calls HeapTupleHeaderGetUpdateXid, which somewhere down the
+ * stack may try to allocate heap memory; memory allocation is disallowed in
+ * a critical section.
  */
-void
-heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
+static void
+heap_get_root_tuples_internal(Page page, OffsetNumber target_offnum,
+		OffsetNumber *root_offsets)
 {
 	OffsetNumber offnum,
 				maxoff;
 
-	MemSet(root_offsets, 0, MaxHeapTuplesPerPage * sizeof(OffsetNumber));
+	if (OffsetNumberIsValid(target_offnum))
+		*root_offsets = InvalidOffsetNumber;
+	else
+		MemSet(root_offsets, 0, MaxHeapTuplesPerPage * sizeof(OffsetNumber));
 
 	maxoff = PageGetMaxOffsetNumber(page);
 	for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum))
@@ -774,9 +808,28 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
 
 			/*
 			 * This is either a plain tuple or the root of a HOT-chain.
-			 * Remember it in the mapping.
+			 *
+			 * If target_offnum is specified and we found its mapping,
+			 * return.
 			 */
-			root_offsets[offnum - 1] = offnum;
+			if (OffsetNumberIsValid(target_offnum))
+			{
+				if (target_offnum == offnum)
+				{
+					root_offsets[0] = offnum;
+					return;
+				}
+				/*
+				 * No need to remember the mapping for any other item. The
+				 * root_offsets array may not even have room for them, so be
+				 * careful not to write past the end of the array.
+				 */
+			}
+			else
+			{
+				/* Remember it in the mapping  */
+				root_offsets[offnum - 1] = offnum;
+			}
 
 			/* If it's not the start of a HOT-chain, we're done with it */
 			if (!HeapTupleHeaderIsHotUpdated(htup))
@@ -817,15 +870,64 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
 				!TransactionIdEquals(priorXmax, HeapTupleHeaderGetXmin(htup)))
 				break;
 
-			/* Remember the root line pointer for this item */
-			root_offsets[nextoffnum - 1] = offnum;
+			/*
+			 * If target_offnum is specified and we found its mapping, return.
+			 */
+			if (OffsetNumberIsValid(target_offnum))
+			{
+				if (nextoffnum == target_offnum)
+				{
+					root_offsets[0] = offnum;
+					return;
+				}
+				/*
+				 * No need to remember the mapping for any other item. The
+				 * root_offsets array may not even have room for them, so be
+				 * careful not to write past the end of the array.
+				 */
+			}
+			else
+			{
+				/* Remember the root line pointer for this item */
+				root_offsets[nextoffnum - 1] = offnum;
+			}
 
 			/* Advance to next chain member, if any */
 			if (!HeapTupleHeaderIsHotUpdated(htup))
 				break;
 
+			/*
+			 * If the tuple was HOT-updated and the update was later aborted,
+			 * someone could mark this tuple as the last tuple in the chain
+			 * and store the root offset in CTID, without clearing the
+			 * HOT-updated flag. So we must check whether CTID actually holds
+			 * the root offset, and break to avoid an infinite loop.
+			 */
+			if (HeapTupleHeaderHasRootOffset(htup))
+				break;
+
 			nextoffnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
 			priorXmax = HeapTupleHeaderGetUpdateXid(htup);
 		}
 	}
 }
+
+/*
+ * Get root line pointer for the given tuple
+ */
+void
+heap_get_root_tuple_one(Page page, OffsetNumber target_offnum,
+		OffsetNumber *root_offnum)
+{
+	return heap_get_root_tuples_internal(page, target_offnum, root_offnum);
+	heap_get_root_tuples_internal(page, target_offnum, root_offnum);
+
+/*
+ * Get root line pointers for all tuples in the page
+ */
+void
+heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
+{
+	heap_get_root_tuples_internal(page, InvalidOffsetNumber, root_offsets);
+}
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 90ab6f2..5f64ca6 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -419,14 +419,18 @@ rewrite_heap_tuple(RewriteState state,
 	 */
 	if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
 		  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
-		!(ItemPointerEquals(&(old_tuple->t_self),
-							&(old_tuple->t_data->t_ctid))))
+		!(HeapTupleHeaderIsHeapLatest(old_tuple->t_data, &old_tuple->t_self)))
 	{
 		OldToNewMapping mapping;
 
 		memset(&hashkey, 0, sizeof(hashkey));
 		hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
-		hashkey.tid = old_tuple->t_data->t_ctid;
+
+		/*
+		 * We've already checked that this is not the last tuple in the
+		 * chain, so fetch the next TID in the chain.
+		 */
+		HeapTupleHeaderGetNextTid(old_tuple->t_data, &hashkey.tid);
 
 		mapping = (OldToNewMapping)
 			hash_search(state->rs_old_new_tid_map, &hashkey,
@@ -439,7 +443,7 @@ rewrite_heap_tuple(RewriteState state,
 			 * set the ctid of this tuple to point to the new location, and
 			 * insert it right away.
 			 */
-			new_tuple->t_data->t_ctid = mapping->new_tid;
+			HeapTupleHeaderSetNextTid(new_tuple->t_data, &mapping->new_tid);
 
 			/* We don't need the mapping entry anymore */
 			hash_search(state->rs_old_new_tid_map, &hashkey,
@@ -525,7 +529,7 @@ rewrite_heap_tuple(RewriteState state,
 				new_tuple = unresolved->tuple;
 				free_new = true;
 				old_tid = unresolved->old_tid;
-				new_tuple->t_data->t_ctid = new_tid;
+				HeapTupleHeaderSetNextTid(new_tuple->t_data, &new_tid);
 
 				/*
 				 * We don't need the hash entry anymore, but don't free its
@@ -731,7 +735,12 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 		newitemid = PageGetItemId(page, newoff);
 		onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
 
-		onpage_tup->t_ctid = tup->t_self;
+		/*
+		 * Set t_ctid just to ensure that the block number is copied
+		 * correctly, then immediately mark the tuple as the latest.
+		 */
+		HeapTupleHeaderSetNextTid(onpage_tup, &tup->t_self);
+		HeapTupleHeaderSetHeapLatest(onpage_tup, newoff);
 	}
 
 	/* If heaptup is a private copy, release it. */
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 8d119f6..9920f48 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -788,7 +788,8 @@ retry:
 			  DirtySnapshot.speculativeToken &&
 			  TransactionIdPrecedes(GetCurrentTransactionId(), xwait))))
 		{
-			ctid_wait = tup->t_data->t_ctid;
+			if (!HeapTupleHeaderIsHeapLatest(tup->t_data, &tup->t_self))
+				HeapTupleHeaderGetNextTid(tup->t_data, &ctid_wait);
+			else
+				ItemPointerCopy(&tup->t_self, &ctid_wait);
 			reason_wait = indexInfo->ii_ExclusionOps ?
 				XLTW_RecheckExclusionConstr : XLTW_InsertIndex;
 			index_endscan(index_scan);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index ff277d3..9182fa7 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -2563,7 +2563,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
 		 * As above, it should be safe to examine xmax and t_ctid without the
 		 * buffer content lock, because they can't be changing.
 		 */
-		if (ItemPointerEquals(&tuple.t_self, &tuple.t_data->t_ctid))
+		if (HeapTupleHeaderIsHeapLatest(tuple.t_data, &tuple.t_self))
 		{
 			/* deleted, so forget about it */
 			ReleaseBuffer(buffer);
@@ -2571,7 +2571,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
 		}
 
 		/* updated, so look at the updated row */
-		tuple.t_self = tuple.t_data->t_ctid;
+		HeapTupleHeaderGetNextTid(tuple.t_data, &tuple.t_self);
 		/* updated row should have xmin matching this xmax */
 		priorXmax = HeapTupleHeaderGetUpdateXid(tuple.t_data);
 		ReleaseBuffer(buffer);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index ee7e05a..22507dc 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -188,6 +188,8 @@ extern void heap_page_prune_execute(Buffer buffer,
 						OffsetNumber *redirected, int nredirected,
 						OffsetNumber *nowdead, int ndead,
 						OffsetNumber *nowunused, int nunused);
+extern void heap_get_root_tuple_one(Page page, OffsetNumber target_offnum,
+		OffsetNumber *root_offnum);
 extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
 
 /* in heap/syncscan.c */
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 52f28b8..a4a1fe1 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -193,6 +193,8 @@ typedef struct xl_heap_update
 	uint8		flags;
 	TransactionId new_xmax;		/* xmax of the new tuple */
 	OffsetNumber new_offnum;	/* new tuple's offset */
+	OffsetNumber root_offnum;	/* root line pointer of the chain that the
+								 * new tuple belongs to */
 
 	/*
 	 * If XLOG_HEAP_CONTAINS_OLD_TUPLE or XLOG_HEAP_CONTAINS_OLD_KEY flags are
@@ -200,7 +202,7 @@ typedef struct xl_heap_update
 	 */
 } xl_heap_update;
 
-#define SizeOfHeapUpdate	(offsetof(xl_heap_update, new_offnum) + sizeof(OffsetNumber))
+#define SizeOfHeapUpdate	(offsetof(xl_heap_update, root_offnum) + sizeof(OffsetNumber))
 
 /*
  * This is what we need to know about vacuum page cleanup/redirect
diff --git a/src/include/access/hio.h b/src/include/access/hio.h
index 2824f23..8752f69 100644
--- a/src/include/access/hio.h
+++ b/src/include/access/hio.h
@@ -36,7 +36,7 @@ typedef struct BulkInsertStateData
 
 
 extern void RelationPutHeapTuple(Relation relation, Buffer buffer,
-					 HeapTuple tuple, bool token);
+					 HeapTuple tuple, bool token, OffsetNumber *root_offnum);
 extern Buffer RelationGetBufferForTuple(Relation relation, Size len,
 						  Buffer otherBuffer, int options,
 						  BulkInsertState bistate,
diff --git a/src/include/access/htup_details.h b/src/include/access/htup_details.h
index fae955e..11bd1c8 100644
--- a/src/include/access/htup_details.h
+++ b/src/include/access/htup_details.h
@@ -260,13 +260,19 @@ struct HeapTupleHeaderData
  * information stored in t_infomask2:
  */
 #define HEAP_NATTS_MASK			0x07FF	/* 11 bits for number of attributes */
-/* bits 0x1800 are available */
+/* bits 0x0800 are available */
+#define HEAP_LATEST_TUPLE		0x1000	/*
+										 * This is the last tuple in the
+										 * update chain and t_ctid's ip_posid
+										 * stores the root line pointer
+										 */
 #define HEAP_KEYS_UPDATED		0x2000	/* tuple was updated and key cols
 										 * modified, or tuple deleted */
 #define HEAP_HOT_UPDATED		0x4000	/* tuple was HOT-updated */
 #define HEAP_ONLY_TUPLE			0x8000	/* this is heap-only tuple */
 
-#define HEAP2_XACT_MASK			0xE000	/* visibility-related bits */
+#define HEAP2_XACT_MASK			0xF000	/* visibility-related bits */
 
 /*
  * HEAP_TUPLE_HAS_MATCH is a temporary flag used during hash joins.  It is
@@ -504,6 +510,32 @@ do { \
   (tup)->t_infomask2 & HEAP_ONLY_TUPLE \
 )
 
+#define HeapTupleHeaderSetHeapLatest(tup, offnum) \
+do { \
+	AssertMacro(OffsetNumberIsValid(offnum)); \
+	(tup)->t_infomask2 |= HEAP_LATEST_TUPLE; \
+	ItemPointerSetOffsetNumber(&(tup)->t_ctid, (offnum)); \
+} while (0)
+
+#define HeapTupleHeaderClearHeapLatest(tup) \
+( \
+	(tup)->t_infomask2 &= ~HEAP_LATEST_TUPLE \
+)
+
+/*
+ * HEAP_LATEST_TUPLE is set on the last tuple in the update chain. But for
+ * clusters which are upgraded from a pre-10.0 release, we also check whether
+ * t_ctid points to the tuple itself and declare such a tuple the latest in
+ * the chain.
+ */
+#define HeapTupleHeaderIsHeapLatest(tup, tid) \
+( \
+  (((tup)->t_infomask2 & HEAP_LATEST_TUPLE) != 0) || \
+  ((ItemPointerGetBlockNumber(&(tup)->t_ctid) == ItemPointerGetBlockNumber(tid)) && \
+   (ItemPointerGetOffsetNumber(&(tup)->t_ctid) == ItemPointerGetOffsetNumber(tid))) \
+)
+
+
 #define HeapTupleHeaderSetHeapOnly(tup) \
 ( \
   (tup)->t_infomask2 |= HEAP_ONLY_TUPLE \
@@ -542,6 +574,45 @@ do { \
 
 
 /*
+ * Set the t_ctid chain link and also clear the HEAP_LATEST_TUPLE flag, since
+ * we most likely have a new tuple in the chain.
+ */
+#define HeapTupleHeaderSetNextTid(tup, tid) \
+do { \
+		ItemPointerCopy((tid), &((tup)->t_ctid)); \
+		HeapTupleHeaderClearHeapLatest((tup)); \
+} while (0)
+
+/*
+ * Get the TID of the next tuple in the update chain. The caller should have
+ * checked that we are not already at the end of the chain, because in that
+ * case t_ctid may actually store the root line pointer of the HOT chain to
+ * which this tuple belongs.
+ */
+#define HeapTupleHeaderGetNextTid(tup, next_ctid) \
+do { \
+	AssertMacro(!((tup)->t_infomask2 & HEAP_LATEST_TUPLE)); \
+	ItemPointerCopy(&(tup)->t_ctid, (next_ctid)); \
+} while (0)
+
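+/*
+ * Return the root line pointer stored in t_ctid. Valid only when
+ * HEAP_LATEST_TUPLE is set; check HeapTupleHeaderHasRootOffset() first.
+ */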
+#define HeapTupleHeaderGetRootOffset(tup) \
+( \
+	AssertMacro(((tup)->t_infomask2 & HEAP_LATEST_TUPLE) != 0), \
+	ItemPointerGetOffsetNumber(&(tup)->t_ctid) \
+)
+
+/*
+ * We use the same HEAP_LATEST_TUPLE flag to check if the tuple's t_ctid field
+ * contains the root line pointer. We can't use the
+ * HeapTupleHeaderIsHeapLatest macro here because that also checks for
+ * TID-equality to decide whether a tuple is at the end of its chain.
+ */
+#define HeapTupleHeaderHasRootOffset(tup) \
+( \
+	((tup)->t_infomask2 & HEAP_LATEST_TUPLE) != 0 \
+)
+
+/*
  * BITMAPLEN(NATTS) -
  *		Computes size of null bitmap given number of data columns.
  */
