diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out
index 46e915d..8df7c24 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -39,7 +39,7 @@ INSERT INTO test_prepared2 VALUES (9);
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 -- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                   data                                   
 -------------------------------------------------------------------------
  BEGIN
@@ -66,6 +66,40 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
  COMMIT
 (22 rows)
 
+-- same but with twophase decoding
+SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1');
+                                  data                                   
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE 'test_prepared#1'
+ COMMIT PREPARED 'test_prepared#1'
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE 'test_prepared#2'
+ ABORT PREPARED 'test_prepared#2'
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:5
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE 'test_prepared#3'
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
+ COMMIT
+ COMMIT PREPARED 'test_prepared#3'
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:9
+ COMMIT
+(27 rows)
+
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
index e726397..0647efd 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -45,6 +45,9 @@ DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 
 -- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
-SELECT pg_drop_replication_slot('regression_slot');
+-- same but with twophase decoding
+SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'twophase-decoding', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
\ No newline at end of file
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 21cfd67..6413085 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -24,6 +24,8 @@
 #include "replication/message.h"
 #include "replication/origin.h"
 
+#include "storage/procarray.h"
+
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -46,6 +48,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	bool		twophase_decoding;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -68,6 +71,19 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 				  bool transactional, const char *prefix,
 				  Size sz, const char *message);
+static bool pg_filter_prepare(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  char *gid);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn);
+static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn);
+
 
 void
 _PG_init(void)
@@ -85,9 +101,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pg_decode_begin_txn;
 	cb->change_cb = pg_decode_change;
 	cb->commit_cb = pg_decode_commit_txn;
+
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+
+	cb->filter_prepare_cb = pg_filter_prepare;
+	cb->prepare_cb = pg_decode_prepare_txn;
+	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+	cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
 }
 
 
@@ -107,6 +129,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->twophase_decoding = false;
 
 	ctx->output_plugin_private = data;
 
@@ -176,6 +199,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
 						 strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "twophase-decoding") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->twophase_decoding = true;
+			else if (!parse_bool(strVal(elem->arg), &data->twophase_decoding))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -232,10 +266,142 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfoString(ctx->out, "COMMIT");
+
 	if (data->include_xids)
-		appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
-	else
-		appendStringInfoString(ctx->out, "COMMIT");
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+
+/* Filter out unnecessary two-phase transactions */
+static bool
+pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					char *gid)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* treat all transaction as one-phase */
+	if (!data->twophase_decoding)
+		return true;
+
+	/*
+	 * Two-phase transactions that accessed catalog require special treatment.
+	 *
+	 * Right now we don't have a save way to decode catalog changes made in
+	 * prepared transaction that was already aborted by the time of decoding.
+	 *
+	 * That kind of problem arises only when we are trying to retrospectively
+	 * decode aborted transactions. If one wants to code distributed commit
+	 * based on prepare decoding then commits/aborts will happend strictly after
+	 * decoding will be completed, so it is safe to skip any checks/locks here.
+	 */
+	if (txn->has_catalog_changes)
+	{
+		LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+		if (TransactionIdIsInProgress(txn->xid))
+		{
+			/*
+			 * For the sake of simplicity we just ignore in-progess transaction
+			 * in this extension, as they may abort during deconing.
+			 *
+			 * It is possible to move that LWLockRelease() to pg_decode_prepare_txn()
+			 * and allow decoding of running prepared tx, but such lock will prevent
+			 * any 2pc transaction commit during decoding time, that can be big
+			 * enough in case of massive changes/inserts in that tx.
+			 */
+			LWLockRelease(TwoPhaseStateLock);
+			return true;
+		}
+		else if (TransactionIdDidAbort(txn->xid))
+		{
+			/*
+			 * Here we know that it is already aborted and there is no
+			 * mush sence in doing something with this transaction.
+			 * Consequent ABORT PREPARED will be suppressed.
+			 */
+			LWLockRelease(TwoPhaseStateLock);
+			return true;
+		}
+
+		LWLockRelease(TwoPhaseStateLock);
+	}
+
+	return false;
+}
+
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr prepare_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "PREPARE '%s'", txn->gid);
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr commit_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "COMMIT PREPARED '%s'", txn->gid);
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* ABORT PREPARED callback */
+static void
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "ABORT PREPARED '%s'", txn->gid);
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
 
 	if (data->include_timestamp)
 		appendStringInfo(ctx->out, " (at %s)",
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 735f8c5..ed75503 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -100,8 +100,13 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
 
 		parsed->twophase_xid = xl_twophase->xid;
-
 		data += sizeof(xl_xact_twophase);
+
+		if (parsed->xinfo & XACT_XINFO_HAS_GID)
+		{
+			strcpy(parsed->twophase_gid, data);
+			data += strlen(parsed->twophase_gid) + 1;
+		}
 	}
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -139,6 +144,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 		data += sizeof(xl_xact_xinfo);
 	}
 
