I ran a five-server cascaded publication/subscription setup on the latest patchset, using a script that starts pgbench on the first server and waits until the data on the fifth server matches the first. This is based on a test script created by Erik Rijkers. The tests run fine, and the fifth server reaches data consistency in around a minute. I am attaching the test script and the test run log.
regards, Ajin Cherian Fujitsu Australia
#!/bin/bash
# I compile postgres server versions into dir:
# $HOME/pg_stuff/pg_installations/pgsql.$project ( where project is a name )
#
# This script assumes that projects HEAD and head0 are present
# $HOME/pg_stuff/pg_installations/pgsql.HEAD --> git master as of today - friday 12 febr 2021
# $HOME/pg_stuff/pg_installations/pgsql.head0 --> 3063eb17593c so that's 11 febr, before the replication changes
#
# The variables 'project', 'root_dir' and 'BIN' reflect my setup (so they should probably be changed)
#
# my instance HEAD has the bug: it keeps looping with 'NOK' (=Not OK): primary not identical to all replicas
# my instance head0 is ok - finishes in 20 s - with the replicas identical to primary
#
# Erik Rijkers
# Connection defaults: drop PGPORT/PGSERVICE inherited from the caller's
# environment and make 'postgres' the default database for psql/pgbench.
unset PGPORT PGSERVICE
export PGDATABASE=postgres

# Project selection (formerly taken from $1; now hard-wired below):
#if [[ "$1" == "HEAD" ]] ; then project=HEAD # new HEAD = 12 febr
#elif [[ "$1" == "head0" ]] ; then project=head0 # older: 3063eb17593c = 11 febr
#else
# echo "arg1 missing - "
# exit
#fi
#root_dir=/tmp/cascade/$project
project=two_phase
appname=$project
root_dir=/home/ajin/twophasetest/mar09
mkdir -p "$root_dir"

# Server binaries; also put them first on PATH so that the plain psql,
# pgbench, pg_dump and pg_restore calls further down use this build.
#BIN=$HOME/pg_stuff/pg_installations/pgsql.$project/bin
BIN=/home/ajin/install-oss/bin
export PATH=$BIN:$PATH
initdb=$BIN/initdb
postgres=$BIN/postgres
pg_ctl=$BIN/pg_ctl

# Instance N listens on baseport + N - 1, giving port1..port5.
baseport=6525
for _offset in 0 1 2 3 4; do
  printf -v "port$(( _offset + 1 ))" '%d' "$(( baseport + _offset ))"
done
unset _offset

# Test knobs.
num_instances=5  # or 2
scale=1          # pgbench -s
clients=1        # pgbench -c (64 was used earlier)
duration=1       # how long to run: pgbench -T $duration
wait=1           # seconds between consistency checks
# Remove instance directories left over from a previous run, so every
# instance below starts from a fresh initdb. All five slots are cleared
# regardless of num_instances.
# ${root_dir:?} aborts if root_dir is empty instead of running "rm -rf /instanceN".
for n in 1 2 3 4 5
do
  instance_dir=${root_dir:?root_dir is not set}/instance$n
  if [[ -d $instance_dir ]]; then
    rm -rf -- "$instance_dir"
  fi
  # Stop with a failure status if the directory survived. A bare 'exit'
  # here would reuse the status of the preceding test, which is 0.
  if [[ -d $instance_dir ]]; then
    echo "cannot remove $instance_dir" >&2
    exit 1
  fi
done
# Create a fresh cluster for each instance and start its postmaster in the
# background with wal_level=logical, logging into the instance directory.
# devel_file=/tmp/bugs
# echo filterbug>$devel_file
for n in $(seq 1 "$num_instances")
do
  instance=instance$n
  server_dir=$root_dir/$instance
  data_dir=$server_dir/data
  port=$(( baseport + n - 1 ))
  # echo "-- $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file # $port"
  # $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file &> /dev/null
  echo "-- $initdb --pgdata=$data_dir --encoding=UTF8 # $port"
  if ! "$initdb" --pgdata="$data_dir" --encoding=UTF8 &> /dev/null; then
    echo "initdb failed for $data_dir" >&2
    exit 1
  fi
  # The subshell + '&' detaches the postmaster from this script's job table.
  ( "$postgres" -D "$data_dir" -p "$port" \
      --wal_level=logical --logging_collector=on \
      --client_min_messages=warning \
      --log_directory="$server_dir" --log_filename="logfile.${port}" \
      --log_replication_commands=on & ) &> /dev/null
