commit 204dc713ef986a531065de5bf67491dbe6cddf97
Author: Vinayak Pokale <vinpokale@gmail.com>
Date:   Fri Sep 23 15:42:55 2016 +0900

    Support transaction with foreign servers.

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index cd66abc..7bbd2d4 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1418,6 +1418,48 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-prepared-foreign-transactions" xreflabel="max_prepared_foreign_transactions">
+      <term><varname>max_prepared_foreign_transactions</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_prepared_foreign_transactions</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Sets the maximum number of foreign transactions that can be prepared
+        simultaneously.
+        If this parameter is set to zero (which is the default) and
+        <xref linkend="guc-atomic-foreign-transaction"> is enabled,
+        transactions involving foreign servers will not succeed, because foreign
+        transactions can not be prepared.
+        This parameter can only be set at server start.
+       </para>
+
+       <para>
+        When running a standby server, you must set this parameter to the
+        same or higher value than on the master server. Otherwise, queries
+        will not be allowed in the standby server.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-atomic-foreign-transaction" xreflabel="atomic_foreign_transaction">
+      <term><varname>atomic_foreign_transaction</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>atomic_foreign_transaction</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+       When this parameter is enabled the transaction involving foreign server/s is
+       guaranteed to commit all or none of the changes to the foreign server/s.
+       The parameter can be set any time during the session. The value of this parameter
+       at the time of committing the transaction is used.
+       </para>
+
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-work-mem" xreflabel="work_mem">
       <term><varname>work_mem</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 0c1db07..0077e6e 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -1701,4 +1701,86 @@ GetForeignServerByName(const char *name, bool missing_ok);
 
   </sect1>
 
+   <sect1 id="fdw-transactions">
+    <title>Transaction manager for Foreign Data Wrappers</title>
+
+    <para>
+    <productname>PostgreSQL</> transaction manager allows FDWs to read and write
+    data on foreign server within a transaction while maintaining atomicity
+    (and hence consistency) of the foreign data. Every Foreign Data Wrapper is
+    required to register the foreign server along with the <productname>PostgreSQL</>
+    user whose user mapping is used to connect to the foreign server while starting a
+    transaction on the foreign server as part of the transaction on
+    <productname>PostgreSQL</> using <function>RegisterXactForeignServer</>.
+<programlisting>
+void
+RegisterXactForeignServer(Oid serverid,
+                            Oid userid,
+                            bool two_phase_compliant)
+</programlisting>
+    <varname>two_phase_compliant</> should be true if the foreign server supports
+    two-phase commit protocol, false otherwise.
+    </para>
+
+    <para>
+    An example of such transaction is as follows
+<programlisting>
+BEGIN;
+UPDATE ft1 SET col = 'a';
+UPDATE ft2 SET col = 'b';
+COMMIT;
+</programlisting>
+    ft1 and ft2 are foreign tables on different foreign servers may be using different
+    Foreign Data Wrappers.
+    </para>
+
+    <para>
+    When <varname>atomic_foreign_transaction</> is enabled
+    <productname>PostgreSQL</> employs Two-phase commit protocol to achieve
+    atomic distributed transaction. All the foreign servers registered should
+    support two-phase commit protocol. In Two-phase commit protocol the commit
+    is processed in two phases: prepare phase and commit phase. In prepare phase,
+    <productname>PostgreSQL</> prepares the transactions on all the foreign
+    servers registered using <function>RegisterXactForeignServer</>.
+    If any of the foreign server fails to prepare transaction, prepare phase fails.
+    In commit phase, all the prepared transactions are committed if prepare
+    phase has succeeded or rolled back if prepare phase fails to prepare
+    transactions on all the foreign servers.
+    </para>
+
+    <para>
+    During prepare phase the distributed transaction manager calls
+    <function>GetPrepareInfo</> to get the prepared transaction identifier for
+    each foreign server involved. It stores this identifier along with the
+    serverid and userid for later use. It then calls
+    <function>HandleForeignTransaction</> with the same identifier with action
+    FDW_XACT_PREPARE.
+    </para>
+    
+    <para>
+    During commit phase the distributed transaction manager calls
+    <function>HandleForeignTransaction</> with the same identifier with action
+    FDW_XACT_COMMIT_PREPARED to commit the prepared transaction or
+    FDW_XACT_ABORT_PREPARED to rollback the prepared transaction. In case the
+    distributed transaction manager fails to commit or rollback a prepared
+    transaction because of connection failure, the operation can be tried again
+    through built-in <function>pg_fdw_xact</>. One may set up a background worker
+    process to retry the operation by installing extension pg_fdw_xact_resolver
+    and including $libdir/pg_fdw_xact_resolver.so in
+    <varname>shared_preload_libraries</>.
+    </para>
+
+    <para>
+    When <varname>atomic_foreign_transaction</> is disabled, atomicity can not be
+    guaranteed across foreign servers. If transaction on <productname>PostgreSQL</>
+    is committed, Distributed transaction manager calls
+    <function>HandleForeignTransaction</> to commit the transaction on all the
+    foreign servers registered using <function>RegisterXactForeignServer</>,
+    independent of the outcome of the same operation on other foreign servers.
+    Thus transactions on some foreign servers may be committed, while the same
+    on other foreign servers would be rolled back. If the transaction on
+    <productname>PostgreSQL</> aborts transactions on all the foreign servers
+    are aborted too.
+    </para>
+    </sect1>
  </chapter>
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index 5514db1..6e23ec1 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -8,9 +8,10 @@ subdir = src/backend/access/rmgrdesc
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \
-	   gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \
-	   mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \
-	   smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
+OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o fdw_xactdesc.o \
+	   genericdesc.o gindesc.o gistdesc.o hashdesc.o heapdesc.o \
+	   logicalmsgdesc.o mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o \
+	   seqdesc.o smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o \
+	   xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/fdw_xactdesc.c b/src/backend/access/rmgrdesc/fdw_xactdesc.c
new file mode 100644
index 0000000..b01ccf8
--- /dev/null
+++ b/src/backend/access/rmgrdesc/fdw_xactdesc.c
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * fdw_xactdesc.c
+ *		PostgreSQL distributed transaction manager.
+ *
+ * This module describes the WAL records for foreign transaction manager.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/fdw_xactdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/fdw_xact.h"
+#include "access/xloginsert.h"
+#include "lib/stringinfo.h"
+
+extern void
+fdw_xact_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_FDW_XACT_INSERT)
+	{
+		FDWXactOnDiskData *fdw_insert_xlog = (FDWXactOnDiskData *)rec;
+		appendStringInfo(buf, "Foreign server oid: %u", fdw_insert_xlog->serverid);
+		appendStringInfo(buf, " user oid: %u", fdw_insert_xlog->userid);
+		appendStringInfo(buf, " database id: %u", fdw_insert_xlog->dboid);
+		/* TODO: This should be really interpreted by each FDW */
+		/* TODO: we also need to assess whether we want to add this information */
+		appendStringInfo(buf, " foreign transaction info: ");
+		appendStringInfo(buf, "%.*s", fdw_insert_xlog->fdw_xact_id_len,
+							fdw_insert_xlog->fdw_xact_id);
+	}
+	else
+	{
+		FdwRemoveXlogRec	*fdw_remove_xlog = (FdwRemoveXlogRec *)rec;
+		appendStringInfo(buf, "Foreign server oid: %u", fdw_remove_xlog->serverid);
+		appendStringInfo(buf, " user oid: %u", fdw_remove_xlog->userid);
+		appendStringInfo(buf, " database id: %u", fdw_remove_xlog->dbid);
+	}
+
+}
+
+extern const char *
+fdw_xact_identify(uint8 info)
+{
+	switch(info & ~XLR_INFO_MASK)
+	{
+		case XLOG_FDW_XACT_INSERT:
+			return "NEW FOREIGN TRANSACTION";
+		case XLOG_FDW_XACT_REMOVE:
+			return "REMOVE FOREIGN TRANSACTION";
+	}
+	/* Keep compiler happy */
+	return NULL;
+}
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 62ed1dc..c2f36c7 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -112,14 +112,15 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "max_connections=%d max_worker_processes=%d "
 						 "max_prepared_xacts=%d max_locks_per_xact=%d "
 						 "wal_level=%s wal_log_hints=%s "
-						 "track_commit_timestamp=%s",
+						 "track_commit_timestamp=%s max_fdw_xacts=%d",
 						 xlrec.MaxConnections,
 						 xlrec.max_worker_processes,
 						 xlrec.max_prepared_xacts,
 						 xlrec.max_locks_per_xact,
 						 wal_level_str,
 						 xlrec.wal_log_hints ? "on" : "off",