+	if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+	{
+		xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+		parsed->dbId = xl_dbinfo->dbId;
+		parsed->tsId = xl_dbinfo->tsId;
+
+		data += sizeof(xl_xact_dbinfo);
+	}
+
 	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@@ -166,8 +181,26 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
 
 		parsed->twophase_xid = xl_twophase->xid;
-
 		data += sizeof(xl_xact_twophase);
+
+		if (parsed->xinfo & XACT_XINFO_HAS_GID)
+		{
+			strcpy(parsed->twophase_gid, data);
+			data += strlen(parsed->twophase_gid) + 1;
+		}
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		xl_xact_origin xl_origin;
+
+		/* we're only guaranteed 4 byte alignment, so copy onto stack */
+		memcpy(&xl_origin, data, sizeof(xl_origin));
+
+		parsed->origin_lsn = xl_origin.origin_lsn;
+		parsed->origin_timestamp = xl_origin.origin_timestamp;
+
+		data += sizeof(xl_xact_origin);
 	}
 }
 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 83169cc..b58b9a3 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -129,7 +129,6 @@ int			max_prepared_xacts = 0;
  * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
  * specified in TwoPhaseFileHeader.
  */
-#define GIDSIZE 200
 
 typedef struct GlobalTransactionData
 {
@@ -187,12 +186,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval);
+								bool initfileinval,
+								const char *gid);
 static void RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels);
+							   RelFileNode *rels,
+							   const char *gid);
 static void ProcessRecords(char *bufptr, TransactionId xid,
 			   const TwoPhaseCallback callbacks[]);
 static void RemoveGXact(GlobalTransaction gxact);
@@ -854,7 +855,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
 /*
  * Header for a 2PC state file
  */
-#define TWOPHASE_MAGIC	0x57F94533		/* format identifier */
+#define TWOPHASE_MAGIC	0x57F94534		/* format identifier */
 
 typedef struct TwoPhaseFileHeader
 {
@@ -870,6 +871,8 @@ typedef struct TwoPhaseFileHeader
 	int32		ninvalmsgs;		/* number of cache invalidation messages */
 	bool		initfileinval;	/* does relcache init file need invalidation? */
 	uint16		gidlen;			/* length of the GID - GID follows the header */
+	XLogRecPtr	origin_lsn;		/* lsn of this record at origin node */
+	TimestampTz origin_timestamp; /* time of prepare at origin node */
 } TwoPhaseFileHeader;
 
 /*
@@ -1021,6 +1024,7 @@ EndPrepare(GlobalTransaction gxact)
 {
 	TwoPhaseFileHeader *hdr;
 	StateFileChunk *record;
+	bool			replorigin;
 
 	/* Add the end sentinel to the list of 2PC records */
 	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1031,6 +1035,21 @@ EndPrepare(GlobalTransaction gxact)
 	Assert(hdr->magic == TWOPHASE_MAGIC);
 	hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