done
# dbsize=$(echo " $scale * 100000 " | bc )

# The postmasters were started asynchronously. Wait until each one accepts
# connections before the psql/pgbench steps that follow; otherwise those
# steps race server startup. Give up after about 60 seconds per instance.
for n in $(seq 1 "$num_instances")
do
  port=$(( baseport + n - 1 ))
  tries=0
  until pg_isready -q -p "$port"; do
    tries=$(( tries + 1 ))
    if (( tries > 60 )); then
      echo "server on port $port did not start; see $root_dir/instance$n/logfile.$port" >&2
      exit 1
    fi
    sleep 1
  done
done
# Prepare the pgbench schema:
#  1. drop any pgbench tables on every instance that was started;
#  2. initialise pgbench on instance 1 and give pgbench_history a serial
#     primary key 'hid' (the consistency check orders history rows by it);
#  3. copy the table definitions, without rows, from instance 1 to every
#     downstream instance; rows are expected to arrive via replication.
#
# Only running instances are contacted. Sending the drops to a port with no
# server fails, and in the old single && chain that failure (ports 3 and 4
# with num_instances=2) silently skipped pgbench initialisation.
drop_sql="
drop table if exists pgbench_accounts;
drop table if exists pgbench_branches;
drop table if exists pgbench_tellers;
drop table if exists pgbench_history;"
for n in $(seq 1 "$num_instances")
do
  port=$(( baseport + n - 1 ))
  if ! echo "$drop_sql" | psql -qXp "$port"; then
    echo "could not drop pgbench tables on port $port" >&2
    exit 1
  fi
done

if ! pgbench -p "$port1" -qis "$scale"; then
  echo "pgbench initialisation failed on port $port1" >&2
  exit 1
fi
# ON_ERROR_STOP makes psql report a failed ALTER through its exit status.
if ! echo "alter table pgbench_history add column hid serial primary key;" \
    | psql -v ON_ERROR_STOP=1 -q1Xp "$port1"; then
  echo "could not add pgbench_history.hid on port $port1" >&2
  exit 1
fi

for n in $(seq 2 "$num_instances")
do
  port=$(( baseport + n - 1 ))
  if ! pg_dump -F c -p "$port1" \
      --exclude-table-data=pgbench_history \
      --exclude-table-data=pgbench_accounts \
      --exclude-table-data=pgbench_branches \
      --exclude-table-data=pgbench_tellers \
      -t pgbench_history -t pgbench_accounts \
      -t pgbench_branches -t pgbench_tellers \
      | pg_restore -1 -p "$port" -d postgres; then
    echo "could not copy the pgbench schema to port $port" >&2
    exit 1
  fi
done
# Name the four hops of the cascade: hop k replicates from port<k> to
# port<k+1>, via publication pub<k> (on port<k>) and subscription sub<k>
# (on port<k+1>).
for hop in 1 2 3 4; do
  src_var=port$hop
  dst_var=port$(( hop + 1 ))
  printf -v "pub$hop" 'pub_%s_to_%s' "${!src_var}" "${!dst_var}"
  printf -v "sub$hop" 'pub_%s_from_%s' "${!dst_var}" "${!src_var}"
done
unset hop src_var dst_var

# create_publication NAME PORT
# Creates publication NAME on PORT and adds the four pgbench tables to it,
# one ALTER per table. psql -a echoes every statement it receives.
create_publication() {
  local name=$1 port=$2 table
  {
    printf '\ncreate publication %s;' "$name"
    for table in accounts branches tellers history; do
      printf '\nalter publication %s add table pgbench_%s;' "$name" "$table"
    done
  } | psql -p "$port" -aqtAX
}

create_publication "$pub1" "$port1"
if [[ $num_instances -eq 5 ]]; then
  create_publication "$pub2" "$port2"
  create_publication "$pub3" "$port3"
  create_publication "$pub4" "$port4"
fi
# create_subscription SUB UPSTREAM_PORT PUB SUBSCRIBER_PORT
# Creates subscription SUB on SUBSCRIBER_PORT, pointing at publication PUB on
# UPSTREAM_PORT. The subscription is created disabled and enabled in the next
# statement.
create_subscription() {
  psql -p "$4" -aqtAX <<EOF

create subscription $1 connection 'port=$2 application_name=$appname'
publication $3 with(enabled=false);
alter subscription $1 enable;
EOF
}