-						 xlrec.track_commit_timestamp ? "on" : "off");
+						 xlrec.track_commit_timestamp ? "on" : "off",
+						 xlrec.max_fdw_xacts);
 	}
 	else if (info == XLOG_FPW_CHANGE)
 	{
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 16fbe47..dd7ee32 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
 OBJS = clog.o commit_ts.o generic_xlog.o multixact.o parallel.o rmgr.o slru.o \
 	subtrans.o timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
 	xact.o xlog.o xlogarchive.o xlogfuncs.o \
-	xloginsert.o xlogreader.o xlogutils.o
+	xloginsert.o xlogreader.o xlogutils.o fdw_xact.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/fdw_xact.c b/src/backend/access/transam/fdw_xact.c
new file mode 100644
index 0000000..df305e5
--- /dev/null
+++ b/src/backend/access/transam/fdw_xact.c
@@ -0,0 +1,2034 @@
+/*-------------------------------------------------------------------------
+ *
+ * fdw_xact.c
+ *		PostgreSQL distributed transaction manager.
+ *
+ * This module manages the transactions involving foreign servers.
+ *
+ * Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * src/backend/access/transam/fdw_xact.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "funcapi.h"
+
+#include "access/fdw_xact.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "catalog/pg_type.h"
+#include "foreign/foreign.h"
+#include "foreign/fdwapi.h"
+#include "libpq/pqsignal.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lock.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+
+/*
+ * This comment summarises how the transaction manager handles transactions
+ * involving one or more foreign server/s.
+ *
+ * When an foreign data wrapper starts transaction on a foreign server, it is
+ * required to register the foreign server and user who initiated the
+ * transaction using function RegisterXactForeignServer(). A foreign server
+ * connection is identified by oid of foreign server and user.
+ *
+ * The commit is executed in two phases:
+ * First phase (executed during pre-commit processing)
+ * -----------
+ * Transactions are prepared on all the foreign servers, which can participate
+ * in two-phase commit protocol. Transaction on other foreign servers are
+ * committed in the same phase.
+ *
+ * Second phase (executed during post-commit/abort processing)
+ * ------------
+ * If first phase succeeds, foreign servers are requested to commit respective
+ * prepared transactions. If the first phase  does not succeed because of any
+ * failure, the foreign servers are asked to rollback respective prepared
+ * transactions or abort the transactions if they are not prepared.
+ *
+ * Any network failure, server crash after preparing foreign transaction leaves
+ * that prepared transaction unresolved. During the first phase, before actually
+ * preparing the transactions, enough information is persisted to the disk and
+ * logs in order to resolve such transactions.
+ */
+
+/* Shared memory entry for a prepared or being prepared foreign transaction */
+typedef struct FDWXactData	*FDWXact;
+
+/* Structure to bundle the foreign connection participating in transaction */
+typedef struct
+{
+	Oid							serverid;
+	Oid							userid;
+	Oid							umid;
+	char						*servername;
+	FDWXact						fdw_xact;	/* foreign prepared transaction entry
+											   in case prepared */
+	bool						two_phase_commit;	/* Should use two phase commit
+													 * protocol while committing
+													 * transaction on this
+													 * server, whenever
+													 * necessary.
+													 */
+	GetPrepareId_function		prepare_id_provider;
+	EndForeignTransaction_function	end_foreing_xact;
+	PrepareForeignTransaction_function	prepare_foreign_xact;
+	ResolvePreparedForeignTransaction_function	resolve_prepared_foreign_xact;
+} FDWConnection;
+
+/* List of foreign connections participating in the transaction */
+List	*MyFDWConnections = NIL;
+
+/*
+ * By default we assume that all the foreign connections participating in this
+ * transaction can use two phase commit protocol.
+ */
+bool	TwoPhaseReady = true;
+
+/* Record the server, userid participating in the transaction. */
+void
+RegisterXactForeignServer(Oid serverid, Oid userid, bool two_phase_commit)
+{
+	FDWConnection	*fdw_conn;
+	ListCell		*lcell;
+	ForeignServer	*foreign_server;
+	ForeignDataWrapper	*fdw;
+	UserMapping		*user_mapping;
+	FdwRoutine		*fdw_routine;
+	MemoryContext	old_context;
+
+	TwoPhaseReady = TwoPhaseReady && two_phase_commit;
+
+	/* Check if the entry already exists, if so, raise an error */
+	foreach(lcell, MyFDWConnections)
+	{
+		fdw_conn = lfirst(lcell);
+
+		if (fdw_conn->serverid == serverid &&
+			fdw_conn->userid == userid)
+			ereport(ERROR,
+					(errmsg("attempt to start transction again on server %u user %u",
+							serverid, userid)));
+	}
+
+	/* This list and its contents needs to be saved in the transaction context memory */
+	old_context = MemoryContextSwitchTo(TopTransactionContext);
+	/* Add this foreign connection to the list for transaction management */
+	fdw_conn = (FDWConnection *) palloc(sizeof(FDWConnection));
+
+	/* Make sure that the FDW has at least a transaction handler */
+	foreign_server = GetForeignServer(serverid);
+	fdw = GetForeignDataWrapper(foreign_server->fdwid);
+	fdw_routine = GetFdwRoutine(fdw->fdwhandler);
+	user_mapping = GetUserMapping(userid, serverid);
+
+	if (!fdw_routine->EndForeignTransaction)
+			elog(ERROR, "no function to end a foreign transaction provided for FDW %s",
+					fdw->fdwname);
+
+	if (two_phase_commit)
+	{
+		if (!fdw_routine->GetPrepareId)
+			elog(ERROR, "no prepared transaction identifier provider function for FDW %s",
+					fdw->fdwname);
+
+		if (!fdw_routine->PrepareForeignTransaction)
+			elog(ERROR, "no function provided for preparing foreign transaction for FDW %s",
+					fdw->fdwname);
+
+		if (!fdw_routine->ResolvePreparedForeignTransaction)
+			elog(ERROR, "no function provided for resolving prepared foreign transaction for FDW %s",
+					fdw->fdwname);
+	}
+
+	fdw_conn->serverid = serverid;
+	fdw_conn->userid = userid;
+	fdw_conn->umid = user_mapping->umid;
+
+	/*
+	 * We may need following information at the end of a transaction, when the
+	 * system caches are not available. So save it before hand.
+	 */
+	fdw_conn->servername = foreign_server->servername;
+	fdw_conn->prepare_id_provider = fdw_routine->GetPrepareId;
+	fdw_conn->prepare_foreign_xact = fdw_routine->PrepareForeignTransaction;
+	fdw_conn->resolve_prepared_foreign_xact = fdw_routine->ResolvePreparedForeignTransaction;
+	fdw_conn->end_foreing_xact = fdw_routine->EndForeignTransaction;
+	fdw_conn->fdw_xact = NULL;
+	fdw_conn->two_phase_commit = two_phase_commit;
+	MyFDWConnections = lappend(MyFDWConnections, fdw_conn);
+	/* Revert back the context */
+	MemoryContextSwitchTo(old_context);
+
+	return;
+}
+
+/* Prepared transaction identifier can be maximum 256 bytes long */
+#define MAX_FDW_XACT_ID_LEN	256
+
+/* Enum to track the status of prepared foreign transaction */
+typedef enum
+{
+	FDW_XACT_PREPARING,			/* foreign transaction is (being) prepared */
+	FDW_XACT_COMMITTING_PREPARED,	/* foreign prepared transaction is to be committed */
+	FDW_XACT_ABORTING_PREPARED,	/* foreign prepared transaction is to be aborted */
+	FDW_XACT_RESOLVED			/* Status used only by pg_fdw_resolve().
+								   It doesn't appear in the in-memory entry. */
+} FDWXactStatus;
+
+typedef struct FDWXactData
+{
+	FDWXact			fx_next;	/* Next free FDWXact entry */
+	Oid				dboid;		/* database oid where to find foreign server and
+								 * user mapping
+								 */
+	TransactionId	local_xid;	/* XID of local transaction */
+	Oid				serverid;	/* foreign server where transaction takes place */
+	Oid				userid;		/* user who initiated the foreign transaction */
+	Oid				umid;
+	FDWXactStatus	fdw_xact_status;	/* The state of the foreign transaction.
+										   This doubles as the action to be
+										   taken on this entry.*/
+	XLogRecPtr		fdw_xact_lsn;		/* LSN of the log record for inserting this entry */
+	bool			fdw_xact_valid;		/* Has the entry been complete and written to file? */
+	BackendId		locking_backend;	/* Backend working on this entry */
+	int				fdw_xact_id_len;	/* Length of prepared transaction identifier */
+	char			fdw_xact_id[MAX_FDW_XACT_ID_LEN];	/* prepared transaction identifier */
+} FDWXactData;
+
+/* Directory where the foreign prepared transaction files will reside */
+#define FDW_XACTS_DIR "pg_fdw_xact"
+
+/*
+ * Name of foreign prepared transaction file is 8 bytes xid, 8 bytes foreign
+ * server oid and 8 bytes user oid separated by '_'.
+ */
+#define FDW_XACT_FILE_NAME_LEN (8 + 1 + 8 + 1 + 8)
+#define FDWXactFilePath(path, xid, serverid, userid)	\
+	snprintf(path, MAXPGPATH, FDW_XACTS_DIR "/%08X_%08X_%08X", xid, \
+							serverid, userid)
+
+/* Shared memory layout for maintaining foreign prepared transaction entries. */
+typedef struct
+{
+	/* Head of linked list of free FDWXactData structs */
+	FDWXact		freeFDWXacts;
+
+	/* Number of valid FDW transaction entries */
+	int			num_fdw_xacts;
+
+	/* Upto max_fdw_xacts entries in the array */
+	FDWXact		fdw_xacts[FLEXIBLE_ARRAY_MEMBER];	/* Variable length array */
+} FDWXactGlobalData;
+
+static void AtProcExit_FDWXact(int code, Datum arg);
+static bool resolve_fdw_xact(FDWXact fdw_xact,
+							ResolvePreparedForeignTransaction_function prepared_foreign_xact_resolver);
+static FDWXact insert_fdw_xact(Oid dboid, TransactionId xid, Oid serverid, Oid userid,
+										int fdw_xact_id_len, char *fdw_xact_id,
+										FDWXactStatus fdw_xact_status);
+static void unlock_fdw_xact(FDWXact fdw_xact);
+static void unlock_fdw_xact_entries();
+static void remove_fdw_xact(FDWXact fdw_xact);
+static FDWXact register_fdw_xact(Oid dbid, TransactionId xid, Oid serverid, Oid userid,
+								 Oid umid, int fdw_xact_info_len, char *fdw_xact_info);
+static int GetFDWXactList(FDWXact *fdw_xacts);
+static ResolvePreparedForeignTransaction_function get_prepared_foreign_xact_resolver(FDWXact fdw_xact);
+static FDWXactOnDiskData *ReadFDWXactFile(TransactionId xid, Oid serverid,
+											Oid userid);
+static void RemoveFDWXactFile(TransactionId xid, Oid serverid, Oid userid,
+								bool giveWarning);
+static void prepare_foreign_transactions(void);
+bool search_fdw_xact(TransactionId xid, Oid dbid, Oid serverid, Oid userid,
+						List **qualifying_xacts);
+
+/*
+ * Maximum number of foreign prepared transaction entries at any given time
+ * GUC variable, change requires restart.
+ */
+int	max_fdw_xacts = 0;
+
+/* Keep track of registering process exit call back. */
+static bool fdwXactExitRegistered = false;
+
+/* Pointer to the shared memory holding the foreign transactions data */
+static FDWXactGlobalData	*FDWXactGlobal;
+
+/* foreign transaction entries locked by this backend */
+List	*MyLockedFDWXacts = NIL;
+
+/*
+ * FDWXactShmemSize
+ * Calculates the size of shared memory allocated for maintaining foreign
+ * prepared transaction entries.
+ */
+extern Size
+FDWXactShmemSize(void)
+{
+	Size		size;
+
+	/* Need the fixed struct, foreign transaction information array */
+	size = offsetof(FDWXactGlobalData, fdw_xacts);
+	size = add_size(size, mul_size(max_fdw_xacts,
+								   sizeof(FDWXact)));
+	size = MAXALIGN(size);
+	size = add_size(size, mul_size(max_fdw_xacts,
+								   sizeof(FDWXactData)));
+
+	return size;
+}
+
+/*
+ * FDWXactShmemInit
+ * Initialization of shared memory for maintaining foreign prepared transaction
+ * entries. The shared memory layout is defined in definition of
+ * FDWXactGlobalData structure.
+ */
+extern void
+FDWXactShmemInit(void)
+{
+	bool		found;
+
+	FDWXactGlobal = ShmemInitStruct("Foreign transactions table",
+									FDWXactShmemSize(),
+									&found);
+	if (!IsUnderPostmaster)
+	{
+		FDWXact	fdw_xacts;
+		int		cnt;
+
+		Assert(!found);
+		FDWXactGlobal->freeFDWXacts = NULL;
+		FDWXactGlobal->num_fdw_xacts = 0;
+
+		/* Initialise the linked list of free FDW transactions */
+		fdw_xacts = (FDWXact)
+			((char *) FDWXactGlobal +
+			 MAXALIGN(offsetof(FDWXactGlobalData, fdw_xacts) +
+					  sizeof(FDWXact) * max_fdw_xacts));
+		for (cnt = 0; cnt < max_fdw_xacts; cnt++)
+		{
+			fdw_xacts[cnt].fx_next = FDWXactGlobal->freeFDWXacts;
+			FDWXactGlobal->freeFDWXacts = &fdw_xacts[cnt];
+		}
+	}
+	else
+	{
+		Assert(FDWXactGlobal);
+		Assert(found);
+	}
+}
+
+/*
+ * PreCommit_FDWXacts
+ * The function is responsible for pre-commit processing on foreign connections.
+ * The foreign transactions are prepared on the foreign servers which can
+ * execute two-phase-commit protocol. Those will be aborted or committed after
+ * the current transaction has been aborted or committed resp. We try to commit
+ * transactions on rest of the foreign servers now. For these foreign servers
+ * it is possible that some transactions commit even if the local transaction
+ * aborts.
+ */
+void
+PreCommit_FDWXacts(void)
+{
+	ListCell	*cur;
+	ListCell	*prev;
+	ListCell	*next;
+
+	/* If there are no foreign servers involved, we have no business here */
+	if (list_length(MyFDWConnections) < 1)
+		return;
+
+	/*
+	 * Try committing transactions on the foreign servers, which can not execute
+	 * two-phase-commit protocol.
+	 */
+	for (cur = list_head(MyFDWConnections), prev = NULL; cur; cur = next)
+	{
+		FDWConnection *fdw_conn = lfirst(cur);
+		next = lnext(cur);
+
+		if (!fdw_conn->two_phase_commit)
+		{
+			/*
+			 * The FDW has to make sure that the connection opened to the
+			 * foreign server is out of transaction. Even if the handler
+			 * function returns failure statue, there's hardly anything to do.
+			 */
+			if (!fdw_conn->end_foreing_xact(fdw_conn->serverid, fdw_conn->userid,
+											fdw_conn->umid, true))
+				elog(WARNING, "could not commit transaction on server %s",
+								fdw_conn->servername);
+
+			/* The connection is no more part of this transaction, forget it */
+			MyFDWConnections = list_delete_cell(MyFDWConnections, cur, prev);
+		}
+		else
+			prev = cur;
+	}
+
+	/*
+	 * Prepare the transactions on the foreign servers, which can execute
+	 * two-phase-commit protocol.
+	 */
+	prepare_foreign_transactions();
+}
+
+/*
+ * Prepare transactions on the foreign servers which can execute two phase
+ * commit protocol. Rest of the foreign servers are ignored.
+ */
+static void
+prepare_foreign_transactions(void)
+{
+	ListCell	*lcell;
+
+	/*
+	 * Loop over the foreign connections
+	 */
+	foreach(lcell, MyFDWConnections)
+	{
+		FDWConnection	*fdw_conn = (FDWConnection *)lfirst(lcell);
+		char			*fdw_xact_info;
+		int				fdw_xact_info_len;
+		FDWXact			fdw_xact;
+
+		if (!fdw_conn->two_phase_commit)
+			continue;
+
+		Assert(fdw_conn->prepare_id_provider);
+		fdw_xact_info = fdw_conn->prepare_id_provider(fdw_conn->serverid,
+													  fdw_conn->userid,
+													  &fdw_xact_info_len);
+
+		/*
+		 * Register the foreign transaction with the identifier used to prepare
+		 * it on the foreign server. Registration persists this information to
+		 * the disk and logs (that way relaying it on standby). Thus in case we
+		 * loose connectivity to the foreign server or crash ourselves, we will
+		 * remember that we have prepared transaction on the foreign server and
+		 * try to resolve it when connectivity is restored or after crash
+		 * recovery.
+		 *
+		 * If we crash after persisting the information but before preparing the
+		 * transaction on the foreign server, we will try to resolve a
+		 * never-prepared transaction, and get an error. This is fine as long as
+		 * the FDW provides us unique prepared transaction identifiers.
+		 *
+		 * If we prepare the transaction on the foreign server before persisting
+		 * the information to the disk and crash in-between these two steps, we
+		 * will forget that we prepared the transaction on the foreign server
+		 * and will not be able to resolve it after the crash. Hence persist
+		 * first then prepare.
+		 */
+		fdw_xact = register_fdw_xact(MyDatabaseId, GetTopTransactionId(),
+									 fdw_conn->serverid, fdw_conn->userid,
+									 fdw_conn->umid, fdw_xact_info_len,
+									 fdw_xact_info);
+		/*
+		 * Between register_fdw_xact call above till this backend hears back
+		 * from foreign server, the backend may abort the local transaction (say,
+		 * because of a signal). During abort processing, it will send an ABORT
+		 * message to the foreign server. If the foreign server has not prepared
+		 * the transaction, the message will succeed. If the foreign server has
+		 * prepared transaction, it will throw an error, which we will ignore and the
+		 * prepared foreign transaction will be resolved by the foreign transaction
+		 * resolver.
+		 */
+		if (!fdw_conn->prepare_foreign_xact(fdw_conn->serverid, fdw_conn->userid,
+											fdw_xact_info_len, fdw_xact_info))
+		{
+			/*
+			 * An error occured, and we didn't prepare the transaction. Delete the
+			 * entry from foreign transaction table. Raise an error, so that the
+			 * local server knows that one of the foreign server has failed to
+			 * prepare the transaction.
+			 * TODO:
+			 * FDW is expected to print the error as a warning and then we
+			 * raise actual error here. But instead, we should pull the
+			 * error text from FDW and add it here in the message or as a
+			 * context or a hint.
+			 */
+			remove_fdw_xact(fdw_xact);
+
+			/*
+			 * Delete the connection, since it doesn't require any further
+			 * processing. This deletion will invalidate current cell
+			 * pointer, but that is fine since we will not use that pointer
+			 * because the subsequent ereport will get us out of this loop.
+			 */
+			MyFDWConnections = list_delete_ptr(MyFDWConnections, fdw_conn);
+			ereport(ERROR,
+					(errmsg("can not prepare transaction on foreign server %s",
+							fdw_conn->servername)));
+		}
+
+		/* Prepare succeeded, remember it in the connection */
+		fdw_conn->fdw_xact = fdw_xact;
+	}
+	return;
+}
+/*
+ * register_fdw_xact
+ * This function is used to create new foreign transaction entry before an FDW
+ * executes the first phase of two-phase commit. The function adds the entry to
+ * WAL and then persists it to the disk by creating a file under
+ * data/pg_fdw_xact directory.
+ */
+static FDWXact
+register_fdw_xact(Oid dbid, TransactionId xid, Oid serverid, Oid userid,
+				  Oid umid,	int fdw_xact_id_len, char *fdw_xact_id)
+{
+	FDWXact				fdw_xact;
+	FDWXactOnDiskData	*fdw_xact_file_data;
+	int					data_len;
+	char				path[MAXPGPATH];
+	int					fd;
+	pg_crc32c			fdw_xact_crc;
+	pg_crc32c			bogus_crc;
+
+	/* Enter the foreign transaction in the shared memory structure */
+	fdw_xact = insert_fdw_xact(dbid, xid, serverid, userid,
+									fdw_xact_id_len, fdw_xact_id,
+									FDW_XACT_PREPARING);
+	/*
+	 * Prepare to write the entry to a file. Also add xlog entry. The contents
+	 * of the xlog record are same as what is written to the file.
+	 */
+	data_len = offsetof(FDWXactOnDiskData, fdw_xact_id);
+	data_len = data_len + fdw_xact->fdw_xact_id_len;
+	data_len = MAXALIGN(data_len);
+	fdw_xact_file_data = (FDWXactOnDiskData *) palloc0(data_len);
+	fdw_xact_file_data->dboid = fdw_xact->dboid;
+	fdw_xact_file_data->local_xid = fdw_xact->local_xid;
+	fdw_xact_file_data->serverid = fdw_xact->serverid;
+	fdw_xact_file_data->userid = fdw_xact->userid;
+	fdw_xact_file_data->umid = fdw_xact->umid;
+	fdw_xact_file_data->fdw_xact_id_len = fdw_xact->fdw_xact_id_len;
+	memcpy(fdw_xact_file_data->fdw_xact_id, fdw_xact->fdw_xact_id,
+					fdw_xact->fdw_xact_id_len);
+
+	FDWXactFilePath(path, fdw_xact->local_xid, fdw_xact->serverid,
+						fdw_xact->userid);
+
+	/* Create the file, but error out if it already exists. */
+	fd = OpenTransientFile(path, O_EXCL | O_CREAT | PG_BINARY | O_WRONLY,
+							S_IRUSR | S_IWUSR);
+	if (fd < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create foreign transaction state file \"%s\": %m",
+						path)));
+
+	/* Write data to file, and calculate CRC as we pass over it */
+	INIT_CRC32C(fdw_xact_crc);
+	COMP_CRC32C(fdw_xact_crc, fdw_xact_file_data, data_len);
+	if (write(fd, fdw_xact_file_data, data_len) != data_len)
+	{
+		CloseTransientFile(fd);
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write FDW transaction state file: %s", path)));
+	}
+
+	FIN_CRC32C(fdw_xact_crc);
+	/*
+	 * Write a deliberately bogus CRC to the state file; this is just paranoia
+	 * to catch the case where four more bytes will run us out of disk space.
+	 */
+	bogus_crc = ~fdw_xact_crc;
+
+	if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
+	{
+		CloseTransientFile(fd);
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write two-phase state file: %m")));
+	}
+
+	/* Back up to prepare for rewriting the CRC */
+	if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
+	{
+		CloseTransientFile(fd);
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not seek in two-phase state file: %m")));
+	}
+
+	/*
+	 * The state file isn't valid yet, because we haven't written the correct
+	 * CRC yet.	 Before we do that, insert entry in WAL and flush it to disk.
+	 *
+	 * Between the time we have written the WAL entry and the time we write
+	 * out the correct state file CRC, we have an inconsistency: we have
+	 * recorded the foreign transaction in WAL but not on the disk. We
+	 * use a critical section to force a PANIC if we are unable to complete
+	 * the write --- then, WAL replay should repair the inconsistency.	The
+	 * odds of a PANIC actually occurring should be very tiny given that we
+	 * were able to write the bogus CRC above.
+	 */
+	START_CRIT_SECTION();
+
+	/*
+	 * We have to set delayChkpt here, too; otherwise a checkpoint starting
+	 * immediately after the WAL record is inserted could complete without
+	 * fsync'ing our foreign transaction file. (This is essentially the same
+	 * kind of race condition as the COMMIT-to-clog-write case that
+	 * RecordTransactionCommit uses delayChkpt for; see notes there.)
+	 */
+	MyPgXact->delayChkpt = true;
+
+	/* Add the entry in the xlog and save LSN for checkpointer */
+	XLogBeginInsert();
+	XLogRegisterData((char *)fdw_xact_file_data, data_len);
+	fdw_xact->fdw_xact_lsn = XLogInsert(RM_FDW_XACT_ID, XLOG_FDW_XACT_INSERT);
+	XLogFlush(fdw_xact->fdw_xact_lsn);
+
+	/* If we crash now WAL replay will fix things */
+	/* write correct CRC and close file */
+	if ((write(fd, &fdw_xact_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
+	{
+		CloseTransientFile(fd);
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write foreign transaction file: %m")));
+	}
+
+	if (CloseTransientFile(fd) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not close foreign transaction file: %m")));
+
+	/* File is written completely, checkpoint can proceed with syncing */
+	fdw_xact->fdw_xact_valid = true;
+
+	MyPgXact->delayChkpt = false;
+	END_CRIT_SECTION();
+
+	pfree(fdw_xact_file_data);
+	return fdw_xact;
+}
+
+/*
+ * insert_fdw_xact
+ * Insert a new entry for a given foreign transaction identified by transaction
+ * id, foreign server and user mapping, in the shared memory. The inserted entry
+ * is returned locked.
+ *
+ * If the entry already exists, the function raises an error.
+ */
+static FDWXact
+insert_fdw_xact(Oid dboid, TransactionId xid, Oid serverid, Oid userid,
+				int fdw_xact_id_len, char *fdw_xact_id, FDWXactStatus fdw_xact_status)
+{
+	FDWXact			fdw_xact;
+	int				cnt;
+	UserMapping		*user_mapping;
+
+	if (!fdwXactExitRegistered)
+	{
+		before_shmem_exit(AtProcExit_FDWXact, 0);
+		fdwXactExitRegistered = true;
+	}
+
+	if (fdw_xact_id_len > MAX_FDW_XACT_ID_LEN)
+		elog(ERROR, "foreign transaction identifier longer (%d) than allowed (%d)",
+				fdw_xact_id_len, MAX_FDW_XACT_ID_LEN);
+
+	user_mapping = GetUserMapping(userid, serverid);
+
+	LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+	fdw_xact = NULL;
+	for (cnt = 0; cnt < FDWXactGlobal->num_fdw_xacts; cnt++)
+	{
+		fdw_xact = FDWXactGlobal->fdw_xacts[cnt];
+
+		if (fdw_xact->local_xid == xid &&
+			fdw_xact->serverid == serverid &&
+			fdw_xact->userid == userid)
+			elog(ERROR, "duplicate entry for foreign transaction with transaction id %u, serverid %u, userid %u found",
+						xid, serverid, userid);
+	}
+
+	/*
+	 * Get the next free foreign transaction entry. Raise error if there are
+	 * none left.
+	 */
+	if (!FDWXactGlobal->freeFDWXacts)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("maximum number of foreign transactions reached"),
+				 errhint("Increase max_prepared_foreign_transactions (currently %d).",
+						 max_fdw_xacts)));
+	}
+
+	fdw_xact = FDWXactGlobal->freeFDWXacts;
+	FDWXactGlobal->freeFDWXacts = fdw_xact->fx_next;
+
+	/* Insert the entry to active array */
+	Assert(FDWXactGlobal->num_fdw_xacts < max_fdw_xacts);
+	FDWXactGlobal->fdw_xacts[FDWXactGlobal->num_fdw_xacts++] = fdw_xact;
+
+	/* Stamp the entry with backend id before releasing the LWLock */
+	fdw_xact->locking_backend = MyBackendId;
+	fdw_xact->dboid = dboid;
+	fdw_xact->local_xid = xid;
+	fdw_xact->serverid = serverid;
+	fdw_xact->userid = userid;
+	fdw_xact->umid = user_mapping->umid;
+	fdw_xact->fdw_xact_status = fdw_xact_status;
+	fdw_xact->fdw_xact_lsn = 0;
+	fdw_xact->fdw_xact_valid = false;
+	fdw_xact->fdw_xact_id_len = fdw_xact_id_len;
+	memcpy(fdw_xact->fdw_xact_id, fdw_xact_id, fdw_xact_id_len);
+
+	/* Remember that we have locked this entry. */
+	MyLockedFDWXacts = lappend(MyLockedFDWXacts, fdw_xact);
+	LWLockRelease(FDWXactLock);
+
+	return fdw_xact;
+}
+
+/*
+ * remove_fdw_xact
+ * Removes the foreign prepared transaction entry from shared memory, disk and
+ * logs about the removal in WAL.
+ */
+static void
+remove_fdw_xact(FDWXact fdw_xact)
+{
+	int cnt;
+
+	LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+	/* Search the slot where this entry resided */
+	for (cnt = 0; cnt < FDWXactGlobal->num_fdw_xacts; cnt++)
+	{
+		if (FDWXactGlobal->fdw_xacts[cnt] == fdw_xact)
+		{
+			FdwRemoveXlogRec	fdw_remove_xlog;
+
+			/* Fill up the log record before releasing the entry */
+			fdw_remove_xlog.serverid = fdw_xact->serverid;
+			fdw_remove_xlog.dbid = fdw_xact->dboid;
+			fdw_remove_xlog.xid = fdw_xact->local_xid;
+			fdw_remove_xlog.userid = fdw_xact->userid;
+
+			/* Remove the entry from active array */
+			FDWXactGlobal->num_fdw_xacts--;
+			FDWXactGlobal->fdw_xacts[cnt] = FDWXactGlobal->fdw_xacts[FDWXactGlobal->num_fdw_xacts];
+
+			/* Put it back into free list */
+			fdw_xact->fx_next = FDWXactGlobal->freeFDWXacts;
+			FDWXactGlobal->freeFDWXacts = fdw_xact;
+
+			/* Unlock the entry */
+			fdw_xact->locking_backend = InvalidBackendId;
+			MyLockedFDWXacts = list_delete_ptr(MyLockedFDWXacts, fdw_xact);
+
+			LWLockRelease(FDWXactLock);
+
+			/*
+			 * Log that we are removing the foreign transaction entry and remove
+			 * the file from the disk as well.
+			 */
+			XLogBeginInsert();
+			XLogRegisterData((char *)&fdw_remove_xlog, sizeof(fdw_remove_xlog));
+			XLogInsert(RM_FDW_XACT_ID, XLOG_FDW_XACT_REMOVE);
+
+			/* Remove the file from the disk as well. */
+			RemoveFDWXactFile(fdw_remove_xlog.xid, fdw_remove_xlog.serverid,
+								fdw_remove_xlog.userid, true);
+			return;
+		}
+	}
+	LWLockRelease(FDWXactLock);
+
+	/* We did not find the given entry in global array */
+	elog(ERROR, "failed to find %p in FDWXactGlobal array", fdw_xact);
+}
+
+/*
+ * unlock_fdw_xact
+ * Unlock the foreign transaction entry by wiping out the locking_backend and
+ * removing it from the backend's list of foreign transaction.
+ */
+static void
+unlock_fdw_xact(FDWXact fdw_xact)
+{
+	/* Only the backend holding the lock is allowed to unlock */
+	Assert(fdw_xact->locking_backend == MyBackendId);
+	/*
+	 * First set the locking backend as invalid, and then remove it from the
+	 * list of locked foreign transactions, under the LW lock. If we reverse the
+	 * order and process exits in-between those two, we will be left an entry
+	 * locked by this backend, which gets unlocked only at the server restart.
+	 */
+
+	LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+	fdw_xact->locking_backend = InvalidBackendId;
+	MyLockedFDWXacts = list_delete_ptr(MyLockedFDWXacts, fdw_xact);
+	LWLockRelease(FDWXactLock);
+}
+
+/*
+ * unlock_fdw_xact_entries
+ * Unlock the foreign transaction entries locked by this backend.
+ */
+static void
+unlock_fdw_xact_entries()
+{
+	while (MyLockedFDWXacts)
+	{
+		FDWXact	fdw_xact = (FDWXact) linitial(MyLockedFDWXacts);
+		unlock_fdw_xact(fdw_xact);
+	}
+}
+
+/*
+ * AtProcExit_FDWXact
+ * When the process exits, unlock the entries it held.
+ */
+static void
+AtProcExit_FDWXact(int code, Datum arg)
+{
+	unlock_fdw_xact_entries();
+}
+
+/*
+ * AtEOXact_FDWXacts
+ * The function executes phase 2 of two-phase commit protocol.
+ * At the end of transaction perform following actions
+ * 1. Mark the entries locked by this backend as ABORTING or COMMITTING
+ *	  according the result of transaction.
+ * 2. Try to commit or abort the transactions on foreign servers. If that
+ *	  succeeds, remove them from foreign transaction entries, otherwise unlock
+ *	  them.
+ */
+extern void
+AtEOXact_FDWXacts(bool is_commit)
+{
+	ListCell	*lcell;
+
+	foreach(lcell, MyFDWConnections)
+	{
+		FDWConnection *fdw_conn = lfirst(lcell);
+
+		/* Commit/abort prepared foreign transactions */
+		if (fdw_conn->fdw_xact)
+		{
+			FDWXact	fdw_xact = fdw_conn->fdw_xact;
+			fdw_xact->fdw_xact_status = (is_commit ?
+											FDW_XACT_COMMITTING_PREPARED :
+											FDW_XACT_ABORTING_PREPARED);
+			/* Try aborting or commiting the transaction on the foreign server */
+			if (!resolve_fdw_xact(fdw_xact, fdw_conn->resolve_prepared_foreign_xact))
+			{
+				/*
+				 * The transaction was not resolved on the foreign server, unlock
+				 * it, so that someone else can take care of it.
+				 */
+				unlock_fdw_xact(fdw_xact);
+			}
+		}
+		else
+		{
+			/*
+			 * On servers where two phase commit protocol could not be executed
+			 * we have tried to commit the transactions during pre-commit phase.
+			 * Any remaining transactions need to be aborted.
+			 */
+			Assert(!is_commit);
+
+			/*
+			 * The FDW has to make sure that the connection opened to the
+			 * foreign server is out of transaction. Even if the handler
+			 * function returns failure statue, there's hardly anything to do.
+			 */
+			if (!fdw_conn->end_foreing_xact(fdw_conn->serverid, fdw_conn->userid,
+											fdw_conn->umid, is_commit))
+				elog(WARNING, "could not %s transaction on server %s",
+								is_commit ? "commit" : "abort",
+								fdw_conn->servername);
+		}
+	}
+
+	/*
+	 * Unlock any locked foreign transactions. Resolver might lock the entries,
+	 * and may not be able to unlock them if aborted in-between. In any case,
+	 * there is no reason for a foreign transaction entry to be locked after the
+	 * transaction which locked it has ended.
+	 */
+	unlock_fdw_xact_entries();
+
+	/*
+	 * Reset the list of registered connections. Since the memory for the list
+	 * and its nodes comes from transaction memory context, it will be freed
+	 * after this call.
+	 */
+	MyFDWConnections = NIL;
+	/* Set TwoPhaseReady to its default value */
+	TwoPhaseReady = true;
+}
+
+/*
+ * AtPrepare_FDWXacts
+ * The function is called while preparing a transaction. If there are foreign
+ * servers involved in the transaction, this function prepares transactions
+ * on those servers.
+ */
+extern void
+AtPrepare_FDWXacts(void)
+{
+	/* If there are no foreign servers involved, we have no business here */
+	if (list_length(MyFDWConnections) < 1)
+		return;
+
+	/*
+	 * All foreign servers participating in a transaction to be prepared should
+	 * be two phase compliant.
+	 */
+	if (!TwoPhaseReady)
+		ereport(ERROR,
+				(errcode(ERRCODE_T_R_INTEGRITY_CONSTRAINT_VIOLATION),
+				 errmsg("can not prepare the transaction because some foreign server/s involved in transaction can not prepare the transaction")));
+
+	/* Prepare transactions on participating foreign servers. */
+	prepare_foreign_transactions();
+
+	/*
+	 * Unlock the foreign transaction entries so COMMIT/ROLLBACK PREPARED from
+	 * some other backend will be able to lock those if required.
+	 */
+	unlock_fdw_xact_entries();
+
+	/*
+	 * Reset the list of registered connections. Since the memory for the list
+	 * and its nodes comes from transaction memory context, it will be freed
+	 * after this call.
+	 */
+	MyFDWConnections = NIL;
+	/* Set TwoPhaseReady to its default value */
+	TwoPhaseReady = true;
+}
+
+/*
+ * FDWXactTwoPhaseFinish
+ * This function is called as part of the COMMIT/ROLLBACK PREPARED command to
+ * commit/rollback the foreign transactions prepared as part of the local
+ * prepared transaction. The function looks for the foreign transaction entries
+ * with local_xid equal to xid of the prepared transaction and tries to resolve them.
+ */
+extern void
+FDWXactTwoPhaseFinish(bool isCommit, TransactionId xid)
+{
+	List	*entries_to_resolve;
+
+	FDWXactStatus	status = isCommit ? FDW_XACT_COMMITTING_PREPARED :
+											FDW_XACT_ABORTING_PREPARED;
+	/* Get all the entries belonging to the given transaction id locked. If
+	 * foreign transaction resolver is running, it might lock entries to
+	 * check whether they can be resolved. The search function will skip such
+	 * entries. The resolver will resolve them at a later point of time.
+	 */
+	search_fdw_xact(xid, InvalidOid, InvalidOid, InvalidOid, &entries_to_resolve);
+
+	/* Try resolving the foreign transactions */
+	while (entries_to_resolve)
+	{
+		FDWXact	fdw_xact = linitial(entries_to_resolve);
+		entries_to_resolve = list_delete_first(entries_to_resolve);
+		fdw_xact->fdw_xact_status = status;
+
+		/*
+		 * Resolve the foreign transaction. If resolution is not successful,
+		 * unlock the entry so that someone else can pick it up.
+		 */
+		if (!resolve_fdw_xact(fdw_xact,
+								get_prepared_foreign_xact_resolver(fdw_xact)))
+			unlock_fdw_xact(fdw_xact);
+	}
+}
+
+static ResolvePreparedForeignTransaction_function
+get_prepared_foreign_xact_resolver(FDWXact fdw_xact)
+{
+		ForeignServer		*foreign_server;
+		ForeignDataWrapper	*fdw;
+		FdwRoutine			*fdw_routine;
+
+		foreign_server = GetForeignServer(fdw_xact->serverid);
+		fdw = GetForeignDataWrapper(foreign_server->fdwid);
+		fdw_routine = GetFdwRoutine(fdw->fdwhandler);
+		if (!fdw_routine->ResolvePreparedForeignTransaction)
+			elog(ERROR, "no foreign transaction resolver routine provided for FDW %s",
+					fdw->fdwname);
+		return fdw_routine->ResolvePreparedForeignTransaction;
+}
+
+/*
+ * resolve_fdw_xact
+ * Resolve the foreign transaction using the foreign data wrapper's transaction
+ * handler routine.
+ * If the resolution is successful, remove the foreign transaction entry from
+ * the shared memory and also remove the corresponding on-disk file.
+ */
+static bool
+resolve_fdw_xact(FDWXact fdw_xact,
+				ResolvePreparedForeignTransaction_function fdw_xact_handler)
+{
+	bool	resolved;
+	bool	is_commit;
+
+	Assert(fdw_xact->fdw_xact_status == FDW_XACT_COMMITTING_PREPARED ||
+			fdw_xact->fdw_xact_status == FDW_XACT_ABORTING_PREPARED);
+
+	is_commit = (fdw_xact->fdw_xact_status == FDW_XACT_COMMITTING_PREPARED) ?
+							true : false;
+
+	resolved = fdw_xact_handler(fdw_xact->serverid, fdw_xact->userid,
+								fdw_xact->umid, is_commit,
+								fdw_xact->fdw_xact_id_len,
+								fdw_xact->fdw_xact_id);
+
+	/* If we succeeded in resolving the transaction, remove the entry */
+	if (resolved)
+		remove_fdw_xact(fdw_xact);
+
+	return resolved;
+}
+
+/*
+ * fdw_xact_exists
+ * Returns true if there exists at least one prepared foreign transaction which
+ * matches criteria. This function is wrapper around search_fdw_xact. Check that
+ * function's prologue for details.
+ */
+bool
+fdw_xact_exists(TransactionId xid, Oid dbid, Oid serverid, Oid userid)
+{
+	return search_fdw_xact(xid, dbid, serverid, userid, NULL);
+}
+
+/*
+ * search_fdw_xact
+ * Return true if there exists at least one prepared foreign transaction
+ * entry with given criteria. The criteria is defined by arguments with
+ * valid values for respective datatypes.
+ *
+ * The table below explains the same
+ * xid	   | dbid	 | serverid | userid  | search for entry with
+ * invalid | invalid | invalid	| invalid | nothing
+ * invalid | invalid | invalid	| valid	  | given userid
+ * invalid | invalid | valid	| invalid | given serverid
+ * invalid | invalid | valid	| valid	  | given serverid and userid
+ * invalid | valid	 | invalid	| invalid | given dbid
+ * invalid | valid	 | invalid	| valid	  | given dbid and userid
+ * invalid | valid	 | valid	| invalid | given dbid and serverid
+ * invalid | valid	 | valid	| valid	  | given dbid, servroid and userid
+ * valid   | invalid | invalid	| invalid | given xid
+ * valid   | invalid | invalid	| valid	  | given xid and userid
+ * valid   | invalid | valid	| invalid | given xid, serverid
+ * valid   | invalid | valid	| valid	  | given xid, serverid, userid
+ * valid   | valid	 | invalid	| invalid | given xid and dbid
+ * valid   | valid	 | invalid	| valid	  | given xid, dbid and userid
+ * valid   | valid	 | valid	| invalid | given xid, dbid, serverid
+ * valid   | valid	 | valid	| valid	  | given xid, dbid, serverid, userid
+ *
+ * When the criteria is void (all arguments invalid) the
+ * function returns true, since any entry would match the criteria.
+ *
+ * If qualifying_fdw_xacts is not NULL, the qualifying entries are locked and
+ * returned in a linked list. Any entry which is already locked is ignored. If
+ * all the qualifying entries are locked, nothing will be returned in the list
+ * but returned value will be true.
+ */
+bool
+search_fdw_xact(TransactionId xid, Oid dbid, Oid serverid, Oid userid,
+				List **qualifying_xacts)
+{
+	int			cnt;
+	LWLockMode	lock_mode;
+	/* Return value if a qualifying entry exists */
+	bool		entry_exists = false;
+
+	if (qualifying_xacts)
+	{
+		*qualifying_xacts = NIL;
+		/* The caller expects us to lock entries */
+		lock_mode = LW_EXCLUSIVE;
+	}
+	else
+		lock_mode = LW_SHARED;
+
+	LWLockAcquire(FDWXactLock, lock_mode);
+	for (cnt = 0; cnt < FDWXactGlobal->num_fdw_xacts; cnt++)
+	{
+		FDWXact	fdw_xact = FDWXactGlobal->fdw_xacts[cnt];
+		bool	entry_matches = true;
+
+		/* xid */
+		if (xid != InvalidTransactionId && xid != fdw_xact->local_xid)
+			entry_matches = false;
+
+		/* dbid */
+		if (OidIsValid(dbid) && fdw_xact->dboid != dbid)
+			entry_matches = false;
+
+		/* serverid */
+		if (OidIsValid(serverid) && serverid != fdw_xact->serverid)
+			entry_matches = false;
+
+		/* userid */
+		if (OidIsValid(userid) && fdw_xact->userid != userid)
+			entry_matches = false;
+
+		if (entry_matches)
+		{
+			entry_exists = true;
+			if (qualifying_xacts)
+			{
+				/*
+				 * User has requested list of qualifying entries. If the
+				 * matching entry is not locked, lock it and add to the list. If
+				 * the entry is locked by some other backend, ignore it.
+				 */
+				if (fdw_xact->locking_backend == InvalidBackendId)
+				{
+					MemoryContext oldcontext;
+					fdw_xact->locking_backend = MyBackendId;
+
+					/* The list and its members may be required at the end of the transaction */
+					oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+					MyLockedFDWXacts = lappend(MyLockedFDWXacts, fdw_xact);
+					MemoryContextSwitchTo(oldcontext);
+				}
+				else if (fdw_xact->locking_backend != MyBackendId)
+					continue;
+
+				*qualifying_xacts = lappend(*qualifying_xacts, fdw_xact);
+			}
+			else
+			{
+				/*
+				 * User wants to check the existence, and we have found one
+				 * matching entry. No need to check other entries.
+				 */
+				break;
+			}
+		}
+	}
+
+	LWLockRelease(FDWXactLock);
+
+	return entry_exists;
+}
+
+/*
+ * get_dbids_with_unresolved_xact
+ * returns the oids of the databases containing unresolved foreign transactions.
+ * The function is used by pg_fdw_xact_resolver extension. Returns NIL if
+ * no such entry exists.
+ */
+List *
+get_dbids_with_unresolved_xact(void)
+{
+	int		cnt_xact;
+	List	*dbid_list = NIL;
+
+	LWLockAcquire(FDWXactLock, LW_SHARED);
+	for (cnt_xact = 0; cnt_xact < FDWXactGlobal->num_fdw_xacts; cnt_xact++)
+	{
+		FDWXact	fdw_xact;
+
+		fdw_xact = FDWXactGlobal->fdw_xacts[cnt_xact];
+
+		/* Skip locked entry as someone must be working on it */
+		if (fdw_xact->locking_backend == InvalidBackendId)
+			dbid_list = list_append_unique_oid(dbid_list, fdw_xact->dboid);
+	}
+	LWLockRelease(FDWXactLock);
+
+	return dbid_list;
+}
+
+/*
+ * fdw_xact_redo
+ * Apply the redo log for a foreign transaction.
+ */
+extern void
+fdw_xact_redo(XLogReaderState *record)
+{
+	char			*rec = XLogRecGetData(record);
+	uint8			info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+	int				rec_len = XLogRecGetDataLen(record);
+	TransactionId	xid = XLogRecGetXid(record);
+
+	if (info == XLOG_FDW_XACT_INSERT)
+	{
+		FDWXactOnDiskData	*fdw_xact_data_file = (FDWXactOnDiskData *)rec;
+		char				path[MAXPGPATH];
+		int					fd;
+		pg_crc32c	fdw_xact_crc;
+
+		/* Recompute CRC */
+		INIT_CRC32C(fdw_xact_crc);
+		COMP_CRC32C(fdw_xact_crc, rec, rec_len);
+		FIN_CRC32C(fdw_xact_crc);
+
+		FDWXactFilePath(path, xid, fdw_xact_data_file->serverid,
+							fdw_xact_data_file->userid);
+		/*
+		 * The file may exist, if it was flushed to disk after creating it. The
+		 * file might have been flushed while it was being crafted, so the
+		 * contents can not be guaranteed to be accurate. Hence truncate and
+		 * rewrite the file.
+		 */
+		fd = OpenTransientFile(path, O_CREAT | O_WRONLY | O_TRUNC | PG_BINARY,
+								S_IRUSR | S_IWUSR);
+		if (fd < 0)
+			ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create/open foreign transaction state file \"%s\": %m",
+						path)));
+
+		/* The log record is exactly the contents of the file. */
+		if (write(fd, rec, rec_len) != rec_len)
+		{
+			CloseTransientFile(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not write FDW transaction state file: %s", path)));
+		}
+
+		if (write(fd, &fdw_xact_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
+		{
+			CloseTransientFile(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not write two-phase state file: %m")));
+		}
+
+		/*
+		 * We must fsync the file because the end-of-replay checkpoint will not do
+		 * so, there being no foreign transaction entry in shared memory yet to
+		 * tell it to.
+		 */
+		if (pg_fsync(fd) != 0)
+		{
+			CloseTransientFile(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not fsync foreign transaction state file: %m")));
+		}
+
+		CloseTransientFile(fd);
+	}
+	else if (info == XLOG_FDW_XACT_REMOVE)
+	{
+		FdwRemoveXlogRec	*fdw_remove_xlog = (FdwRemoveXlogRec *)rec;
+
+		/* Remove the file from the disk. */
+		RemoveFDWXactFile(fdw_remove_xlog->xid, fdw_remove_xlog->serverid, fdw_remove_xlog->userid,
+								true);
+	}
+	else
+		elog(ERROR, "invalid log type %d in foreign transction log record", info);
+
+	return;
+}
+
+/*
+ * CheckPointFDWXact
+ * Function syncs the foreign transaction files created between the two
+ * checkpoints.
+ * The foreign transaction entries and hence the corresponding files are expected
+ * to be very short-lived. By executing this function at the end, we might have
+ * lesser files to fsync, thus reducing some I/O. This is similar to
+ * CheckPointTwoPhase().
+ * In order to avoid disk I/O while holding a light weight lock, the function
+ * first collects the files which need to be synced under FDWXactLock and then
+ * syncs them after releasing the lock. This approach creates a race condition:
+ * after releasing the lock, and before syncing a file, the corresponding
+ * foreign transaction entry and hence the file might get removed. The function
+ * checks whether that's true and ignores the error if so.
+ */
+void
+CheckPointFDWXact(XLogRecPtr redo_horizon)
+{
+	Oid				*serverids;
+	TransactionId	*xids;
+	Oid				*userids;
+	Oid				*dbids;
+	int				nxacts;
+	int				cnt;
+	/* Quick get-away, before taking lock */
+	if (max_fdw_xacts <= 0)
+		return;
+
+	LWLockAcquire(FDWXactLock, LW_SHARED);
+
+	/* Another quick, before we allocate memory */
+	if (FDWXactGlobal->num_fdw_xacts <= 0)
+	{
+		LWLockRelease(FDWXactLock);
+		return;
+	}
+
+	/*
+	 * Collect the file paths which need to be synced. We might sync a file
+	 * again if it lives beyond the checkpoint boundaries. But this case is rare
+	 * and may not involve much I/O.
+	 */
+	xids = (TransactionId *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(TransactionId));
+	serverids = (Oid *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(Oid));
+	userids = (Oid *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(Oid));
+	dbids = (Oid *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(Oid));
+	nxacts = 0;
+
+	for (cnt = 0; cnt < FDWXactGlobal->num_fdw_xacts; cnt++)
+	{
+		FDWXact	fdw_xact = FDWXactGlobal->fdw_xacts[cnt];
+		if (fdw_xact->fdw_xact_valid &&
+			fdw_xact->fdw_xact_lsn <= redo_horizon)
+		{
+			xids[nxacts] = fdw_xact->local_xid;
+			serverids[nxacts] = fdw_xact->serverid;
+			userids[nxacts] = fdw_xact->userid;
+			dbids[nxacts] = fdw_xact->dboid;
+			nxacts++;
+		}
+	}
+
+	LWLockRelease(FDWXactLock);
+
+	for (cnt = 0; cnt < nxacts; cnt++)
+	{
+		char	path[MAXPGPATH];
+		int		fd;
+
+		FDWXactFilePath(path, xids[cnt], serverids[cnt], userids[cnt]);
+
+		fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
+
+		if (fd < 0)
+		{
+			if (errno == ENOENT)
+			{
+				/* OK if we do not have the entry anymore */
+				if (!fdw_xact_exists(xids[cnt], dbids[cnt], serverids[cnt],
+										userids[cnt]))
+					continue;
+
+				/* Restore errno in case it was changed */
+				errno = ENOENT;
+			}
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open foreign transaction state file \"%s\": %m",
+							path)));
+		}
+
+		if (pg_fsync(fd) != 0)
+		{
+			CloseTransientFile(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not fsync foreign transaction state file \"%s\": %m",
+							path)));
+		}
+
+		if (CloseTransientFile(fd) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close foreign transaction state file \"%s\": %m",
+							path)));
+	}
+
+	pfree(xids);
+	pfree(serverids);
+	pfree(userids);
+	pfree(dbids);
+}
+
+/* Built in functions */
+/*
+ * pg_fdw_xact
+ *		Produce a view with one row per prepared transaction on foreign server.
+ *
+ * This function is here so we don't have to export the
+ * FDWXactGlobalData struct definition.
+ *
+ */
+
+/*
+ * Structure to hold and iterate over the foreign transactions to be displayed
+ * by the built-in functions.
+ */
+typedef struct
+{
+	FDWXact	fdw_xacts;
+	int		num_xacts;
+	int		cur_xact;
+} WorkingStatus;
+
+/*
+ * Returns an array of all foreign prepared transactions for the user-level
+ * function pg_fdw_xact.
+ *
+ * The returned array and all its elements are copies of internal data
+ * structures, to minimize the time we need to hold the FDWXactLock.
+ *
+ * WARNING -- we return even those transactions whose information is not
+ * completely filled yet. The caller should filter them out if he doesn't want them.
+ *
+ * The returned array is palloc'd.
+ */
+static int
+GetFDWXactList(FDWXact *fdw_xacts)
+{
+	int	num_xacts;
+	int	cnt_xacts;
+
+	LWLockAcquire(FDWXactLock, LW_SHARED);
+
+	if (FDWXactGlobal->num_fdw_xacts == 0)
+	{
+		LWLockRelease(FDWXactLock);
+		*fdw_xacts = NULL;
+		return 0;
+	}
+
+	num_xacts = FDWXactGlobal->num_fdw_xacts;
+	*fdw_xacts = (FDWXact) palloc(sizeof(FDWXactData) * num_xacts);
+	for (cnt_xacts = 0; cnt_xacts < num_xacts; cnt_xacts++)
+		memcpy((*fdw_xacts) + cnt_xacts, FDWXactGlobal->fdw_xacts[cnt_xacts],
+			   sizeof(FDWXactData));
+
+	LWLockRelease(FDWXactLock);
+
+	return num_xacts;
+}
+
+Datum
+pg_fdw_xact(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	WorkingStatus	*status;
+	char			*xact_status;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		TupleDesc	tupdesc;
+		MemoryContext oldcontext;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/*
+		 * Switch to memory context appropriate for multiple function calls
+		 */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* build tupdesc for result tuples */
+		/* this had better match pg_fdw_xacts view in system_views.sql */
+		tupdesc = CreateTemplateTupleDesc(6, false);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction",
+						   XIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status",
+						   TEXTOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 6, "identifier",
+						   TEXTOID, -1, 0);
+
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		/*
+		 * Collect status information that we will format and send
+		 * out as a result set.
+		 */
+		status = (WorkingStatus *) palloc(sizeof(WorkingStatus));
+		funcctx->user_fctx = (void *) status;
+
+		status->num_xacts = GetFDWXactList(&status->fdw_xacts);
+		status->cur_xact = 0;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+	status = funcctx->user_fctx;
+
+	while (status->cur_xact < status->num_xacts)
+	{
+		FDWXact		fdw_xact = &status->fdw_xacts[status->cur_xact++];
+		Datum		values[6];
+		bool		nulls[6];
+		HeapTuple	tuple;
+		Datum		result;
+
+		if (!fdw_xact->fdw_xact_valid)
+			continue;
+
+		/*
+		 * Form tuple with appropriate data.
+		 */
+		MemSet(values, 0, sizeof(values));
+		MemSet(nulls, 0, sizeof(nulls));
+
+		values[0] = ObjectIdGetDatum(fdw_xact->dboid);
+		values[1] = TransactionIdGetDatum(fdw_xact->local_xid);
+		values[2] = ObjectIdGetDatum(fdw_xact->serverid);
+		values[3] = ObjectIdGetDatum(fdw_xact->userid);
+		switch (fdw_xact->fdw_xact_status)
+		{
+			case FDW_XACT_PREPARING:
+				xact_status = "prepared";
+				break;
+			case FDW_XACT_COMMITTING_PREPARED:
+				xact_status = "committing";
+				break;
+			case FDW_XACT_ABORTING_PREPARED:
+				xact_status = "aborting";
+				break;
+			default:
+				xact_status = "unknown";
+				break;
+		}
+		values[4] = CStringGetTextDatum(xact_status);
+		/* should this be really interpreted by FDW */
+		values[5] = PointerGetDatum(cstring_to_text_with_len(fdw_xact->fdw_xact_id,
+															fdw_xact->fdw_xact_id_len));
+
+		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+		result = HeapTupleGetDatum(tuple);
+		SRF_RETURN_NEXT(funcctx, result);
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * pg_fdw_resolve
+ * a user interface to initiate foreign transaction resolution. The function
+ * tries to resolve the prepared transactions on foreign servers in the database
+ * from where it is run.
+ * The function prints the status of all the foreign transactions it
+ * encountered, whether resolved or not.
+ */
+Datum
+pg_fdw_resolve(PG_FUNCTION_ARGS)
+{
+	MemoryContext	oldcontext;
+	FuncCallContext *funcctx;
+	WorkingStatus	*status;
+	char			*xact_status;
+	List			*entries_to_resolve;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		TupleDesc	tupdesc;
+
+		/* We will be modifying the shared memory. Prepare to clean up on exit */
+		if (!fdwXactExitRegistered)
+		{
+			before_shmem_exit(AtProcExit_FDWXact, 0);
+			fdwXactExitRegistered = true;
+		}
+
+		/* Allocate space for and prepare the returning set */
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+		/* Switch to memory context appropriate for multiple function calls */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* build tupdesc for result tuples */
+		tupdesc = CreateTemplateTupleDesc(6, false);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction",
+						   XIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status",
+						   TEXTOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 6, "identifier",
+						   TEXTOID, -1, 0);
+
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		/*
+		 * Collect status information that we will format and send
+		 * out as a result set.
+		 */
+		status = (WorkingStatus *) palloc(sizeof(WorkingStatus));
+		funcctx->user_fctx = (void *) status;
+		status->fdw_xacts = (FDWXact) palloc(sizeof(FDWXactData) * FDWXactGlobal->num_fdw_xacts);
+		status->num_xacts = 0;
+		status->cur_xact = 0;
+
+		/* Done preparation for the result. */
+		MemoryContextSwitchTo(oldcontext);
+
+		/*
+		 * Get entries whose foreign servers are part of the database where
+		 * this function was called. We can get information about only such
+		 * foreign servers. The function will lock the entries. The entries
+		 * which are locked by other backends and whose foreign servers belong
+		 * to this database are left out, since we can not work on those.
+		 */
+		search_fdw_xact(InvalidTransactionId, MyDatabaseId, InvalidOid, InvalidOid,
+						&entries_to_resolve);
+
+		/* Work to resolve the resolvable entries */
+		while (entries_to_resolve)
+		{
+			FDWXact	fdw_xact = linitial(entries_to_resolve);
+
+			/* Remove the entry as we will not use it again */
+			entries_to_resolve = list_delete_first(entries_to_resolve);
+
+			/* Copy the data for the sake of result. */
+			memcpy(status->fdw_xacts + status->num_xacts++,
+						fdw_xact, sizeof(FDWXactData));
+
+			if (fdw_xact->fdw_xact_status == FDW_XACT_COMMITTING_PREPARED ||
+					fdw_xact->fdw_xact_status == FDW_XACT_ABORTING_PREPARED)
+			{
+				/*
+				 * We have already decided what to do with the foreign transaction
+				 * nothing to be done.
+				 */
+			}
+			else if (TransactionIdDidCommit(fdw_xact->local_xid))
+				fdw_xact->fdw_xact_status = FDW_XACT_COMMITTING_PREPARED;
+			else if (TransactionIdDidAbort(fdw_xact->local_xid))
+				fdw_xact->fdw_xact_status = FDW_XACT_ABORTING_PREPARED;
+			else if (!TransactionIdIsInProgress(fdw_xact->local_xid))
+			{
+				/*
+				 * The transaction is in progress but not on any of the backends. So
+				 * probably, it crashed before actual abort or commit. So assume it
+				 * to be aborted.
+				 */
+				fdw_xact->fdw_xact_status = FDW_XACT_ABORTING_PREPARED;
+			}
+			else
+			{
+				/*
+				 * Local transaction is in progress, should not resolve the foreign
+				 * transaction. This can happen when the foreign transaction is
+				 * prepared as part of a local prepared transaction. Just
+				 * continue with the next one.
+				 */
+				unlock_fdw_xact(fdw_xact);
+				continue;
+			}
+
+			/*
+			 * Resolve the foreign transaction. If resolution was not successful,
+			 * unlock the entry so that someone else can pick it up
+			 */
+			if (!resolve_fdw_xact(fdw_xact, get_prepared_foreign_xact_resolver(fdw_xact)))
+				unlock_fdw_xact(fdw_xact);
+			else
+				/* Update the status in the result set */
+				status->fdw_xacts[status->num_xacts - 1].fdw_xact_status = FDW_XACT_RESOLVED;
+		}
+	}
+
+	/* Print the result set */
+	funcctx = SRF_PERCALL_SETUP();
+	status = funcctx->user_fctx;
+
+	while (status->cur_xact < status->num_xacts)
+	{
+		FDWXact		fdw_xact = &status->fdw_xacts[status->cur_xact++];
+		Datum		values[6];
+		bool		nulls[6];
+		HeapTuple	tuple;
+		Datum		result;
+
+		if (!fdw_xact->fdw_xact_valid)
+			continue;
+
+		/*
+		 * Form tuple with appropriate data.
+		 */
+		MemSet(values, 0, sizeof(values));
+		MemSet(nulls, 0, sizeof(nulls));
+
+		values[0] = ObjectIdGetDatum(fdw_xact->dboid);
+		values[1] = TransactionIdGetDatum(fdw_xact->local_xid);
+		values[2] = ObjectIdGetDatum(fdw_xact->serverid);
+		values[3] = ObjectIdGetDatum(fdw_xact->userid);
+		switch (fdw_xact->fdw_xact_status)
+		{
+			case FDW_XACT_PREPARING:
+				xact_status = "preparing";
+				break;
+			case FDW_XACT_COMMITTING_PREPARED:
+				xact_status = "committing";
+				break;
+			case FDW_XACT_ABORTING_PREPARED:
+				xact_status = "aborting";
+				break;
+			case FDW_XACT_RESOLVED:
+				xact_status = "resolved";
+				break;
+			default:
+				xact_status = "unknown";
+				break;
+		}
+		values[4] = CStringGetTextDatum(xact_status);
+		/* should this be really interpreted by FDW? */
+		values[5] = PointerGetDatum(cstring_to_text_with_len(fdw_xact->fdw_xact_id,
+															fdw_xact->fdw_xact_id_len));
+
+		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+		result = HeapTupleGetDatum(tuple);
+		SRF_RETURN_NEXT(funcctx, result);
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Built-in function to remove prepared foreign transaction entry/s without
+ * resolving. The function gives a way to forget about such prepared
+ * transaction in case
+ * 1. The foreign server where it is prepared is no longer available
+ * 2. The user which prepared this transaction needs to be dropped
+ * 3. PITR is recoverying before a transaction id, which created the prepared
+ *	  foreign transaction
+ * 4. The database containing the entries needs to be dropped
+ *
+ * Or any such conditions in which resolution is no longer possible.
+ *
+ * The function accepts 4 arguments transaction id, dbid, serverid and userid,
+ * which define the criteria in the same way as search_fdw_xact(). The entries
+ * matching the criteria are removed. The function does not remove an entry
+ * which is locked by some other backend.
+ */
+Datum
+pg_fdw_remove(PG_FUNCTION_ARGS)
+{
+/* Some #defines only for this function to deal with the arguments */
+#define XID_ARGNUM	0
+#define DBID_ARGNUM 1
+#define SRVID_ARGNUM 2
+#define USRID_ARGNUM 3
+
+	TransactionId	xid;
+	Oid				dbid;
+	Oid				serverid;
+	Oid				userid;
+	List			*entries_to_remove;
+
+	xid = PG_ARGISNULL(XID_ARGNUM) ? InvalidTransactionId :
+									DatumGetTransactionId(PG_GETARG_DATUM(XID_ARGNUM));
+	dbid = PG_ARGISNULL(DBID_ARGNUM) ? InvalidOid :
+									PG_GETARG_OID(DBID_ARGNUM);
+	serverid = PG_ARGISNULL(SRVID_ARGNUM) ? InvalidOid :
+									PG_GETARG_OID(SRVID_ARGNUM);
+	userid = PG_ARGISNULL(USRID_ARGNUM) ? InvalidOid :
+									PG_GETARG_OID(USRID_ARGNUM);
+
+	search_fdw_xact(xid, dbid, serverid, userid, &entries_to_remove);
+
+	while (entries_to_remove)
+	{
+		FDWXact	fdw_xact = linitial(entries_to_remove);
+		entries_to_remove = list_delete_first(entries_to_remove);
+
+		remove_fdw_xact(fdw_xact);
+	}
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Code dealing with the on disk files used to store foreign transaction
+ * information.
+ */
+
+/*
+ * ReadFDWXactFile
+ * Read the foreign transction state file and return the contents in a
+ * structure allocated in-memory. The structure can be later freed by the
+ * caller.
+ */
+static FDWXactOnDiskData *
+ReadFDWXactFile(TransactionId xid, Oid serverid, Oid userid)
+{
+	char				path[MAXPGPATH];
+	int					fd;
+	FDWXactOnDiskData	*fdw_xact_file_data;
+	struct stat			stat;
+	uint32				crc_offset;
+	pg_crc32c			calc_crc;
+	pg_crc32c			file_crc;
+	char				*buf;
+
+	FDWXactFilePath(path, xid, serverid, userid);
+
+	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+	if (fd < 0)
+		ereport(ERROR,
+			(errcode_for_file_access(),
+			 errmsg("could not open FDW transaction state file \"%s\": %m",
+					path)));
+
+	/*
+	 * Check file length.  We can determine a lower bound pretty easily. We
+	 * set an upper bound to avoid palloc() failure on a corrupt file, though
+	 * we can't guarantee that we won't get an out of memory error anyway,
+	 * even on a valid file.
+	 */
+	if (fstat(fd, &stat))
+	{
+		CloseTransientFile(fd);
+
+		ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not stat FDW transaction state file \"%s\": %m",
+							path)));
+		return NULL;
+	}
+
+	if (stat.st_size < offsetof(FDWXactOnDiskData, fdw_xact_id) ||
+		stat.st_size > MaxAllocSize)
+	{
+		CloseTransientFile(fd);
+		ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("Too large FDW transaction state file \"%s\": %m",
+							path)));
+		return NULL;
+	}
+
+	buf = (char *) palloc(stat.st_size);
+	fdw_xact_file_data = (FDWXactOnDiskData *)buf;
+	crc_offset = stat.st_size - sizeof(pg_crc32c);
+	/* Slurp the file */
+	if (read(fd, fdw_xact_file_data, stat.st_size) != stat.st_size)
+	{
+		CloseTransientFile(fd);
+		ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not read FDW transaction state file \"%s\": %m",
+							path)));
+		pfree(fdw_xact_file_data);
+		return NULL;
+	}
+
+	CloseTransientFile(fd);
+	/*
+	 * Check the CRC.
+	 */
+
+	INIT_CRC32C(calc_crc);
+	COMP_CRC32C(calc_crc, buf, crc_offset);
+	FIN_CRC32C(calc_crc);
+
+	file_crc = *((pg_crc32c *) (buf + crc_offset));
+
+	if (!EQ_CRC32C(calc_crc, file_crc))
+	{
+		pfree(buf);
+		return NULL;
+	}
+
+	if (fdw_xact_file_data->serverid != serverid ||
+			fdw_xact_file_data->userid != userid ||
+			fdw_xact_file_data->local_xid != xid)
+	{
+		ereport(WARNING,
+				  (errmsg("removing corrupt foreign transaction state file \"%s\"",
+							  path)));
+		CloseTransientFile(fd);
+		pfree(buf);
+		return NULL;
+	}
+
+	return fdw_xact_file_data;
+}
+
+/*
+ * PrescanFDWXacts
+ * Read the foreign prepared transactions directory for oldest active
+ * transaction. The transactions corresponding to the xids in this directory
+ * are not necessarily active per say locally. But we still need those XIDs to
+ * be alive so that
+ * 1. we can determine whether they are committed or aborted
+ * 2. the file name contains xid which shouldn't get used again to avoid
+ *	  conflicting file names.
+ *
+ * The function accepts the oldest active xid determined by other functions
+ * (e.g. PrescanPreparedTransactions()). It then compares every xid it comes
+ * across while scanning foreign prepared transactions directory with the oldest
+ * active xid. It returns the oldest of those xids or oldest active xid
+ * whichever is older.
+ *
+ * If any foreign prepared transaction is part of a future transaction (PITR),
+ * the function removes the corresponding file as
+ * 1. We can not know the status of the local transaction which prepared this
+ * foreign transaction
+ * 2. The foreign server or the user may not be available as per new timeline
+ *
+ * Anyway, the local transaction which prepared the foreign prepared transaction
+ * does not exist as per the new timeline, so it's better to forget the foreign
+ * prepared transaction as well.
+ */
+TransactionId
+PrescanFDWXacts(TransactionId oldestActiveXid)
+{
+	TransactionId	nextXid = ShmemVariableCache->nextXid;
+	DIR				*cldir;
+	struct dirent	*clde;
+
+	cldir = AllocateDir(FDW_XACTS_DIR);
+	while ((clde = ReadDir(cldir, FDW_XACTS_DIR)) != NULL)
+	{
+		if (strlen(clde->d_name) == FDW_XACT_FILE_NAME_LEN &&
+			strspn(clde->d_name, "0123456789ABCDEF_") == FDW_XACT_FILE_NAME_LEN)
+		{
+			Oid					serverid;
+			Oid					userid;
+			TransactionId		local_xid;
+
+			sscanf(clde->d_name, "%08x_%08x_%08x", &local_xid, &serverid,
+					&userid);
+
+			/*
+			 * Remove a foreign prepared transaction file correspnding
+			 * to an XID, which is too new.
+			 */
+			if (TransactionIdFollowsOrEquals(local_xid, nextXid))
+			{
+				ereport(WARNING,
+					  (errmsg("removing future foreign prepared transaction file \"%s\"",
+							  clde->d_name)));
+				RemoveFDWXactFile(local_xid, serverid, userid, true);
+				continue;
+			}
+
+			if (TransactionIdPrecedesOrEquals(local_xid, oldestActiveXid))
+				oldestActiveXid = local_xid;
+		}
+	}
+
+	FreeDir(cldir);
+	return oldestActiveXid;
+}
+/*
+ * ReadFDWXact
+ * Read the foreign prepared transaction information and set it up for further
+ * usage.
+ */
+void
+ReadFDWXacts(void)
+{
+	DIR				*cldir;
+	struct dirent	*clde;
+
+	cldir = AllocateDir(FDW_XACTS_DIR);
+	while ((clde = ReadDir(cldir, FDW_XACTS_DIR)) != NULL)
+	{
+		if (strlen(clde->d_name) == FDW_XACT_FILE_NAME_LEN &&
+			strspn(clde->d_name, "0123456789ABCDEF_") == FDW_XACT_FILE_NAME_LEN)
+		{
+			Oid					serverid;
+			Oid					userid;
+			TransactionId		local_xid;
+			FDWXactOnDiskData	*fdw_xact_file_data;
+			FDWXact				fdw_xact;
+
+			sscanf(clde->d_name, "%08x_%08x_%08x", &local_xid, &serverid,
+					&userid);
+
+			fdw_xact_file_data = ReadFDWXactFile(local_xid, serverid, userid);
+
+			if (!fdw_xact_file_data)
+			{
+				ereport(WARNING,
+						(errmsg("Removing corrupt foreign transaction file \"%s\"",
+								 clde->d_name)));
+				RemoveFDWXactFile(local_xid, serverid, userid, false);
+				continue;
+			}
+
+			ereport(LOG,
+					(errmsg("recovering foreign transaction entry for xid %u, foreign server %u and user %u",
+								local_xid, serverid, userid)));
+
+			/*
+			 * Add this entry into the table of foreign transactions. The status
+			 * of the transaction is set as preparing, since we do not know the
+			 * exact status right now. Resolver will set it later based on the
+			 * status of local transaction which prepared this foreign
+			 * transaction.
+			 */
+			fdw_xact = insert_fdw_xact(fdw_xact_file_data->dboid, local_xid,
+									   serverid, userid,
+									   fdw_xact_file_data->fdw_xact_id_len,
+									   fdw_xact_file_data->fdw_xact_id,
+									   FDW_XACT_PREPARING);
+			/* Add some valid LSN */
+			fdw_xact->fdw_xact_lsn = 0;
+			/* Mark the entry as ready */
+			fdw_xact->fdw_xact_valid = true;
+			/* Unlock the entry as we don't need it any further */
+			unlock_fdw_xact(fdw_xact);
+			pfree(fdw_xact_file_data);
+		}
+	}
+
+	FreeDir(cldir);
+}
+
+/*
+ * Remove the foreign transaction file for given entry.
+ *
+ * If giveWarning is false, do not complain about file-not-present;
+ * this is an expected case during WAL replay.
+ */
+void
+RemoveFDWXactFile(TransactionId xid, Oid serverid, Oid userid, bool giveWarning)
+{
+	char		path[MAXPGPATH];
+
+	FDWXactFilePath(path, xid, serverid, userid);
+	if (unlink(path))
+		if (errno != ENOENT || giveWarning)
+			ereport(WARNING,
+					(errcode_for_file_access(),
+				   errmsg("could not remove foreign transaction state file \"%s\": %m",
+						  path)));
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 9bb1362..ad71c0e 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -9,6 +9,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/gin.h"
 #include "access/gist_private.h"
 #include "access/generic_xlog.h"
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 5415604..734ed48 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -59,6 +59,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/htup_details.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
@@ -1452,6 +1453,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 
 	PredicateLockTwoPhaseFinish(xid, isCommit);
 
+	/*
+	 * Commit/Rollback the foreign transactions prepared as part of this
+	 * prepared transaction.
+	 */
+	FDWXactTwoPhaseFinish(isCommit, xid);
+
 	/* Count the prepared xact as committed or aborted */
 	AtEOXact_PgStat(isCommit);
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e11b229..4956b3d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -21,6 +21,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/multixact.h"
 #include "access/parallel.h"
 #include "access/subtrans.h"
@@ -186,6 +187,10 @@ typedef struct TransactionStateData
 	bool		didLogXid;		/* has xid been included in WAL record? */
 	int			parallelModeLevel;		/* Enter/ExitParallelMode counter */
 	struct TransactionStateData *parent;		/* back link to parent */
+	int			num_foreign_servers;	/* number of foreign servers participating in the transaction,
+										   Only valid for top level transaction */
+	int			can_prepare;			/* can all the foreign server involved in
+										   this transaction participate in 2PC */
 } TransactionStateData;
 
 typedef TransactionStateData *TransactionState;