+	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+				  replorigin_session_origin != DoNotReplicateId);
+
+	if (replorigin)
+	{
+		Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
+		hdr->origin_lsn = replorigin_session_origin_lsn;
+		hdr->origin_timestamp = replorigin_session_origin_timestamp;
+	}
+	else
+	{
+		hdr->origin_lsn = InvalidXLogRecPtr;
+		hdr->origin_timestamp = 0;
+	}
+
 	/*
 	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
 	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1061,9 +1080,19 @@ EndPrepare(GlobalTransaction gxact)
 	MyPgXact->delayChkpt = true;
 
 	XLogBeginInsert();
+
 	for (record = records.head; record != NULL; record = record->next)
 		XLogRegisterData(record->data, record->len);
+
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+
+	if (replorigin)
+		/* Move LSNs forward for this replication origin */
+		replorigin_session_advance(replorigin_session_origin_lsn,
+								   gxact->prepare_end_lsn);
+
 	XLogFlush(gxact->prepare_end_lsn);
 
 	/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1239,6 +1268,43 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 	return buf;
 }
 
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+	TwoPhaseFileHeader *hdr;
+	char *bufptr;
+
+	hdr = (TwoPhaseFileHeader *) xlrec;
+	bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+	parsed->origin_lsn = hdr->origin_lsn;
+	parsed->origin_timestamp = hdr->origin_timestamp;
+	parsed->twophase_xid = hdr->xid;
+	parsed->dbId = hdr->database;
+	parsed->nsubxacts = hdr->nsubxacts;
+	parsed->ncommitrels = hdr->ncommitrels;
+	parsed->nabortrels = hdr->nabortrels;
+	parsed->nmsgs = hdr->ninvalmsgs;
+
+	strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+	bufptr += MAXALIGN(hdr->gidlen);
+
+	parsed->subxacts = (TransactionId *) bufptr;
+	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+	parsed->commitrels = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+	parsed->abortrels = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+	parsed->msgs = (SharedInvalidationMessage *) bufptr;
+	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
 
 /*
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1392,11 +1458,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 										hdr->nsubxacts, children,
 										hdr->ncommitrels, commitrels,
 										hdr->ninvalmsgs, invalmsgs,
-										hdr->initfileinval);
+										hdr->initfileinval, gid);
 	else
 		RecordTransactionAbortPrepared(xid,
 									   hdr->nsubxacts, children,
-									   hdr->nabortrels, abortrels);
+									   hdr->nabortrels, abortrels,
+									   gid);
 
 	ProcArrayRemove(proc, latestXid);
 
@@ -2055,7 +2122,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval)
+								bool initfileinval,
+								const char *gid)
 {
 	XLogRecPtr	recptr;
 	TimestampTz committs = GetCurrentTimestamp();
@@ -2082,7 +2150,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								 ninvalmsgs, invalmsgs,
 								 initfileinval, false,
 						 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-								 xid);
+								 xid, gid);
 
 
 	if (replorigin)
@@ -2144,7 +2212,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels)
+							   RelFileNode *rels,
+							   const char *gid)
 {
 	XLogRecPtr	recptr;
 
@@ -2166,7 +2235,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 								nchildren, children,
 								nrels, rels,
 						 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-								xid);
+								xid, gid);
 
 	/* Always flush, since we're about to remove the 2PC state file */
 	XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c8751c6..9e407d5 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1233,7 +1233,7 @@ RecordTransactionCommit(void)
 							nmsgs, invalMessages,
 							RelcacheInitFileInval, forceSyncCommit,
 							MyXactFlags,
-							InvalidTransactionId /* plain commit */ );
+							InvalidTransactionId, NULL /* plain commit */ );
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
@@ -1585,7 +1585,8 @@ RecordTransactionAbort(bool isSubXact)
 	XactLogAbortRecord(xact_time,
 					   nchildren, children,
 					   nrels, rels,
-					   MyXactFlags, InvalidTransactionId);
+					   MyXactFlags, InvalidTransactionId,
+					   NULL);
 
 	/*
 	 * Report the latest async abort LSN, so that the WAL writer knows to
@@ -3471,7 +3472,7 @@ BeginTransactionBlock(void)
  * resource owner, etc while executing inside a Portal.
  */
 bool
