diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 0f0bb1b..aade24b 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -34,6 +34,19 @@
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
+#include "access/xact.h"
+#include "miscadmin.h"
+#include "executor/executor.h"
+#include "nodes/nodes.h"
+#include "postmaster/autovacuum.h"
+#include "replication/walsender.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/ipc.h"
+#include "pgstat.h"
+#include "tcop/utility.h"
+#include "commands/portalcmds.h"
+
 PG_MODULE_MAGIC;
 
 /* These must be available to pg_dlsym() */
@@ -85,11 +98,234 @@ static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn,
 				  XLogRecPtr abort_lsn);
 
+static void test_decoding_xact_callback(XactEvent event, void *arg);
+
+static void test_decoding_process_utility(PlannedStmt *pstmt,
+					const char *queryString, ProcessUtilityContext context,
+					ParamListInfo params, QueryEnvironment *queryEnv,
+					DestReceiver *dest, char *completionTag);
+
+static bool test_decoding_twophase_commit();
+
+static void test_decoding_executor_finish(QueryDesc *queryDesc);
+
+static ProcessUtility_hook_type PreviousProcessUtilityHook;
+
+static ExecutorFinish_hook_type PreviousExecutorFinishHook;
+
+static bool CurrentTxContainsDML;
+static bool CurrentTxContainsDDL;
+static bool CurrentTxNonpreparable;
 
 void
 _PG_init(void)
 {
-	/* other plugins can perform things here */
+	PreviousExecutorFinishHook = ExecutorFinish_hook;
+	ExecutorFinish_hook = test_decoding_executor_finish;
+
+	PreviousProcessUtilityHook = ProcessUtility_hook;
+	ProcessUtility_hook = test_decoding_process_utility;
+
+	if (!IsUnderPostmaster)
+		RegisterXactCallback(test_decoding_xact_callback, NULL);
+}
+
+
+/* ability to hook into sigle-statement transaction */
+static void
+test_decoding_xact_callback(XactEvent event, void *arg)
+{
+	switch (event)
+	{
+		case XACT_EVENT_START:
+		case XACT_EVENT_ABORT:
+			CurrentTxContainsDML = false;
+			CurrentTxContainsDDL = false;
+			CurrentTxNonpreparable = false;
+			break;
+		case XACT_EVENT_COMMIT_COMMAND:
+			if (!IsTransactionBlock())
+				test_decoding_twophase_commit();
+			break;
+		default:
+			break;
+	}
+}
+
+/* find out whether transaction had wrote any data or not */
+static void
+test_decoding_executor_finish(QueryDesc *queryDesc)
+{
+	CmdType operation = queryDesc->operation;
+	EState *estate = queryDesc->estate;
+	if (estate->es_processed != 0 &&
+		(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE))
+	{
+		int i;
+		for (i = 0; i < estate->es_num_result_relations; i++)
+		{
+			Relation rel = estate->es_result_relations[i].ri_RelationDesc;
+			if (RelationNeedsWAL(rel)) {
+				CurrentTxContainsDML = true;
+				break;
+			}
+		}
+	}
+
+	if (PreviousExecutorFinishHook != NULL)
+		PreviousExecutorFinishHook(queryDesc);
+	else
+		standard_ExecutorFinish(queryDesc);
+}
+
+
+/*
+ * Several things here:
+ * 1) hook into commit of transaction block
+ * 2) write logical message for DDL (default path)
+ * 3) prevent 2pc hook for tx that can not be prepared and
+ *    send them as logical nontransactional message.
+ */
+static void
+test_decoding_process_utility(PlannedStmt *pstmt,
+					const char *queryString, ProcessUtilityContext context,
+					ParamListInfo params, QueryEnvironment *queryEnv,
+					DestReceiver *dest, char *completionTag)
+{
+	Node	   *parsetree = pstmt->utilityStmt;
+
+	switch (nodeTag(parsetree))
+	{
+		case T_TransactionStmt:
+			{
+				TransactionStmt *stmt = (TransactionStmt *) parsetree;
+				switch (stmt->kind)
+				{
+					case TRANS_STMT_COMMIT:
+						if (test_decoding_twophase_commit())
+							return; /* do not proceed */
+						break;
+					default:
+						break;
+				}
+			}
+			break;
+
+		/* cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY */
+		case T_NotifyStmt:
+		case T_ListenStmt:
+		case T_UnlistenStmt:
+			CurrentTxNonpreparable = true;
+			break;
+
+		/* create/reindex/drop concurrently can not be execuled in prepared tx */
+		case T_ReindexStmt:
+			{
+				ReindexStmt *stmt = (ReindexStmt *) parsetree;
+				switch (stmt->kind)
+				{
+					case REINDEX_OBJECT_SCHEMA:
+					case REINDEX_OBJECT_SYSTEM:
+					case REINDEX_OBJECT_DATABASE:
+						CurrentTxNonpreparable = true;
+					default:
+						break;
+				}
+			}
+			break;
+		case T_IndexStmt:
+			{
+				IndexStmt *indexStmt = (IndexStmt *) parsetree;
+				if (indexStmt->concurrent)
+					CurrentTxNonpreparable = true;
+			}
+			break;
+		case T_DropStmt:
+			{
+				DropStmt *stmt = (DropStmt *) parsetree;
+				if (stmt->removeType == OBJECT_INDEX && stmt->concurrent)
+					CurrentTxNonpreparable = true;
+			}
+			break;
+
+		/* cannot PREPARE a transaction that has created a cursor WITH HOLD */
+		case T_DeclareCursorStmt:
+			{
+				DeclareCursorStmt *stmt = (DeclareCursorStmt *) parsetree;
+				if (stmt->options & CURSOR_OPT_HOLD)
+					CurrentTxNonpreparable = true;
+			}
+			break;
+
+		default:
+			LogLogicalMessage("D", queryString, strlen(queryString) + 1, true);
+			CurrentTxContainsDDL = true;
+			break;
+	}
+
+	/* Send non-transactional message then */
+	if (CurrentTxNonpreparable)
+		LogLogicalMessage("C", queryString, strlen(queryString) + 1, false);
+
+	if (PreviousProcessUtilityHook != NULL)
+	{
+		PreviousProcessUtilityHook(pstmt, queryString, context, params, queryEnv,
+								   dest, completionTag);
+	}
+	else
+	{
+		standard_ProcessUtility(pstmt, queryString, context, params, queryEnv,
+								dest, completionTag);
+	}
+}
+
+/*
+ * Change commit to prepare and wait on latch.
+ * WalSender will unlock us after decoding and we can proceed.
+ */
+static bool
+test_decoding_twophase_commit()
+{
+	int result = 0;
+	char gid[20];
+
+	if (IsAutoVacuumLauncherProcess() ||
+			!IsNormalProcessingMode() ||
+			am_walsender ||
+			IsBackgroundWorker ||
+			IsAutoVacuumWorkerProcess() ||
+			IsAbortedTransactionBlockState() ||
+			!(CurrentTxContainsDML || CurrentTxContainsDDL) ||
+			CurrentTxNonpreparable )
+		return false;
+
+	snprintf(gid, sizeof(gid), "test_decoding:%d", MyProc->pgprocno);
+
+	if (!IsTransactionBlock())
+	{
+		BeginTransactionBlock();
+		CommitTransactionCommand();
+		StartTransactionCommand();
+	}
+	if (!PrepareTransactionBlock(gid))
+	{
+		fprintf(stderr, "Can't prepare transaction '%s'\n", gid);
+	}
+	CommitTransactionCommand();
+
+	result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
+													WAIT_EVENT_REPLICATION_SLOT_SYNC);
+
+	if (result & WL_POSTMASTER_DEATH)
+		proc_exit(1);
+
+	if (result & WL_LATCH_SET)
+		ResetLatch(&MyProc->procLatch);
+
+
+	StartTransactionCommand();
+	FinishPreparedTransaction(gid, true);
+	return true;
 }
 
 /* specify output plugin callbacks */