@@ -1917,6 +1922,9 @@ StartTransaction(void)
 	AtStart_Cache();
 	AfterTriggerBeginXact();
 
+	/* Foreign transaction stuff */
+	s->num_foreign_servers = 0;
+
 	/*
 	 * done with start processing, set current transaction state to "in
 	 * progress"
@@ -1977,6 +1985,9 @@ CommitTransaction(void)
 			break;
 	}
 
+	/* Pre-commit step for foreign transcations */
+	PreCommit_FDWXacts();
+
 	CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
 					  : XACT_EVENT_PRE_COMMIT);
 
@@ -2134,6 +2145,7 @@ CommitTransaction(void)
 	AtEOXact_HashTables(true);
 	AtEOXact_PgStat(true);
 	AtEOXact_Snapshot(true);
+	AtEOXact_FDWXacts(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2318,6 +2330,7 @@ PrepareTransaction(void)
 	AtPrepare_PgStat();
 	AtPrepare_MultiXact();
 	AtPrepare_RelationMap();
+	AtPrepare_FDWXacts();
 
 	/*
 	 * Here is where we really truly prepare.
@@ -2604,6 +2617,7 @@ AbortTransaction(void)
 		AtEOXact_ComboCid();
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false);
+		AtEOXact_FDWXacts(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2189c22..0d66d1c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -23,6 +23,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/multixact.h"
 #include "access/rewriteheap.h"
 #include "access/subtrans.h"
@@ -4906,6 +4907,7 @@ BootStrapXLOG(void)
 	ControlFile->wal_log_hints = wal_log_hints;
 	ControlFile->track_commit_timestamp = track_commit_timestamp;
 	ControlFile->data_checksum_version = bootstrap_data_checksum_version;
+	ControlFile->max_fdw_xacts = max_fdw_xacts;
 
 	/* some additional ControlFile fields are set in WriteControlFile() */
 
@@ -5972,6 +5974,9 @@ CheckRequiredParameterValues(void)
 		RecoveryRequiresIntParameter("max_locks_per_transaction",
 									 max_locks_per_xact,
 									 ControlFile->max_locks_per_xact);
+		RecoveryRequiresIntParameter("max_prepared_foreign_transaction",
+									 max_fdw_xacts,
+									 ControlFile->max_fdw_xacts);
 	}
 }
 