-PrepareTransactionBlock(char *gid)
+PrepareTransactionBlock(const char *gid)
 {
 	TransactionState s;
 	bool		result;
@@ -5110,7 +5111,8 @@ XactLogCommitRecord(TimestampTz commit_time,
 					int nrels, RelFileNode *rels,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
-					int xactflags, TransactionId twophase_xid)
+					int xactflags, TransactionId twophase_xid,
+					const char *twophase_gid)
 {
 	xl_xact_commit xlrec;
 	xl_xact_xinfo xl_xinfo;
@@ -5122,6 +5124,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_origin xl_origin;
 
 	uint8		info;
+	int			gidlen = 0;
 
 	Assert(CritSectionCount > 0);
 
@@ -5184,6 +5187,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		Assert(twophase_gid != NULL);
+
+		if (XLogLogicalInfoActive())
+		{
+			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+			gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+		}
 	}
 
 	/* dump transaction origin information */
@@ -5234,8 +5244,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+	{
 		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
 
+		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+			XLogRegisterData((char *) twophase_gid, gidlen);
+	}
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
@@ -5255,15 +5270,19 @@ XLogRecPtr
 XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   int xactflags, TransactionId twophase_xid)
+				   int xactflags, TransactionId twophase_xid,
+				   const char *twophase_gid)
 {
 	xl_xact_abort xlrec;
 	xl_xact_xinfo xl_xinfo;
 	xl_xact_subxacts xl_subxacts;
 	xl_xact_relfilenodes xl_relfilenodes;
 	xl_xact_twophase xl_twophase;
+	xl_xact_dbinfo xl_dbinfo;
+	xl_xact_origin xl_origin;
 
 	uint8		info;
+	int			gidlen = 0;
 
 	Assert(CritSectionCount > 0);
 
@@ -5275,7 +5294,6 @@ XactLogAbortRecord(TimestampTz abort_time,
 	else
 		info = XLOG_XACT_ABORT_PREPARED;
 
-
 	/* First figure out and collect all the information needed */
 
 	xlrec.xact_time = abort_time;
@@ -5299,6 +5317,31 @@ XactLogAbortRecord(TimestampTz abort_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		Assert(twophase_gid != NULL);
+
+		if (XLogLogicalInfoActive())
+		{
+			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+			gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+		}
+	}
+
+	if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+		xl_dbinfo.dbId = MyDatabaseId;
+		xl_dbinfo.tsId = MyDatabaseTableSpace;
+	}
+
+	/* dump transaction origin information only for abort prepared */
+	if ( (replorigin_session_origin != InvalidRepOriginId) &&
+			TransactionIdIsValid(twophase_xid) &&
+			XLogLogicalInfoActive())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+		xl_origin.origin_lsn = replorigin_session_origin_lsn;
+		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
@@ -5313,6 +5356,9 @@ XactLogAbortRecord(TimestampTz abort_time,
 	if (xl_xinfo.xinfo != 0)
 		XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		XLogRegisterData((char *) (&xl_subxacts),
@@ -5330,8 +5376,22 @@ XactLogAbortRecord(TimestampTz abort_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+	{
 		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
 
+		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+			XLogRegisterData((char *) twophase_gid, gidlen);
+	}
+
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+	if (TransactionIdIsValid(twophase_xid))
+		XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 	return XLogInsert(RM_XACT_ID, info);
 }
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5c13d26..52e701b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,7 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/twophase.h"
 
 #include "catalog/pg_control.h"
 
@@ -71,7 +72,9 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			 xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
-			xl_xact_parsed_abort *parsed, TransactionId xid);
+			 xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -277,17 +280,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				break;
 			}
 		case XLOG_XACT_PREPARE:
+			{
+				xl_xact_parsed_prepare parsed;
 
-			/*
-			 * Currently decoding ignores PREPARE TRANSACTION and will just
-			 * decode the transaction when the COMMIT PREPARED is sent or
-			 * throw away the transaction's contents when a ROLLBACK PREPARED
-			 * is received. In the future we could add code to expose prepared
-			 * transactions in the changestream allowing for a kind of
-			 * distributed 2PC.
-			 */
-			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
-			break;
+				/* check that output plugin capable of twophase decoding */
+				if (!ctx->twophase_hadling)
+				{
+					ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+					break;
+				}
+
+				/* ok, parse it */
+				ParsePrepareRecord(XLogRecGetInfo(buf->record),
+										XLogRecGetData(buf->record), &parsed);
+
+				/* does output plugin wants this particular transaction? */
+				if (ctx->callbacks.filter_prepare_cb &&
+					ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
+													 parsed.twophase_gid))
+				{
+					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+												buf->origptr);
+					break;
+				}
+
+				DecodePrepare(ctx, buf, &parsed);
+				break;
+			}
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
 	}