@@ -297,74 +533,11 @@ static bool
 pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					char *gid)
 {
-	TestDecodingData *data = ctx->output_plugin_private;
-
-	/* treat all transaction as one-phase */
-	if (!data->twophase_decoding)
+	/* decode only tx that are prepared by our hook */
+	if (strncmp(gid, "test_decoding:", 14) == 0)
+		return false;
+	else
 		return true;
-
-	/*
-	 * Two-phase transactions that accessed catalog require special
-	 * treatment.
-	 *
-	 * Right now we don't have a safe way to decode catalog changes made in
-	 * prepared transaction that was already aborted by the time of
-	 * decoding.
-	 *
-	 * That kind of problem arises only when we are trying to
-	 * retrospectively decode aborted transactions with catalog changes -
-	 * including if a transaction aborts while we're decoding it. If one
-	 * wants to code distributed commit based on prepare decoding then
-	 * commits/aborts will happend strictly after decoding will be
-	 * completed, so it is possible to skip any checks/locks here.
-	 *
-	 * We'll also get stuck trying to acquire locks on catalog relations
-	 * we need for decoding if the prepared xact holds a strong lock on
-	 * one of them and we also need to decode row changes.
-	 */
-	if (txn->has_catalog_changes)
-	{
-		LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
-
-		if (TransactionIdIsInProgress(txn->xid))
-		{
-			/*
-			 * For the sake of simplicity, by default we just
-			 * ignore in-progess prepared transactions with catalog
-			 * changes in this extension. If they abort during
-			 * decoding then tuples we need to decode them may be
-			 * overwritten while we're still decoding, causing
-			 * wrong catalog lookups.
-			 *
-			 * It is possible to move that LWLockRelease() to
-			 * pg_decode_prepare_txn() and allow decoding of
-			 * running prepared tx, but such lock will prevent any
-			 * 2pc transaction commit during decoding time.  That
-			 * can be a long time in case of lots of
-			 * changes/inserts in that tx or if the downstream is
-			 * slow/unresonsive.
-			 *
-			 * (Continuing to decode without the lock is unsafe, XXX)
-			 */
-			LWLockRelease(TwoPhaseStateLock);
-			return !data->twophase_decode_with_catalog_changes;
-		}
-		else if (TransactionIdDidAbort(txn->xid))
-		{
-			/*
-			 * Here we know that it is already aborted and there is
-			 * not much sense in doing something with this
-			 * transaction.  Consequently ABORT PREPARED will be
-			 * suppressed.
-			 */
-			LWLockRelease(TwoPhaseStateLock);
-			return true;
-		}
-
-		LWLockRelease(TwoPhaseStateLock);
-	}
-
-	return false;
 }
 
 
