Aha, I see, thanks.  Here's a complete fix with an included test case.  In
an unpatched assert-enabled build, the test case crashes like this:

TRAP: FailedAssertion("!(ActiveSnapshotSet())", File: 
"/pgsql/source/REL_10_STABLE/src/backend/tcop/postgres.c", Line: 788)

Will push on Monday.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 4fc7fca136c58a94051b955b3f9a8b541c48c6dc Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Sat, 28 Jul 2018 13:50:13 -0400
Subject: [PATCH] Fix crash in logical rep subscriber

---
 src/backend/replication/logical/worker.c          |  4 +-
 src/test/subscription/t/009_domain_constraints.pl | 64 +++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)
 create mode 100644 src/test/subscription/t/009_domain_constraints.pl

diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index 39bfb6d39c..f4742c4d46 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -607,13 +607,15 @@ apply_handle_insert(StringInfo s)
        remoteslot = ExecInitExtraTupleSlot(estate);
        ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
 
+       /* Input functions may want a snapshot set */
+       PushActiveSnapshot(GetTransactionSnapshot());
+
        /* Process and store remote tuple in the slot */
        oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
        slot_store_cstrings(remoteslot, rel, newtup.values);
        slot_fill_defaults(rel, estate, remoteslot);
        MemoryContextSwitchTo(oldctx);
 
-       PushActiveSnapshot(GetTransactionSnapshot());
        ExecOpenIndices(estate->es_result_relation_info, false);
 
        /* Do the insert. */
diff --git a/src/test/subscription/t/009_domain_constraints.pl 
b/src/test/subscription/t/009_domain_constraints.pl
new file mode 100644
index 0000000000..5aacde37dd
--- /dev/null
+++ b/src/test/subscription/t/009_domain_constraints.pl
@@ -0,0 +1,64 @@
+# Test logical replication into a column whose domain CHECK constraint runs a query
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create structure on both sides
+$node_publisher->safe_psql('postgres',
+       'CREATE FUNCTION my_constr(int) RETURNS bool LANGUAGE sql AS $$ select 
count(*) > 0 from pg_authid; $$;
+        CREATE DOMAIN pos_int AS int CHECK (my_constr(VALUE));
+        CREATE TABLE tab_rep (a pos_int);
+        INSERT INTO tab_rep VALUES (10)');
+
+$node_subscriber->safe_psql('postgres',
+       'CREATE FUNCTION my_constr(int) RETURNS bool LANGUAGE sql AS $$ select 
count(*) > 0 from pg_authid; $$;
+        CREATE DOMAIN pos_int AS int CHECK (my_constr(VALUE));
+        CREATE TABLE tab_rep (a pos_int);');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres',
+"ALTER PUBLICATION tap_pub ADD TABLE tab_rep"
+);
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr 
application_name=$appname' PUBLICATION tap_pub"
+);
+
+# restart the subscriber
+$node_subscriber->restart;
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_rep VALUES (11)");
+
+# Wait for subscriber to catch up with the publisher
+my $caughtup_query =
+"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE 
application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 
's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT sum(a) FROM tab_rep");
+is($result, '21', 'replicated table contains data on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.11.0

Reply via email to