@@ -607,9 +626,79 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 								 buf->origptr, buf->endptr);
 	}
 
+	if (TransactionIdIsValid(parsed->twophase_xid) &&
+								ReorderBufferTxnIsPrepared(ctx->reorder, xid))
+	{
+		/*
+		 * We are processing COMMIT PREPARED and know that reorder buffer is
+		 * empty. So we can skip use shortcut for coomiting bare xact.
+		 */
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn, parsed->twophase_gid, true);
+	} else {
+		/* replay actions of all transaction + subtransactions in order */
+		ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	}
+}
+
+
+/*
+ * Decode PREPARE record. Same logic as in COMMIT, but diffent calls
+ * to SnapshotBuilder as we need to mark this transaction as commited
+ * instead of running to properly decode it. When prepared transation
+ * is decoded we mark it in snapshot as running again.
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed)
+{
+	XLogRecPtr	origin_lsn = parsed->origin_lsn;
+	TimestampTz	commit_time = parsed->origin_timestamp;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+	int			i;
+	TransactionId xid = parsed->twophase_xid;
+
+	/*
+	 * Process invalidation messages, even if we're not interested in the
+	 * transaction's contents, since the various caches need to always be
+	 * consistent.
+	 */
+	if (parsed->nmsgs > 0)
+	{
+		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+									  parsed->nmsgs, parsed->msgs);
+		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+	}
+
+	SnapBuildPrepareTxnStart(ctx->snapshot_builder, buf->origptr, xid,
+					   parsed->nsubxacts, parsed->subxacts);
+
+	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+		FilterByOrigin(ctx, origin_id))
+	{
+		for (i = 0; i < parsed->nsubxacts; i++)
+		{
+			ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+		}
+		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+
+		return;
+	}
+
+	/* tell the reorderbuffer about the surviving subtransactions */
+	for (i = 0; i < parsed->nsubxacts; i++)
+	{
+		ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+								 buf->origptr, buf->endptr);
+	}
+
 	/* replay actions of all transaction + subtransactions in order */
-	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time, origin_id, origin_lsn);
+	ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr,
+						commit_time, origin_id, origin_lsn, parsed->twophase_gid);
+
+	SnapBuildPrepareTxnFinish(ctx->snapshot_builder, xid);
 }
 
 /*
@@ -621,6 +710,28 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid)
 {
 	int			i;
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz	commit_time = 0;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		origin_lsn = parsed->origin_lsn;
+		commit_time = parsed->origin_timestamp;
+	}
+
+	/*
+	 * If that is ROLLBACK PREPARED than send that to callbacks.
+	 */
+	if (TransactionIdIsValid(xid) &&
+			parsed->dbId == ctx->slot->data.database &&
+			ReorderBufferTxnIsPrepared(ctx->reorder, xid))
+	{
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+						commit_time, origin_id, origin_lsn,
+						parsed->twophase_gid, false);
+		return;
+	}
 
 	SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
 					  parsed->nsubxacts, parsed->subxacts);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..9a66194 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -58,6 +58,14 @@ static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions
 				   bool is_init);
 static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