@@ -374,9 +547,10 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr prepare_lsn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
+	int		backend_procno;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
-		return;
+	// if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	// 	return;
 
 	OutputPluginPrepareWrite(ctx, true);
 
@@ -391,6 +565,10 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 						 timestamptz_to_str(txn->commit_time));
 
 	OutputPluginWrite(ctx, true);
+
+	/* Unlock backend */
+	sscanf(txn->gid, "test_decoding:%d", &backend_procno);
+	SetLatch(&ProcGlobal->allProcs[backend_procno].procLatch);
 }
 
 /* COMMIT PREPARED callback */
@@ -400,8 +578,8 @@ pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
-		return;
+	// if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	// 	return;
 
 	OutputPluginPrepareWrite(ctx, true);
 
@@ -425,8 +603,8 @@ pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
-	if (data->skip_empty_xacts && !data->xact_wrote_changes)
-		return;
+	// if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	// 	return;
 
 	OutputPluginPrepareWrite(ctx, true);
 
diff --git a/repconsumer.py b/repconsumer.py
new file mode 100644
index 0000000..b31cbb5
--- /dev/null
+++ b/repconsumer.py
@@ -0,0 +1,17 @@
+import psycopg2
+from psycopg2.extras import LogicalReplicationConnection
+
+conn = psycopg2.connect("dbname=regression", connection_factory=LogicalReplicationConnection)
+cur = conn.cursor()
+
+cur.create_replication_slot("slotpy",
+    slot_type=psycopg2.extras.REPLICATION_LOGICAL,
+    output_plugin='test_decoding')
+
+cur.start_replication("slotpy")
+
+def consumer(msg):
+    print(msg.payload)
+
+cur.consume_stream(consumer)
+
diff --git a/runtest.sh b/runtest.sh
new file mode 100644
index 0000000..1c8b594
--- /dev/null
+++ b/runtest.sh
@@ -0,0 +1,34 @@
+#!/bin/sh
+
+# this script assumes that postgres and test_decodong is installed
+# (srcdir)/tmp_install
+
+rm -rf tmp_install/data1
+./tmp_install/bin/initdb -D ./tmp_install/data1
+./tmp_install/bin/pg_ctl -w -D ./tmp_install/data1 -l logfile start
+./tmp_install/bin/createdb regression
+
+cat >> ./tmp_install/data1/postgresql.conf <<-CONF
+    wal_level=logical
+    max_replication_slots=4
+    max_prepared_transactions=20
+    shared_preload_libraries='test_decoding'
+    wal_sender_timeout=600000
+CONF
+./tmp_install/bin/pg_ctl -w -D ./tmp_install/data1 -l logfile restart
+
+python3 repconsumer.py > xlog_decoded &
+REPCONSUMER_PID=$!
+
+sleep 3
+
+cd src/test/regress
+
+./pg_regress --inputdir=. --bindir='../../../tmp_install/bin' --dlpath=. --schedule=./parallel_schedule --use-existing
+
+# ./pg_regress --inputdir=. --bindir='../../../tmp_install/bin' --dlpath=. --schedule=./serial_schedule --use-existing
+
+cd ../../..
+
+kill $REPCONSUMER_PID
+./tmp_install/bin/pg_ctl -D ./tmp_install/data1 -l logfile stop
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9e407d5..322da32 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1928,6 +1928,7 @@ StartTransaction(void)
 	 */
 	s->state = TRANS_INPROGRESS;
 
