Hello! I have added a TAP test for this case.
On 30.08.2022 10:09, Anton A. Melnikov wrote:
> Hello! The patch was rebased on the current master. Here is a simplified crash reproduction:
> 1) On the primary, with 'wal_level = logical', execute:
>    CREATE TABLE public.test (id int NOT NULL, val integer);
>    CREATE PUBLICATION test_pub FOR TABLE test;
> 2) On the replica, replace XXXX in the attached repcmd.sql with the primary's port and execute it:
>    psql -f repcmd.sql
> 3) On the primary, execute the command:
>    INSERT INTO test VALUES ('1');

With best regards,
-- Anton A. Melnikov Postgres Professional: http://www.postgrespro.com The Russian Postgres Company
commit d539e1e36ef7af33e1a89eaee00183739c716797 Author: Anton A. Melnikov <a.melni...@postgrespro.ru> Date: Sun Aug 21 18:27:44 2022 +0300 Fix logical replica crash if there was an error in a function. diff --git a/src/backend/catalog/pg_proc.c b/src/backend/catalog/pg_proc.c index a9fe45e347..1381fae575 100644 --- a/src/backend/catalog/pg_proc.c +++ b/src/backend/catalog/pg_proc.c @@ -1007,8 +1007,9 @@ sql_function_parse_error_callback(void *arg) * anonymous-block handler, not only for SQL-language functions. * It is assumed that the syntax error position is initially relative to the * function body string (as passed in). If possible, we adjust the position - * to reference the original command text; if we can't manage that, we set - * up an "internal query" syntax error instead. + * to reference the original command text; if we can't manage that or + * can't get the original command text when ActivePortal is not defined, + * we set up an "internal query" syntax error instead. * * Returns true if a syntax error was processed, false if not. */ @@ -1016,7 +1017,7 @@ bool function_parse_error_transpose(const char *prosrc) { int origerrposition; - int newerrposition; + int newerrposition = 0; const char *queryText; /* @@ -1034,12 +1035,15 @@ function_parse_error_transpose(const char *prosrc) return false; } - /* We can get the original query text from the active portal (hack...) */ - Assert(ActivePortal && ActivePortal->status == PORTAL_ACTIVE); - queryText = ActivePortal->sourceText; + if (ActivePortal) + { + /* We can get the original query text from the active portal (hack...) 
*/ + Assert(ActivePortal->status == PORTAL_ACTIVE); + queryText = ActivePortal->sourceText; - /* Try to locate the prosrc in the original text */ - newerrposition = match_prosrc_to_query(prosrc, queryText, origerrposition); + /* Try to locate the prosrc in the original text */ + newerrposition = match_prosrc_to_query(prosrc, queryText, origerrposition); + } if (newerrposition > 0) { @@ -1052,7 +1056,8 @@ function_parse_error_transpose(const char *prosrc) else { /* - * If unsuccessful, convert the position to an internal position + * If unsuccessful or ActivePortal not defined and original command + * text is unreachable, convert the position to an internal position * marker and give the function text as the internal query. */ errposition(0); diff --git a/src/test/recovery/t/034_logical_replica_on_error.pl b/src/test/recovery/t/034_logical_replica_on_error.pl new file mode 100644 index 0000000000..380ad74590 --- /dev/null +++ b/src/test/recovery/t/034_logical_replica_on_error.pl @@ -0,0 +1,105 @@ +# Copyright (c) 2022, Postgres Professional + +# There was an assertion if function text contains an error. See PGPRO-7025 +# Тhis file has a prefix (120_) to prevent prefix collision with +# upstream test files. 
+ +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More tests => 2; + +# Initialize primary node +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +$node_primary->init(allows_streaming => 1); +$node_primary->append_conf( + 'postgresql.conf', qq( +wal_level = logical +)); +$node_primary->start; + +$node_primary->safe_psql('postgres', + 'CREATE TABLE public.test (id int NOT NULL, val integer);'); + +$node_primary->safe_psql('postgres', + 'CREATE PUBLICATION test_pub FOR TABLE test;'); + +# Initialize logical replica node +my $node_replica = PostgreSQL::Test::Cluster->new('replica'); +$node_replica->init(has_streaming => 1, + has_restoring => 1); +$node_replica->start; + +$node_replica->safe_psql('postgres', + 'CREATE TABLE test (id int NOT NULL, val integer);'); + +$node_replica->safe_psql('postgres', q{ + create or replace procedure rebuild_test( + ) as + $body$ + declare + l_code text:= E'create or replace function public.test_selector( + ) returns setof public.test as + \$body\$ + select * from error_name + \$body\$ language sql;'; + begin + execute l_code; + perform public.test_selector(); + end + $body$ language plpgsql; +}); + +$node_replica->safe_psql('postgres', ' + create or replace function test_trg() + returns trigger as + $body$ + declare + begin + call public.rebuild_test(); + return NULL; + end + $body$ language plpgsql; +'); + +$node_replica->safe_psql('postgres', + 'create trigger test_trigger after insert or update or delete on test for each row execute function test_trg();'); + +$node_replica->safe_psql('postgres', + 'alter table test enable replica trigger test_trigger;'); + +my $primary_port = $node_primary->port; +my $conn_params = "port=" . $primary_port . 
" dbname=postgres"; + +my ($stdin, $stdout, $stderr) = $node_replica->psql('postgres', + qq[CREATE SUBSCRIPTION test_sub CONNECTION \'$conn_params\' PUBLICATION test_pub;] +); + +ok($stderr =~ /NOTICE: created replication slot \"test_sub\" on publisher/, + "Logical decoding correctly fails on function error"); + +my $worker_pid = + $node_replica->safe_psql('postgres', q{ + select pid from pg_stat_subscription where subname = 'test_sub' limit 1; + }); + +$node_primary->safe_psql('postgres', q{ + INSERT INTO test VALUES ('1'); + }); + +$node_replica->poll_query_until('postgres', qq[ + select $worker_pid != any(array_agg(pid)) as ready + from pg_stat_subscription where subname = \'test_sub\'; + ]) + or die "Timed out while waiting for new WAL with error from primary"; + +$node_replica->stop; +$node_primary->stop; + +my $log = $node_replica->logfile(); + +my $log_contents = slurp_file($log); + +ok($log_contents =~ /ERROR: relation \"error_name\" does not exist at character/, + "Logical decoding correctly fails on function error");
repcmd.sql
Description: application/sql