+static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  char *gid);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn);
+static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn);
 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  XLogRecPtr commit_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -122,6 +130,7 @@ StartupDecodingContext(List *output_plugin_options,
 	MemoryContext context,
 				old_context;
 	LogicalDecodingContext *ctx;
+	int			twophase_callbacks;
 
 	/* shorter lines... */
 	slot = MyReplicationSlot;
@@ -179,8 +188,25 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->filter_prepare = filter_prepare_cb_wrapper;
+	ctx->reorder->prepare = prepare_cb_wrapper;
+	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+	ctx->reorder->abort_prepared = abort_prepared_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
 
+	/* check that plugin implements all necessary callbacks to perform 2PC */
+	twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) +
+		(ctx->callbacks.commit_prepared_cb != NULL) +
+		(ctx->callbacks.abort_prepared_cb != NULL);
+
+	ctx->twophase_hadling = (twophase_callbacks == 3);
+
+	if (twophase_callbacks != 3 && twophase_callbacks != 0)
+		ereport(WARNING,
+			(errmsg("Output plugin registered only %d twophase callbacks out of 3. "
+					"Twophase transactions will be decoded as ordinary ones.",
+					twophase_callbacks)));
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -650,6 +676,93 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "prepare";
+	state.report_location = txn->final_lsn;		/* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "commit_prepared";
+	state.report_location = txn->final_lsn;		/* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "abort_prepared";
+	state.report_location = txn->final_lsn;		/* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change)
 {
@@ -684,6 +797,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, char *gid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_prepare";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_prepare_cb(ctx, txn, gid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+	return ret;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b437799..2b2027b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1308,21 +1308,18 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
  * the top and subtransactions (using a k-way merge) and replay the changes in
  * lsn order.
  */
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+static void
+ReorderBufferCommitInternal(ReorderBufferTXN *txn,
+					ReorderBuffer *rb, TransactionId xid,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 					TimestampTz commit_time,
 					RepOriginId origin_id, XLogRecPtr origin_lsn)
 {
-	ReorderBufferTXN *txn;
 	volatile Snapshot snapshot_now;
 	volatile CommandId command_id = FirstCommandId;
 	bool		using_subtxn;
 	ReorderBufferIterTXNState *volatile iterstate = NULL;
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
-
 	/* unknown transaction, nothing to replay */
 	if (txn == NULL)
 		return;
@@ -1605,8 +1602,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		/* call commit or prepare callback */
+		if (txn->prepared)
+			rb->prepare(rb, txn, commit_lsn);
+		else
+			rb->commit(rb, txn, commit_lsn);
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -1668,6 +1668,106 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as one-phase later on commit.
+ */
+bool
+ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+	return rb->filter_prepare(rb, txn, gid);
+}
+
+
+/*
+ * Commit non-twophase transaction. See comments to ReorderBufferCommitInternal()
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+										commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Prepare twophase transaction. It calls ReorderBufferCommitInternal()
+ * since all transaction changes should be decoded on PREPARE.
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	txn->prepared = true;
+	strcpy(txn->gid, gid);
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+										commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Check whether this transaction was sent as prepared to receiver.
+ * Called upon commit/abort prepared.
+ */
+bool
+ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	return txn == NULL ? true : txn->prepared;
+}
+
+/*
+ * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid, bool is_commit)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+								true);
+
+	txn->final_lsn = commit_lsn;
+	txn->end_lsn = end_lsn;
+	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
+	strcpy(txn->gid, gid);
+
+	if (is_commit)
+		rb->commit_prepared(rb, txn, commit_lsn);
+	else
+		rb->abort_prepared(rb, txn, commit_lsn);
+
+}
+
+/*
  * Abort a transaction that possibly has previous changes. Needs to be first
  * called for subtransactions and then for the toplevel xid.
  *
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 2279604..c1ca998 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -885,7 +885,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
 	/* copy xids that still are interesting to workspace */
 	for (off = 0; off < builder->committed.xcnt; off++)
 	{
-		if (NormalTransactionIdPrecedes(builder->committed.xip[off],
+		if (TransactionIdPrecedes(builder->committed.xip[off],
 										builder->xmin))
 			;					/* remove */
 		else
@@ -1118,6 +1118,52 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 	}
 }
 
+/*
+ * Just a wrapper to clarify DecodePrepare().
+ * Right now we can't extract correct historic catalog data that
+ * was produced by aborted prepared transaction, so it work of
+ * decoding plugin to avoid such situation and here we just construct usual
+ * snapshot to able to decode prepare.
+ */
+void
+SnapBuildPrepareTxnStart(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
+				   int nsubxacts, TransactionId *subxacts)
+{
+	SnapBuildCommitTxn(builder, lsn, xid, nsubxacts, subxacts);
+}
+
+
+/*
+ * When decoding of preppare is finished we want should exclude our xid
+ * from list of committed xids to have correct snapshot between prepare
+ * and commit.
+ *
+ * However, this is not sctrictly needed. Prepared transaction holds locks
+ * between prepare and commit so nodody can produce new version of our
+ * catalog tuples. In case of abort we will have this xid in array of
+ * commited xids, but it also will not cause a problem since checks of
+ * HeapTupleHeaderXminInvalid() in HeapTupleSatisfiesHistoricMVCC()
+ * have higher priority then checks for xip array. Anyway let's be consistent
+ * about definitions and delete this xid from xip array.
+ */
+void
+SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid)
+{
+	TransactionId *search = bsearch(&xid, builder->running.xip,
+				builder->running.xcnt, sizeof(TransactionId), xidComparator);
+
+	if (search == NULL)
+		return;
+
+	/* delete that xid */
+	memmove(search, search + 1,
+			((builder->running.xip + builder->running.xcnt - 1) - search) * sizeof(TransactionId));
+	builder->running.xcnt--;
+
+	/* update min/max */
+	builder->running.xmin = builder->running.xip[0];
+	builder->running.xmax = builder->running.xip[builder->running.xcnt - 1];
+}
 
 /* -----------------------------------
  * Snapshot building functions dealing with xlog records
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index b2b7848..6c0445a 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -15,6 +15,7 @@
 #define TWOPHASE_H
 
 #include "access/xlogdefs.h"
+#include "access/xact.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
 
@@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
 
 extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
 							int *nxids_p);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+							xl_xact_parsed_prepare *parsed);
 extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
 extern void RecoverPreparedTransactions(void);
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 5b37c05..e8bf39b 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -21,6 +21,10 @@
 #include "storage/sinval.h"
 #include "utils/datetime.h"
 
+/*
+ * Maximum size of Global Transaction ID (including '\0').
+ */
+#define GIDSIZE 200
 
 /*
  * Xact isolation levels
@@ -157,6 +161,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_TWOPHASE			(1U << 4)
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
+#define XACT_XINFO_HAS_GID				(1U << 7)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -303,13 +308,40 @@ typedef struct xl_xact_parsed_commit
 	SharedInvalidationMessage *msgs;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
 } xl_xact_parsed_commit;
 
+typedef struct xl_xact_parsed_prepare
+{
+	Oid			dbId;			/* MyDatabaseId */
+
+	int			nsubxacts;
+	TransactionId *subxacts;
+
+	int			ncommitrels;
+	RelFileNode *commitrels;
+
+	int			nabortrels;
+	RelFileNode *abortrels;
+
+	int			nmsgs;
+	SharedInvalidationMessage *msgs;
+
+	TransactionId twophase_xid;
+	char 		twophase_gid[GIDSIZE];
+
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
+} xl_xact_parsed_prepare;
+
 typedef struct xl_xact_parsed_abort
 {
+	Oid			dbId;
+	Oid			tsId;
+
 	TimestampTz xact_time;
 	uint32		xinfo;
 
@@ -320,6 +352,10 @@ typedef struct xl_xact_parsed_abort
 	RelFileNode *xnodes;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
+
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
 } xl_xact_parsed_abort;
 
 