@@ -6658,7 +6663,10 @@ StartupXLOG(void)
 			InitRecoveryTransactionEnvironment();
 
 			if (wasShutdown)
+			{
 				oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
+				oldestActiveXID = PrescanFDWXacts(oldestActiveXID);
+			}
 			else
 				oldestActiveXID = checkPoint.oldestActiveXid;
 			Assert(TransactionIdIsValid(oldestActiveXID));
@@ -7274,6 +7282,7 @@ StartupXLOG(void)
 
 	/* Pre-scan prepared transactions to find out the range of XIDs present */
 	oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
+	oldestActiveXID = PrescanFDWXacts(oldestActiveXID);
 
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
@@ -7466,6 +7475,12 @@ StartupXLOG(void)
 	RecoverPreparedTransactions();
 
 	/*
+	 * WAL reply must have created the files for prepared foreign transactions.
+	 * Reload the shared-memory foreign transaction state.
+	 */
+	ReadFDWXacts();
+
+	/*
 	 * Shutdown the recovery environment. This must occur after
 	 * RecoverPreparedTransactions(), see notes for lock_twophase_recover()
 	 */
@@ -8723,6 +8738,11 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 	CheckPointReplicationOrigin();
 	/* We deliberately delay 2PC checkpointing as long as possible */
 	CheckPointTwoPhase(checkPointRedo);