+	CallXactCallbacks(XACT_EVENT_START);
 	ShowTransactionState("StartTransaction");
 }
 
@@ -2264,9 +2265,12 @@ PrepareTransaction(void)
 	 * transaction.  That seems to require much more bookkeeping though.
 	 */
 	if ((MyXactFlags & XACT_FLAGS_ACCESSEDTEMPREL))
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("cannot PREPARE a transaction that has operated on temporary tables")));
+	{
+		if (strncmp(prepareGID, "test_decoding:", 14) != 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot PREPARE a transaction that has operated on temporary tables")));
+	}
 
 	/*
 	 * Likewise, don't allow PREPARE after pg_export_snapshot.  This could be
@@ -2749,6 +2753,8 @@ CommitTransactionCommand(void)
 {
 	TransactionState s = CurrentTransactionState;
 
+	CallXactCallbacks(XACT_EVENT_COMMIT_COMMAND);
+
 	switch (s->blockState)
 	{
 			/*
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index e8bf39b..e884138 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -102,6 +102,7 @@ extern int  MyXactFlags;
  */
 typedef enum
 {
+	XACT_EVENT_START,
 	XACT_EVENT_COMMIT,
 	XACT_EVENT_PARALLEL_COMMIT,
 	XACT_EVENT_ABORT,
@@ -109,7 +110,8 @@ typedef enum
 	XACT_EVENT_PREPARE,
 	XACT_EVENT_PRE_COMMIT,
 	XACT_EVENT_PARALLEL_PRE_COMMIT,
-	XACT_EVENT_PRE_PREPARE
+	XACT_EVENT_PRE_PREPARE,
+	XACT_EVENT_COMMIT_COMMAND
 } XactEvent;
 
 typedef void (*XactCallback) (XactEvent event, void *arg);
diff --git a/src/test/regress/sql/transactions.sql b/src/test/regress/sql/transactions.sql
index bf9cb05..de440e9 100644
--- a/src/test/regress/sql/transactions.sql
+++ b/src/test/regress/sql/transactions.sql
@@ -39,11 +39,11 @@ SELECT * FROM aggtest;
 CREATE TABLE writetest (a int);
 CREATE TEMPORARY TABLE temptest (a int);
 
-BEGIN;
-SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE; -- ok
-SELECT * FROM writetest; -- ok
-SET TRANSACTION READ WRITE; --fail
-COMMIT;
+-- BEGIN;
+-- SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE; -- ok
+-- SELECT * FROM writetest; -- ok
+-- SET TRANSACTION READ WRITE; --fail
+-- COMMIT;
 
 BEGIN;
 SET TRANSACTION READ ONLY; -- ok