@@ -351,7 +387,7 @@ extern void CommitTransactionCommand(void);
 extern void AbortCurrentTransaction(void);
 extern void BeginTransactionBlock(void);
 extern bool EndTransactionBlock(void);
-extern bool PrepareTransactionBlock(char *gid);
+extern bool PrepareTransactionBlock(const char *gid);
 extern void UserAbortTransactionBlock(void);
 extern void ReleaseSavepoint(List *options);
 extern void DefineSavepoint(char *name);
@@ -385,12 +421,14 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
 					int xactflags,
-					TransactionId twophase_xid);
+					TransactionId twophase_xid, const char *twophase_gid);
 
 extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   int xactflags, TransactionId twophase_xid);
+				   int xactflags, TransactionId twophase_xid,
+				   const char *twophase_gid);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 7d6c88e..7352b07 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -75,6 +75,11 @@ typedef struct LogicalDecodingContext
 	bool		prepared_write;
 	XLogRecPtr	write_location;
 	TransactionId write_xid;
+
+	/*
+	 * Capabilities of decoding plugin used.
+	 */
+	bool		twophase_hadling;
 } LogicalDecodingContext;
 
 
@@ -109,5 +114,4 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
-
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 08e962d..be32774 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -68,6 +68,38 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 												   XLogRecPtr commit_lsn);
 
 /*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare
+ * and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+													  ReorderBufferTXN *txn,
+													  char *gid);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr abort_lsn);
+
+/*
  * Called for the generic logical decoding messages.
  */
 typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