+	/*
+	 * We deliberately delay foreign transaction checkpointing as long as
+	 * possible.
+	 */
+	CheckPointFDWXact(checkPointRedo);
 }
 
 /*
@@ -9133,7 +9153,8 @@ XLogReportParameters(void)
 		max_worker_processes != ControlFile->max_worker_processes ||
 		max_prepared_xacts != ControlFile->max_prepared_xacts ||
 		max_locks_per_xact != ControlFile->max_locks_per_xact ||
-		track_commit_timestamp != ControlFile->track_commit_timestamp)
+		track_commit_timestamp != ControlFile->track_commit_timestamp ||
+		max_fdw_xacts != ControlFile->max_fdw_xacts)
 	{
 		/*
 		 * The change in number of backend slots doesn't need to be WAL-logged
@@ -9154,6 +9175,7 @@ XLogReportParameters(void)
 			xlrec.wal_level = wal_level;
 			xlrec.wal_log_hints = wal_log_hints;
 			xlrec.track_commit_timestamp = track_commit_timestamp;
+			xlrec.max_fdw_xacts = max_fdw_xacts;
 
 			XLogBeginInsert();
 			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
@@ -9169,6 +9191,7 @@ XLogReportParameters(void)
 		ControlFile->wal_level = wal_level;
 		ControlFile->wal_log_hints = wal_log_hints;
 		ControlFile->track_commit_timestamp = track_commit_timestamp;
+		ControlFile->max_fdw_xacts = max_fdw_xacts;
 		UpdateControlFile();
 	}
 }
@@ -9357,6 +9380,7 @@ xlog_redo(XLogReaderState *record)
 			RunningTransactionsData running;
 
 			oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
+			oldestActiveXID = PrescanFDWXacts(oldestActiveXID);
 
 			/*
 			 * Construct a RunningTransactions snapshot representing a shut
@@ -9549,6 +9573,7 @@ xlog_redo(XLogReaderState *record)
 		ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
 		ControlFile->wal_level = xlrec.wal_level;
 		ControlFile->wal_log_hints = xlrec.wal_log_hints;
+		ControlFile->max_fdw_xacts = xlrec.max_fdw_xacts;
 
 		/*
 		 * Update minRecoveryPoint to ensure that if recovery is aborted, we
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 3870a4d..fca709d 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -17,6 +17,7 @@
 #include <unistd.h>
 #include <signal.h>
 
+#include "access/fdw_xact.h"
 #include "access/htup_details.h"
 #include "bootstrap/bootstrap.h"
 #include "catalog/index.h"
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ada2142..77de39b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -251,6 +251,15 @@ CREATE VIEW pg_prepared_xacts AS
          LEFT JOIN pg_authid U ON P.ownerid = U.oid
          LEFT JOIN pg_database D ON P.dbid = D.oid;
 
+CREATE VIEW pg_fdw_xacts AS
+	SELECT P.transaction, D.datname AS database, S.srvname AS "foreign server",
+			U.rolname AS "local user", P.status,
+			P.identifier AS "foreign transaction identifier"
+	FROM pg_fdw_xact() AS P
+		LEFT JOIN pg_authid U ON P.userid = U.oid
+		LEFT JOIN pg_database D ON P.dbid = D.oid
+		LEFT JOIN pg_foreign_server S ON P.serverid = S.oid;
+
 CREATE VIEW pg_prepared_statements AS
     SELECT * FROM pg_prepared_statement() AS P;
 
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index eb531af..9a10696 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -13,6 +13,7 @@
  */
 #include "postgres.h"
 
