From cf9a73649e99dbaf9b8c8747c19ca3dac3396a40 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 4 Nov 2020 18:11:58 +1100
Subject: [PATCH v16] Support 2PC txn - subscriber tests.

This patch adds the two-phase commit subscriber test code (streaming and not streaming).
---
 src/test/subscription/t/020_twophase.pl           | 345 ++++++++++++++
 src/test/subscription/t/021_twophase_streaming.pl | 521 ++++++++++++++++++++++
 2 files changed, 866 insertions(+)
 create mode 100644 src/test/subscription/t/020_twophase.pl
 create mode 100644 src/test/subscription/t/021_twophase_streaming.pl

diff --git a/src/test/subscription/t/020_twophase.pl b/src/test/subscription/t/020_twophase.pl
new file mode 100644
index 0000000..f489f47
--- /dev/null
+++ b/src/test/subscription/t/020_twophase.pl
@@ -0,0 +1,345 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 21;
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_full SELECT generate_series(1,10)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_full2 (x text)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_full2 (x text)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub ADD TABLE tab_full, tab_full2");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub
+	CONNECTION '$publisher_connstr application_name=$appname'
+	PUBLICATION tap_pub");
+
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+	"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# Also wait for initial table sync to finish
+my $synced_query =
+	"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+###############################
+# check that 2PC gets replicated to subscriber
+# then COMMIT PREPARED
+###############################
+
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (11);
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# check that 2PC gets replicated to subscriber
+# then ROLLBACK PREPARED
+###############################
+
+$node_publisher->safe_psql('postgres',"
+	BEGIN;
+	INSERT INTO tab_full VALUES (12);
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets aborted on subscriber
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is aborted on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Check that ROLLBACK PREPARED is decoded properly on crash restart
+# (publisher and subscriber crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (12);
+    INSERT INTO tab_full VALUES (13);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# rollback post the restart
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are rolled back
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (12,13);");
+is($result, qq(0), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (publisher and subscriber crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (12);
+    INSERT INTO tab_full VALUES (13);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (12,13);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (subscriber only crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (14);
+    INSERT INTO tab_full VALUES (15);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (14,15);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (publisher only crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (16);
+    INSERT INTO tab_full VALUES (17);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_publisher->stop('immediate');
+$node_publisher->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (16,17);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Test nested transaction with 2PC
+###############################
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (21);
+	SAVEPOINT sp_inner;
+	INSERT INTO tab_full VALUES (22);
+	ROLLBACK TO SAVEPOINT sp_inner;
+	PREPARE TRANSACTION 'outer';
+	");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# COMMIT
+$node_publisher->safe_psql('postgres', "
+	COMMIT PREPARED 'outer';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check the tx state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'outer';");
+is($result, qq(0), 'transaction is ended on subscriber');
+
+# check inserts are visible. 22 should be rolled back. 21 should be committed.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (21);");
+is($result, qq(1), 'Rows committed are on the subscriber');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_full where a IN (22);");
+is($result, qq(0), 'Rows rolled back are not on the subscriber');
+
+###############################
+# Test using empty GID
+###############################
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (51);
+	PREPARE TRANSACTION '';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = '';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# ROLLBACK
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED '';");
+
+# check that 2PC gets aborted on subscriber
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = '';");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Test cases involving DDL.
+###############################
+
+# TODO This can be added after we add functionality to replicate DDL changes to subscriber.
+
+###############################
+# check all the cleanup
+###############################
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+	'check subscription relation status was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/021_twophase_streaming.pl b/src/test/subscription/t/021_twophase_streaming.pl
new file mode 100644
index 0000000..9a03b83
--- /dev/null
+++ b/src/test/subscription/t/021_twophase_streaming.pl
@@ -0,0 +1,521 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 28;
+
+###############################
+# Test setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', qq(max_prepared_transactions = 10));
+$node_publisher->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', qq(max_prepared_transactions = 10));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher (uses same DDL as 015_stream test)
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication (streaming = on)
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres', "
+	CREATE SUBSCRIPTION tap_sub
+	CONNECTION '$publisher_connstr application_name=$appname'
+	PUBLICATION tap_pub
+	WITH (streaming = on)");
+
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+	"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# Also wait for initial table sync to finish
+my $synced_query =
+	"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+	or die "Timed out while waiting for subscriber to synchronize data";
+
+###############################
+# Check initial data was copied to subscriber
+###############################
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+###############################
+# Test 2PC PREPARE / COMMIT PREPARED
+# 1. Data is streamed as a 2PC transaction.
+# 2. Then do commit prepared.
+#
+# Expect all data is replicated on subscriber side after the commit.
+###############################
+
+# check that 2PC gets replicated to subscriber
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC gets committed
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# Test 2PC PREPARE / ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC
+# 3. Do rollback prepared.
+#
+# Expect data rolls back leaving only the original 2 rows.
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', q{
+    DELETE FROM test_tab WHERE a > 2;});
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC tx gets aborted
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is aborted on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Check that 2PC ROLLBACK PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then server crashes before the 2PC transaction is rolled back.
+# 3. After servers are restarted the pending transaction is rolled back.
+#
+# Expect all inserted data is gone.
+# (Note: both publisher and subscriber crash/restart)
+###############################
+
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# rollback post the restart
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are rolled back
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows');
+
+###############################
+# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then server crashes before the 2PC transaction is committed.
+# 3. After servers are restarted the pending transaction is committed.
+#
+# Expect all data is replicated on subscriber side after the commit.
+# (Note: both publisher and subscriber crash/restart)
+###############################
+
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults');
+
+###############################
+# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then 1 server crashes before the 2PC transaction is committed.
+# 3. After servers are restarted the pending transaction is committed.
+#
+# Expect all data is replicated on subscriber side after the commit.
+# (Note: only subscriber crashes)
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', q{
+    DELETE FROM test_tab WHERE a > 2;});
+
+# insert, update, delete enough data to cause streaming
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_subscriber->stop('immediate');
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber');
+
+###############################
+# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then 1 server crashes before the 2PC transaction is committed.
+# 3. After servers are restarted the pending transaction is committed.
+#
+# Expect all data is replicated on subscriber side after the commit.
+# (Note: only publisher crashes)
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', q{
+    DELETE FROM test_tab WHERE a > 2;});
+
+# insert, update, delete enough data to cause streaming
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->stop('immediate');
+$node_publisher->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber');
+
+###############################
+# Do INSERT after the PREPARE but before ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row INSERT is done which is after the PREPARE
+# 4. Then do a ROLLBACK PREPARED.
+#
+# Expect the 2PC data rolls back leaving only 3 rows on the subscriber.
+# (the original 2 + inserted 1)
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', q{
+    DELETE FROM test_tab WHERE a > 2;});
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Insert a different record (now we are outside of the 2PC tx)
+# Note: the 2PC tx still holds row locks so make sure this insert is for a separate primary key
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+# 2PC tx gets aborted
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is aborted on subscriber,
+# but the extra INSERT outside of the 2PC still was replicated
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3|3|3), 'check the outside insert was copied to subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Do INSERT after the PREPARE but before COMMIT PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row INSERT is done which is after the PREPARE.
+# 4. Then do a COMMIT PREPARED.
+#
+# Expect 2PC data + the extra row are on the subscriber.
+# (the 3334 + inserted 1 = 3335)
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', q{
+    DELETE FROM test_tab WHERE a > 2;});
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Insert a different record (now we are outside of the 2PC tx)
+# Note: the 2PC tx still holds row locks so make sure this insert is for a separare primary key
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+# 2PC tx gets committed
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3335|3335|3335), 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# Do DELETE after PREPARE but before COMMIT PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row DELETE is done for one of the records that was inserted by the 2PC transaction
+# 4. Then there is a COMMIT PREPARED.
+#
+# Expect all the 2PC data rows on the subscriber (since in fact delete at step 3 would do nothing
+# because that record was not yet committed at the time of the delete).
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', q{
+    DELETE FROM test_tab WHERE a > 2;});
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# DELETE one of the prepared 2PC records before they get committed (we are outside of the 2PC tx)
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_tab WHERE a = 5");
+
+# 2PC tx gets committed
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber. Nothing was deleted');
+
+# confirm the "deleted" row was in fact not deleted
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_tab WHERE a = 5");
+is($result, qq(1), 'The row we deleted before the commit till exists on subscriber.');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab';");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# Try 2PC tx works using an empty GID literal
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', q{
+    DELETE FROM test_tab WHERE a > 2;});
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION '';});
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts where gid = '';");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC tx gets committed
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED '';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber');
+
+###############################
+# Test cases involving DDL
+###############################
+
+# TODO This can be added after we add functionality to replicate DDL changes to subscriber.
+
+###############################
+# check all the cleanup
+###############################
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+	'check subscription relation status was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
1.8.3.1