@@ -98,6 +130,10 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeBeginCB begin_cb;
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeFilterPrepareCB filter_prepare_cb;
+	LogicalDecodePrepareCB prepare_cb;
+	LogicalDecodeCommitPreparedCB commit_prepared_cb;
+	LogicalDecodeAbortPreparedCB abort_prepared_cb;
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 17e47b3..99aa17f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/twophase.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -144,6 +145,16 @@ typedef struct ReorderBufferTXN
 	 */
 	TransactionId xid;
 
+	/* In case of 2PC we need to pass GID to output plugin */
+	char		gid[GIDSIZE];
+
+	/*
+	 * By using filter_prepare() callback we can force decoding to treat
+	 * two-phase transaction as on ordinary one. This flag is set if we are
+	 * actually called prepape() callback in output plugin.
+	 */
+	bool		prepared;
+
 	/* did the TX have catalog changes */
 	bool		has_catalog_changes;
 
@@ -283,6 +294,29 @@ typedef void (*ReorderBufferCommitCB) (
 												   ReorderBufferTXN *txn,
 												   XLogRecPtr commit_lsn);
 
+typedef bool (*ReorderBufferFilterPrepareCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   char *gid);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr commit_lsn);
+
+/* abort prepared callback signature */
+typedef void (*ReorderBufferAbortPreparedCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr abort_lsn);
+
 /* message callback signature */
 typedef void (*ReorderBufferMessageCB) (
 													ReorderBuffer *rb,
@@ -318,6 +352,10 @@ struct ReorderBuffer
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferCommitCB commit;
+	ReorderBufferFilterPrepareCB filter_prepare;
+	ReorderBufferPrepareCB prepare;
+	ReorderBufferCommitPreparedCB commit_prepared;
+	ReorderBufferAbortPreparedCB abort_prepared;
 	ReorderBufferMessageCB message;
 
 	/*
@@ -373,6 +411,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 	  TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid, bool is_commit);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 						 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
@@ -396,6 +439,13 @@ void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
+bool		ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid);
+bool		ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid);
+void		ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid);
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 
 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index a8ae631..400ffe1 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -72,6 +72,10 @@ extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 				   TransactionId xid, int nsubxacts,
 				   TransactionId *subxacts);
+extern void SnapBuildPrepareTxnStart(SnapBuild *builder, XLogRecPtr lsn,
+				   TransactionId xid, int nsubxacts,
+				   TransactionId *subxacts);
+extern void SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid);
 extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
 				  TransactionId xid, int nsubxacts,
 				  TransactionId *subxacts);