+#include "access/fdw_xact.h"
 #include "access/heapam.h"
 #include "access/htup_details.h"
 #include "access/reloptions.h"
@@ -1087,6 +1088,20 @@ RemoveForeignServerById(Oid srvId)
 	if (!HeapTupleIsValid(tp))
 		elog(ERROR, "cache lookup failed for foreign server %u", srvId);
 
+	/*
+	 * Check if the foreign server has any foreign transaction prepared on it.
+	 * If there is one, and it gets dropped, we will not have any chance to
+	 * resolve that transaction.
+	 */
+	if (fdw_xact_exists(InvalidTransactionId, MyDatabaseId, srvId, InvalidOid))
+	{
+		Form_pg_foreign_server srvForm;
+		srvForm = (Form_pg_foreign_server) GETSTRUCT(tp);
+		ereport(ERROR,
+				(errmsg("server \"%s\" has unresolved prepared transactions on it",
+						NameStr(srvForm->srvname))));
+	}
+
 	simple_heap_delete(rel, &tp->t_self);
 
 	ReleaseSysCache(tp);
@@ -1385,6 +1400,17 @@ RemoveUserMapping(DropUserMappingStmt *stmt)
 	user_mapping_ddl_aclcheck(useId, srv->serverid, srv->servername);
 
 	/*
+	 * If there is a foreign prepared transaction with this user mapping,
+	 * dropping the user mapping might result in dangling prepared
+	 * transaction.
+	 */
+	if (fdw_xact_exists(InvalidTransactionId, MyDatabaseId, srv->serverid,
+						useId))
+		ereport(ERROR,
+				(errmsg("server \"%s\" has unresolved prepared transaction for user \"%s\"",
+							srv->servername, MappingUserName(useId))));
+
+	/*
 	 * Do the deletion
 	 */
 	object.classId = UserMappingRelationId;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 46cd5ba..c0f000c 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -148,6 +148,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_SPGIST_ID:
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
+		case RM_FDW_XACT_ID:
 		case RM_REPLORIGIN_ID:
 		case RM_GENERIC_ID:
 			/* just deal with xid, and done */
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index c04b17f..74f10b7 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -16,6 +16,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
@@ -141,6 +142,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, FDWXactShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -253,6 +255,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
+	FDWXactShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index f8996cd..6589cfe 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -47,3 +47,4 @@ CommitTsLock						39
 ReplicationOriginLock				40
 MultiXactTruncationLock				41
 OldSnapshotTimeMapLock				42