# The downstream links (2->3, 3->4, 4->5) are created before the head link
# 1->2, presumably so the whole chain is in place before data leaves
# instance 1.
if [[ $num_instances -eq 5 ]]; then
  create_subscription "$sub2" "$port2" "$pub2" "$port3"
  create_subscription "$sub3" "$port3" "$pub3" "$port4"
  create_subscription "$sub4" "$port4" "$pub4" "$port5"
fi
create_subscription "$sub1" "$port1" "$pub1" "$port2"
# Generate load on instance 1 only; the downstream instances have to catch
# up through the publication/subscription chain.
bench_args=(-p "$port1" -c "$clients" -j 8 -T "$duration" -n postgres)
echo "-- pgbench ${bench_args[*]} # scale $scale"
pgbench "${bench_args[@]}"

# Column header for the per-instance checksum lines printed next.
printf '%s\n' "" " accounts branches tellers history" " -------- -------- -------- --------"
# Poll until every instance holds exactly the same data as instance 1.
#
# For each instance, each pgbench table is read in key order through psql and
# reduced to the first 8 hex digits of its md5. Those four digests are hashed
# again into one per-instance checksum. Each round prints one line per
# instance, then " ok (N)" once all checksums agree. Otherwise it prints a NOK
# line and sleeps $wait seconds before the next round.
#
# All instances are compared, not just 1 vs 2 and 1 vs 5. An instance whose
# queries fail never counts as matching: hashing the empty output of two
# unreachable servers used to make them compare equal.
sql_a="select * from pgbench_accounts order by aid;"
sql_b="select * from pgbench_branches order by bid;"
sql_t="select * from pgbench_tellers order by tid;"
sql_h="select * from pgbench_history order by hid;"

# instance_checksum PORT
# Prints "PORT <accounts> <branches> <tellers> <history> <combined> " and
# stores the combined digest in the global 'checksum'.
# Returns non-zero, leaving 'checksum' empty, if any query fails.
instance_checksum() {
  local port=$1 sql rows
  local -a digests=()
  checksum=
  for sql in "$sql_a" "$sql_b" "$sql_t" "$sql_h"; do
    rows=$(psql -qtAX -v ON_ERROR_STOP=1 -p "$port" <<<"$sql") || return 1
    digests+=("$(printf '%s\n' "$rows" | md5sum | cut -b1-8)")
  done
  checksum=$(echo "${digests[*]}" | md5sum | cut -b1-8)
  echo "$port ${digests[*]} $checksum "
}

while true
do
  reference=
  all_match=1
  for n in $(seq 1 "$num_instances")
  do
    port=$(( baseport + n - 1 ))
    if ! instance_checksum "$port"; then
      echo "$port (query failed)"
      all_match=0
      continue
    fi
    if (( n == 1 )); then
      reference=$checksum
    elif [[ $checksum != "$reference" ]]; then
      all_match=0
    fi
  done
  if (( all_match )); then
    echo " ok ($num_instances)"
    break
  fi
  # not ready replicating yet, so: report, wait, and loop
  echo " NOK (${wait}s) [$project] scale [$scale]"
  echo
  sleep "$wait"
done
# Dump the replication catalogs of every instance into the run log
# (grep -v row drops psql's "(N rows)" footer). pg_publication is listed on
# every instance: in the 5-instance layout, instances 1-4 each host one of
# pub1..pub4, but it used to be shown for instances 1 and 2 only.
echo
for n in $(seq 1 "$num_instances")
do
  port=$(( baseport + n - 1 ))
  echo "select * from pg_publication" | psql -qXd "$PGDATABASE" -p "$port" | grep -v row
  echo "select * from pg_subscription" | psql -qXd "$PGDATABASE" -p "$port" | grep -v row
  echo
done
# Shut down every instance with pg_ctl (immediate mode, waiting for each one).
# Change the condition to 0 -eq 1 to leave the servers running for inspection.
if [[ 1 -eq 1 ]]; then
  # stop instances  (this was missing its '#' and ran a nonexistent 'stop' command)
  for n in $(seq 1 "$num_instances")
  do
    data_dir=$root_dir/instance$n/data
    "$pg_ctl" stop -D "$data_dir" --mode=immediate --wait
  done
fi
#
# delete everything
#
#if [[ 0 -eq 1 ]]
#then
# echo "rm -rf /tmp/cascade/$project/instance*"
# rm -rf /tmp/cascade/$project/instance*
#fi
erik_5_cascade_run.log
Description: Binary data
