On 2018-11-01 01:29, Euler Taveira wrote:
On Wed, Feb 28, 2018 at 20:03, Euler Taveira
<eu...@timbira.com.br> wrote:
The attached patches add support for filtering rows in the publisher.
I ran pgbench over logical replication with a WHERE clause and could not
get it to replicate correctly. Below is the output of the
attached test program.
$ ./logrep_rowfilter.sh
-- /home/aardvark/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin.fast/initdb --pgdata=/tmp/cascade/instance1/data --encoding=UTF8 --pwfile=/tmp/bugs
-- /home/aardvark/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin.fast/initdb --pgdata=/tmp/cascade/instance2/data --encoding=UTF8 --pwfile=/tmp/bugs
-- /home/aardvark/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin.fast/initdb --pgdata=/tmp/cascade/instance3/data --encoding=UTF8 --pwfile=/tmp/bugs
sleep 3s
dropping old tables...
creating tables...
generating data...
100000 of 100000 tuples (100%) done (elapsed 0.09 s, remaining 0.00 s)
vacuuming...
creating primary keys...
done.
create publication pub_6515_to_6516;
alter publication pub_6515_to_6516 add table pgbench_accounts where (aid between 40000 and 60000-1) ; --> where 1
alter publication pub_6515_to_6516 add table pgbench_branches;
alter publication pub_6515_to_6516 add table pgbench_tellers;
alter publication pub_6515_to_6516 add table pgbench_history;
create publication pub_6516_to_6517;
alter publication pub_6516_to_6517 add table pgbench_accounts ; -- where (aid between 40000 and 60000-1) ; --> where 2
alter publication pub_6516_to_6517 add table pgbench_branches;
alter publication pub_6516_to_6517 add table pgbench_tellers;
alter publication pub_6516_to_6517 add table pgbench_history;
create subscription pub_6516_from_6515 connection 'port=6515 application_name=rowfilter'
publication pub_6515_to_6516 with(enabled=false);
alter subscription pub_6516_from_6515 enable;
create subscription pub_6517_from_6516 connection 'port=6516 application_name=rowfilter'
publication pub_6516_to_6517 with(enabled=false);
alter subscription pub_6517_from_6516 enable;
-- pgbench -p 6515 -c 16 -j 8 -T 5 -n postgres # scale 1
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 1
query mode: simple
number of clients: 16
number of threads: 8
duration: 5 s
number of transactions actually processed: 80
latency average = 1178.106 ms
tps = 13.581120 (including connections establishing)
tps = 13.597443 (excluding connections establishing)
accounts branches tellers history
--------- --------- --------- ---------
6515 6546b1f0f 2d328ed28 7406473b0 7c1351523 e8c07347b
6516 6546b1f0f 2d328ed28 d41d8cd98 d41d8cd98 e7235f541
6517 f7c0791c8 d9c63e471 d41d8cd98 d41d8cd98 30892eea1 NOK
6515 6546b1f0f 2d328ed28 7406473b0 7c1351523 e8c07347b
6516 6546b1f0f 2d328ed28 7406473b0 5a54cf7c5 191ae1af3
6517 6546b1f0f 2d328ed28 7406473b0 5a54cf7c5 191ae1af3 NOK
6515 6546b1f0f 2d328ed28 7406473b0 7c1351523 e8c07347b
6516 6546b1f0f 2d328ed28 7406473b0 5a54cf7c5 191ae1af3
6517 6546b1f0f 2d328ed28 7406473b0 5a54cf7c5 191ae1af3 NOK
[...]
I let that run for 10 minutes or so, but the md5 values of the
pgbench_history table on ports 6516 and 6517 no longer change, which shows
that it is, and remains, different from the original pgbench_history table
on 6515.
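Each column in the output above is the first 9 hex digits of an md5 over one table's rows (ordered by primary key), and the last column hashes the four table values together. A minimal sketch of that fingerprinting, assuming GNU coreutils `md5sum` as the script itself does; the helper names `fingerprint` and `combine` are hypothetical. It also explains the `d41d8cd98` entries in the early rows: that is the prefix of the md5 of empty input, so those queries produced no output yet (for example, a table that was still empty on the subscriber).

```shell
#!/usr/bin/env bash
# Hypothetical helper mirroring "... | md5sum | cut -b1-9" in the script:
# hash everything read from stdin and keep the first 9 hex digits.
fingerprint() {
    md5sum | cut -b1-9
}

# Hypothetical helper mirroring how md5_6515 etc. are built: join the
# per-table fingerprints with spaces and fingerprint that line.
combine() {
    echo "$*" | fingerprint
}

empty=$(fingerprint < /dev/null)                  # a query with no output
rows=$(printf '1|1|0|\n2|1|0|\n' | fingerprint)   # two made-up rows
echo "empty result: $empty"                        # d41d8cd98
echo "two rows:     $rows"
echo "combined:     $(combine "$rows" "$empty")"
```

Because only a 9-digit prefix is kept, the columns are a quick visual check rather than a strict equality proof, which is adequate for spotting divergence like the pgbench_history mismatch.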
When there is a WHERE clause, this *always* goes wrong.
Without a WHERE clause, all logical replication tests were OK. Perhaps
the error is not in our patch but in logical replication itself.
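In this run only the first publication, pub_6515_to_6516, filters pgbench_accounts; the second one has its WHERE clause switched off through the script's `outcomment` variable. A small sketch of that expansion, reusing the script's variable values (the `build_stmt` helper is hypothetical): with `outcomment='; --'` the statement ends before the filter and psql reads the rest of the line as an SQL comment, which reproduces the "where 2" line echoed in the output above.

```shell
#!/usr/bin/env bash
# Values copied from the test script.
where2="where (aid between 40000 and 60000-1)"
pub2=pub_6516_to_6517

# Hypothetical helper: build the ALTER PUBLICATION line the way the
# script does, with $1 playing the role of $outcomment.
build_stmt() {
    local outcomment=$1
    echo "alter publication $pub2 add table pgbench_accounts $outcomment $where2 ; --> where 2"
}

filtered=$(build_stmt '')        # filter active
unfiltered=$(build_stmt '; --')  # filter commented out, as in this run
echo "$filtered"
echo "$unfiltered"
```

Setting `outcomment=''` in the script would therefore put the same row filter on both hops of the cascade.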
Attached is the test program (it will need some tweaking of PATHs,
PG variables (PGPASSFILE), etc.). This is the same program I used in
March when you first posted a version of this patch, although the error is
different.
thanks,
Erik Rijkers
#!/bin/bash
# postgres binary compiled with
#
# 20181101
# 0001-Remove-unused-atttypmod-column-from-initial-table-sy.patch
# 0002-Store-number-of-tuples-in-WalRcvExecResult.patch
# 0003-Refactor-function-create_estate_for_relation.patch
# 0004-Rename-a-WHERE-node.patch
# 0005-Row-filtering-for-logical-replication.patch
# 0006-Print-publication-WHERE-condition-in-psql.patch
# 0007-Publication-where-condition-support-for-pg_dump.patch
# 0008-Debug-for-row-filtering.patch
#
unset PGDATABASE PGPORT PGSERVICE
export PGDATABASE=postgres
scale=1
root_dir=/tmp/cascade
BIN=$HOME/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin.fast
export PATH=$BIN:$PATH
initdb=$BIN/initdb
postgres=$BIN/postgres
pg_ctl=$BIN/pg_ctl
baseport=6515
appname=rowfilter
if [[ -d $root_dir/instance1 ]]; then rm -rf $root_dir/instance1; fi
if [[ -d $root_dir/instance2 ]]; then rm -rf $root_dir/instance2; fi
if [[ -d $root_dir/instance3 ]]; then rm -rf $root_dir/instance3; fi
# abort (with a non-zero status) if an old data directory could not be removed
if [[ -d $root_dir/instance1 ]]; then exit 1; fi
if [[ -d $root_dir/instance2 ]]; then exit 1; fi
if [[ -d $root_dir/instance3 ]]; then exit 1; fi
devel_file=/tmp/bugs
echo filterbug>$devel_file
num_instances=3
# create and start one server per instance, on consecutive ports from $baseport
for n in `seq 1 $num_instances`
do
instance=instance$n
server_dir=$root_dir/$instance
data_dir=$server_dir/data
port=$(( baseport + n - 1 ))
logfile=$server_dir/logfile.$port
echo "-- $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file "
$initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file &> /dev/null
( $postgres -D $data_dir -p $port \
--wal_level=logical --logging_collector=on \
--client_min_messages=warning \
--log_directory=$server_dir --log_filename=logfile.${port} \
--log_replication_commands=on & ) &> /dev/null
done
echo "sleep 3s"
sleep 3
echo "
drop table if exists pgbench_accounts;
drop table if exists pgbench_branches;
drop table if exists pgbench_tellers;
drop table if exists pgbench_history;" | psql -qXp 6515 \
&& echo "
drop table if exists pgbench_accounts;
drop table if exists pgbench_branches;
drop table if exists pgbench_tellers;
drop table if exists pgbench_history;" | psql -qXp 6516 \
&& pgbench -p 6515 -qis $scale \
&& echo "alter table pgbench_history add column hid serial primary key;" \
| psql -q1Xp 6515 && pg_dump -F c -p 6515 \
--exclude-table-data=pgbench_history \
--exclude-table-data=pgbench_accounts \
--exclude-table-data=pgbench_branches \
--exclude-table-data=pgbench_tellers \
-t pgbench_history -t pgbench_accounts \
-t pgbench_branches -t pgbench_tellers \
| pg_restore -1 -p 6516 -d postgres
where="where (aid between 40000 and 60000-1)"
where2="where (aid between 40000 and 60000-1)"
outcomment='; --'
if [[ $num_instances -eq 3 ]]; then
pg_dump -F c -p 6515 \
--exclude-table-data=pgbench_history \
--exclude-table-data=pgbench_accounts \
--exclude-table-data=pgbench_branches \
--exclude-table-data=pgbench_tellers \
-t pgbench_history -t pgbench_accounts \
-t pgbench_branches -t pgbench_tellers \
| pg_restore -1 -p 6517 -d postgres
fi
pub1=pub_6515_to_6516 sub1=pub_6516_from_6515
pub2=pub_6516_to_6517 sub2=pub_6517_from_6516
echo "
create publication $pub1;
alter publication $pub1 add table pgbench_accounts $where ; --> where 1
alter publication $pub1 add table pgbench_branches;
alter publication $pub1 add table pgbench_tellers;
alter publication $pub1 add table pgbench_history;
" | psql -p 6515 -aqtAX
if [[ $num_instances -eq 3 ]]; then
echo "
create publication $pub2;
alter publication $pub2 add table pgbench_accounts $outcomment $where2 ; --> where 2
alter publication $pub2 add table pgbench_branches;
alter publication $pub2 add table pgbench_tellers;
alter publication $pub2 add table pgbench_history;
" | psql -p 6516 -aqtAX
fi
echo "
create subscription $sub1 connection 'port=6515 application_name=$appname'
publication $pub1 with(enabled=false);
alter subscription $sub1 enable;" | psql -p 6516 -aqtAX
if [[ $num_instances -eq 3 ]]; then
echo "
create subscription $sub2 connection 'port=6516 application_name=$appname'
publication $pub2 with(enabled=false);
alter subscription $sub2 enable;" | psql -p 6517 -aqtAX
fi
echo "-- pgbench -p 6515 -c 16 -j 8 -T 5 -n postgres # scale $scale"
pgbench -p 6515 -c 16 -j 8 -T 5 -n postgres # scale $scale
echo
echo " accounts branches tellers history"
echo " --------- --------- --------- ---------"
# poll every 10 s until all instances report identical checksums
while true
do
md5_6515=6515 md5_6516=6516 md5_6517=6517
md5_a=$(echo "select * from pgbench_accounts $where order by aid" | psql -qtAXp 6515 | md5sum | cut -b1-9)
md5_b=$(echo "select * from pgbench_branches order by bid" | psql -qtAXp 6515 | md5sum | cut -b1-9)
md5_t=$(echo "select * from pgbench_tellers order by tid" | psql -qtAXp 6515 | md5sum | cut -b1-9)
md5_h=$(echo "select * from pgbench_history order by hid" | psql -qtAXp 6515 | md5sum | cut -b1-9)
md5_6515=$( echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
echo "6515 $md5_a $md5_b $md5_t $md5_h $md5_6515 "
md5_a=$(echo "select * from pgbench_accounts $where order by aid" | psql -qtAXp 6516 | md5sum | cut -b1-9)
md5_b=$(echo "select * from pgbench_branches order by bid" | psql -qtAXp 6516 | md5sum | cut -b1-9)
md5_t=$(echo "select * from pgbench_tellers order by tid" | psql -qtAXp 6516 | md5sum | cut -b1-9)
md5_h=$(echo "select * from pgbench_history order by hid" | psql -qtAXp 6516 | md5sum | cut -b1-9)
md5_6516=$( echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
echo "6516 $md5_a $md5_b $md5_t $md5_h $md5_6516 "
if [[ $num_instances -eq 3 ]]; then
md5_a=$(echo "select * from pgbench_accounts $where order by aid" | psql -qtAXp 6517 | md5sum | cut -b1-9)
md5_b=$(echo "select * from pgbench_branches order by bid" | psql -qtAXp 6517 | md5sum | cut -b1-9)
md5_t=$(echo "select * from pgbench_tellers order by tid" | psql -qtAXp 6517 | md5sum | cut -b1-9)
md5_h=$(echo "select * from pgbench_history order by hid" | psql -qtAXp 6517 | md5sum | cut -b1-9)
md5_6517=$( echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
echo -ne "6517 $md5_a $md5_b $md5_t $md5_h $md5_6517 "
else
echo
fi
if [[ "$md5_6515" == "$md5_6516" ]]
then
if [[ $num_instances -eq 2 ]]; then
echo " ok (2)"
break
elif [[ $num_instances -eq 3 ]]; then
if [[ "$md5_6515" == "$md5_6517" ]]
then
echo " ok"
break
fi
fi
fi
echo " NOK"
echo
sleep 10
done
echo
# stop instances
for n in `seq 1 $num_instances`
do
instance=instance$n
server_dir=$root_dir/$instance
data_dir=$server_dir/data
port=$(( baseport + n - 1 ))
logfile=$server_dir/logfile.$port
$pg_ctl stop -D $data_dir --mode=immediate --wait
done
# delete everything
echo "rm -rf $root_dir/instance*"
rm -rf "$root_dir"/instance*
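The checking loop in the script has no upper bound, so when the copies never converge (as in the run above) it has to be interrupted by hand, and the shutdown and cleanup steps after it never run. A bounded variant could look like the following sketch; `wait_for_match` and `fake_check` are hypothetical names, and `fake_check` merely stands in for the comparison of `$md5_6515`, `$md5_6516` and `$md5_6517`.

```shell
#!/usr/bin/env bash
# Hypothetical helper: run a check command up to max_tries times,
# sleeping delay seconds between attempts. Returns 0 as soon as the
# check succeeds and 1 if it never does.
wait_for_match() {
    local check=$1 max_tries=$2 delay=$3 try
    for (( try = 1; try <= max_tries; try++ )); do
        if "$check"; then
            echo "ok after $try attempt(s)"
            return 0
        fi
        echo "NOK (attempt $try of $max_tries)"
        sleep "$delay"
    done
    return 1
}

# Hypothetical stand-in for the md5 comparison: it pretends the copies
# converge on the third call.
calls=0
fake_check() {
    calls=$(( calls + 1 ))
    (( calls >= 3 ))
}

if wait_for_match fake_check 10 0; then
    echo "replicas match"
else
    echo "gave up; replicas still differ"
fi
```

In the test script, the body of the existing `while` loop would become the check function, and a failed `wait_for_match` would still fall through to stopping the instances.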