+FDWXactLock					43
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index ce4eef9..7e055f6 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -27,6 +27,7 @@
 #endif
 
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/gin.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -2055,6 +2056,19 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	/*
+	 * See also CheckRequiredParameterValues() if this parameter changes
+	 */
+	{
+		{"max_prepared_foreign_transactions", PGC_POSTMASTER, RESOURCES_MEM,
+			gettext_noop("Sets the maximum number of simultaneously prepared transactions on foreign servers."),
+			NULL
+		},
+		&max_fdw_xacts,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 #ifdef LOCK_DEBUG
 	{
 		{"trace_lock_oidmin", PGC_SUSET, DEVELOPER_OPTIONS,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b1c3aea..dea5a47 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -119,6 +119,12 @@
 					# (change requires restart)
 # Caution: it is not advisable to set max_prepared_transactions nonzero unless
 # you actively intend to use prepared transactions.
+#max_prepared_foreign_transactions = 0	# zero disables the feature
+					# (change requires restart)
+# Note:  Increasing max_prepared_foreign_transactions costs ~600(?) bytes of shared memory
+# per foreign transaction slot.
+# It is not advisable to set max_prepared_foreign_transactions nonzero unless you
+# actively intend to use atomic foreign transactions feature.
 #work_mem = 4MB				# min 64kB
 #maintenance_work_mem = 64MB		# min 1MB
 #replacement_sort_tuples = 150000	# limits use of replacement selection sort
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 3350e13..d303e43 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -210,6 +210,7 @@ static const char *const subdirs[] = {
 	"pg_snapshots",
 	"pg_subtrans",
 	"pg_twophase",
+	"pg_fdw_xact",
 	"pg_multixact",
 	"pg_multixact/members",
 	"pg_multixact/offsets",
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 96619a2..90cceb5 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -296,5 +296,7 @@ main(int argc, char *argv[])
 		   (ControlFile->float8ByVal ? _("by value") : _("by reference")));
 	printf(_("Data page checksum version:           %u\n"),
 		   ControlFile->data_checksum_version);
+	printf(_("Current max_fdw_xacts setting:   %d\n"),
+		   ControlFile->max_fdw_xacts);
 	return 0;
 }
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index 525b82b..c8cf4ce 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -586,6 +586,7 @@ GuessControlValues(void)
 	ControlFile.MaxConnections = 100;
 	ControlFile.max_worker_processes = 8;
 	ControlFile.max_prepared_xacts = 0;
+	ControlFile.max_fdw_xacts = 0;
 	ControlFile.max_locks_per_xact = 64;
 
 	ControlFile.maxAlign = MAXIMUM_ALIGNOF;
@@ -802,6 +803,7 @@ RewriteControlFile(void)
 	ControlFile.MaxConnections = 100;
 	ControlFile.max_worker_processes = 8;
 	ControlFile.max_prepared_xacts = 0;
+	ControlFile.max_fdw_xacts = 0;
 	ControlFile.max_locks_per_xact = 64;
 
 	/* Now we can force the recorded xlog seg size to the right thing. */
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index 8fe20ce..d6ff550 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -8,9 +8,11 @@
 #define FRONTEND 1
 #include "postgres.h"
 
+#include "access/fdw_xact.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/generic_xlog.h"
 #include "access/gin.h"
 #include "access/gist_private.h"
diff --git a/src/include/access/fdw_xact.h b/src/include/access/fdw_xact.h
new file mode 100644
index 0000000..87636de
--- /dev/null
+++ b/src/include/access/fdw_xact.h
@@ -0,0 +1,75 @@
+/*
+ * fdw_xact.h
+ *
+ * PostgreSQL distributed transaction manager
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/fdw_xact.h
+ */
+#ifndef FDW_XACT_H
+#define FDW_XACT_H
+
+#include "storage/backendid.h"
+#include "foreign/foreign.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+#include "nodes/pg_list.h"
+
+/*
+ * On disk file structure
+ */
+typedef struct
+{
+	Oid				dboid;		/* database oid where to find foreign server and
+								 * user mapping
+								 */
+	TransactionId	local_xid;
+	Oid				serverid;			/* foreign server where transaction takes place */
+	Oid				userid;				/* user who initiated the foreign transaction */
+	Oid				umid;
+	uint32			fdw_xact_id_len;	/* Length of the value stored in the next field */
+	/* This should always be the last member */
+	char			fdw_xact_id[FLEXIBLE_ARRAY_MEMBER];	/* variable length array
+														 * to store foreign transaction
+														 * information.
+														 */
+} FDWXactOnDiskData;
+
+typedef struct
+{
+	TransactionId	xid;
+	Oid				serverid;
+	Oid				userid;
+	Oid				dbid;
+} FdwRemoveXlogRec;
+
+extern int	max_fdw_xacts;
+
+/* Info types for logs related to FDW transactions */
+#define XLOG_FDW_XACT_INSERT	0x00
+#define XLOG_FDW_XACT_REMOVE	0x10
+
+extern Size FDWXactShmemSize(void);
+extern void FDWXactShmemInit(void);
+extern void ReadFDWXacts(void);
+extern TransactionId PrescanFDWXacts(TransactionId oldestActiveXid);
+extern bool fdw_xact_has_usermapping(Oid serverid, Oid userid);
+extern bool fdw_xact_has_server(Oid serverid);
+extern void fdw_xact_redo(XLogReaderState *record);
+extern void fdw_xact_desc(StringInfo buf, XLogReaderState *record);
+extern const char *fdw_xact_identify(uint8 info);
+extern void AtEOXact_FDWXacts(bool is_commit);
+extern void AtPrepare_FDWXacts(void);
+extern void FDWXactTwoPhaseFinish(bool isCommit, TransactionId xid);
+extern bool fdw_xact_exists(TransactionId xid, Oid dboid, Oid serverid,
+								Oid userid);
+extern void CheckPointFDWXact(XLogRecPtr redo_horizon);
+extern void RegisterXactForeignServer(Oid serverid, Oid userid, bool can_prepare);
+extern bool FdwTwoPhaseNeeded(void);
+extern void PreCommit_FDWXacts(void);
+/* For the sake of foreign transaction resolver */
+extern List	*get_dbids_with_unresolved_xact(void);
+
+#endif /* FDW_XACT_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index a7a0ae2..86448ff 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -44,6 +44,7 @@ PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL)
 PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup)
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
+PG_RMGR(RM_FDW_XACT_ID, "Foreign Transactions", fdw_xact_redo, fdw_xact_desc, fdw_xact_identify, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 0a595cc..9a92ce7 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -213,6 +213,7 @@ typedef struct xl_parameter_change
 	int			MaxConnections;
 	int			max_worker_processes;
 	int			max_prepared_xacts;
+	int			max_fdw_xacts;
 	int			max_locks_per_xact;
 	int			wal_level;
 	bool		wal_log_hints;
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 0bc41ab..3413201 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -180,6 +180,7 @@ typedef struct ControlFileData
 	int			MaxConnections;
 	int			max_worker_processes;
 	int			max_prepared_xacts;
+	int			max_fdw_xacts;
 	int			max_locks_per_xact;
 	bool		track_commit_timestamp;
 
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index e2d08ba..a2272db 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5261,6 +5261,12 @@ DATA(insert OID = 3992 ( dense_rank			PGNSP PGUID 12 1 0 2276 0 t f f f f f i s
 DESCR("rank of hypothetical row without gaps");
 DATA(insert OID = 3993 ( dense_rank_final	PGNSP PGUID 12 1 0 2276 0 f f f f f f i s 2 0 20 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_ _null_ hypothetical_dense_rank_final _null_ _null_ _null_ ));
 DESCR("aggregate final function");
+DATA(insert OID = 4109 ( pg_fdw_xact	PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 0 0 2249 "" "{26, 28,26,26,25,25}" "{o,o,o,o,o,o}" "{dbid, transaction,serverid,userid,status,identifier}" _null_ _null_ pg_fdw_xact _null_ _null_ _null_ ));
+DESCR("view foreign transactions");
+DATA(insert OID = 4110 ( pg_fdw_resolve	PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 0 0 2249 "" "{26, 28,26,26,25,25}" "{o,o,o,o,o,o}" "{dbid, transaction,serverid,userid,status,identifier}" _null_ _null_ pg_fdw_resolve _null_ _null_ _null_ ));
+DESCR("resolve foreign transactions");
+DATA(insert OID = 4111 ( pg_fdw_remove PGNSP PGUID 12 1 0 0 0 f f f f f f v u 4 0 2278 "28 26 26 26" _null_ _null_ "{transaction,dbid,serverid,userid}" _null_ _null_ pg_fdw_remove _null_ _null_ _null_ ));
+DESCR("remove foreign transactions");
 
 /* pg_upgrade support */
 DATA(insert OID = 3582 ( binary_upgrade_set_next_pg_type_oid PGNSP PGUID  12 1 0 0 0 f f f f t f v r 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ binary_upgrade_set_next_pg_type_oid _null_ _null_ _null_ ));
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index e1b0d0d..3383651 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -12,6 +12,7 @@
 #ifndef FDWAPI_H
 #define FDWAPI_H
 
+#include "access/fdw_xact.h"
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
 #include "nodes/relation.h"
@@ -143,6 +144,23 @@ typedef bool (*AnalyzeForeignTable_function) (Relation relation,
 typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
 														   Oid serverOid);
 
+typedef bool (*EndForeignTransaction_function) (Oid serverid, Oid userid,
+												Oid umid, bool is_commit);
+
+typedef bool (*PrepareForeignTransaction_function) (Oid serverid, Oid userid,
+													int prep_info_len, char *prep_info);
+
+typedef bool (*ResolvePreparedForeignTransaction_function) (Oid serverid,
+															Oid userid,
+															Oid umid,
+															bool is_commit,
+															int prep_info_len,
+															char *prep_info);
+
+typedef char *(*GetPrepareId_function) (Oid serverid, Oid userid,
+														int *prep_info_len);
+
+
 typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
 													  ParallelContext *pcxt);
 typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
@@ -219,6 +237,12 @@ typedef struct FdwRoutine
 	/* Support functions for IMPORT FOREIGN SCHEMA */
 	ImportForeignSchema_function ImportForeignSchema;
 
+	/* Supprot functions for foreign transactions */
+	GetPrepareId_function GetPrepareId;
+	EndForeignTransaction_function EndForeignTransaction;
+	PrepareForeignTransaction_function PrepareForeignTransaction;
+	ResolvePreparedForeignTransaction_function ResolvePreparedForeignTransaction;
+
 	/* Support functions for parallelism under Gather node */
 	IsForeignScanParallelSafe_function IsForeignScanParallelSafe;
 	EstimateDSMForeignScan_function EstimateDSMForeignScan;
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index f576f05..f49334b 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -251,11 +251,12 @@ extern PGPROC *PreparedXactProcs;
  * We set aside some extra PGPROC structures for auxiliary processes,
  * ie things that aren't full-fledged backends but need shmem access.
  *
- * Background writer, checkpointer and WAL writer run during normal operation.
+ * Background writer, checkpointer, WAL writer and foreign transction resolver
+ * run during normal operation.
  * Startup process and WAL receiver also consume 2 slots, but WAL writer is
  * launched only after startup has exited, so we only need 4 slots.
  */
-#define NUM_AUXILIARY_PROCS		4
+#define NUM_AUXILIARY_PROCS		5
 
 
 /* configurable options */
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 2ae212a..aa6f203 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1332,4 +1332,8 @@ extern Datum pg_prepared_statement(PG_FUNCTION_ARGS);
 /* utils/mmgr/portalmem.c */
 extern Datum pg_cursor(PG_FUNCTION_ARGS);
 
+/* access/transam/fdw_xact.c */
+extern Datum pg_fdw_xact(PG_FUNCTION_ARGS);
+extern Datum pg_fdw_resolve(PG_FUNCTION_ARGS);
+extern Datum pg_fdw_remove(PG_FUNCTION_ARGS);
 #endif   /* BUILTINS_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 00700f2..57f9e51 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1323,6 +1323,16 @@ pg_cursors| SELECT c.name,
     c.is_scrollable,
     c.creation_time
    FROM pg_cursor() c(name, statement, is_holdable, is_binary, is_scrollable, creation_time);
+pg_fdw_xacts| SELECT p.transaction,
+    d.datname AS database,
+    s.srvname AS "foreign server",
+    u.rolname AS "local user",
+    p.status,
+    p.identifier AS "foreign transaction identifier"
+   FROM (((pg_fdw_xact() p(dbid, transaction, serverid, userid, status, identifier)
+     LEFT JOIN pg_authid u ON ((p.userid = u.oid)))
+     LEFT JOIN pg_database d ON ((p.dbid = d.oid)))
+     LEFT JOIN pg_foreign_server s ON ((p.serverid = s.oid)));
 pg_file_settings| SELECT a.sourcefile,
     a.sourceline,
     a.seqno,
diff --git a/src/test/regress/pg_regress.c b/src/test/regress/pg_regress.c
index 14c87c9..a67793e 100644
--- a/src/test/regress/pg_regress.c
+++ b/src/test/regress/pg_regress.c
@@ -2233,9 +2233,11 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc
 		 * Adjust the default postgresql.conf for regression testing. The user
 		 * can specify a file to be appended; in any case we expand logging
 		 * and set max_prepared_transactions to enable testing of prepared
-		 * xacts.  (Note: to reduce the probability of unexpected shmmax
-		 * failures, don't set max_prepared_transactions any higher than
-		 * actually needed by the prepared_xacts regression test.)
+		 * xacts. We also set max_fdw_transctions to enable testing of atomic
+		 * foreign transactions. (Note: to reduce the probability of unexpected
+		 * shmmax failures, don't set max_prepared_transactions or
+		 * max_prepared_foreign_transactions any higher than actually needed by the
+		 * corresponding regression tests.).
 		 */
 		snprintf(buf, sizeof(buf), "%s/data/postgresql.conf", temp_instance);
 		pg_conf = fopen(buf, "a");
@@ -2249,7 +2251,8 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc
 		fputs("log_checkpoints = on\n", pg_conf);
 		fputs("log_lock_waits = on\n", pg_conf);
 		fputs("log_temp_files = 128kB\n", pg_conf);
-		fputs("max_prepared_transactions = 2\n", pg_conf);
+		fputs("max_prepared_transactions = 3\n", pg_conf);
+		fputs("max_prepared_foreign_transactions = 2\n", pg_conf);
 
 		for (sl = temp_configs; sl != NULL; sl = sl->next)
 		{
