I ran a cascaded setup of five pub-sub servers on the latest patchset. The test
starts pgbench on the first server and waits until the data on the fifth server
matches the first.
This is based on a test script created by Erik Rijkers. The tests run fine
and the fifth server achieves data consistency in around a minute.
I am attaching the test script and the test run log.

regards,
Ajin Cherian
Fujitsu Australia
#!/bin/bash

#  I compile postgres server versions into dir:
#      $HOME/pg_stuff/pg_installations/pgsql.$project   ( where project is a name )
#
#  This script assumes that projects HEAD and head0 are present
#      $HOME/pg_stuff/pg_installations/pgsql.HEAD   --> git master as of today - friday 12 febr 2021
#      $HOME/pg_stuff/pg_installations/pgsql.head0  --> 3063eb17593c  so that's 11 febr, before the replication changes
#
#  variables 'project' and 'BIN' reflect my set up (so should probably be changed)
#

#  my instance HEAD has the bug: it keeps looping with 'NOK' (=Not OK): primary not identical to all replicas
#  my instance head0 is ok - finishes in 20 s - with the replicas identical to primary
#
#  Erik Rijkers


# Drop any inherited PGPORT / PGSERVICE and export "postgres" as PGDATABASE
# for the client programs invoked below.
unset PGPORT PGSERVICE
export PGDATABASE=postgres

# Disabled: pick the build under test from the first command-line argument.
#if   [[ "$1" == "HEAD"  ]] ; then project=HEAD   # new HEAD = 12 febr
#elif [[ "$1" == "head0" ]] ; then project=head0  # older: 3063eb17593c = 11 febr
#else
#     echo "arg1 missing -  "
#     exit
#fi

# Label used in the progress output and as the replication application_name.
project=two_phase

# Directory that receives one instanceN sub-directory per server.
#root_dir=/tmp/cascade/$project
root_dir=/home/ajin/twophasetest/mar09
mkdir -p "$root_dir"

# Location of the PostgreSQL binaries under test; also put first on PATH so
# that psql, pgbench, pg_dump and pg_restore resolve to the same build.
#BIN=$HOME/pg_stuff/pg_installations/pgsql.$project/bin
BIN=/home/ajin/install-oss/bin
export PATH="$BIN:$PATH"

initdb="$BIN/initdb"
postgres="$BIN/postgres"
pg_ctl="$BIN/pg_ctl"

# Instance N listens on baseport + N - 1.
baseport=6525
port1=$(( baseport + 0 ))
port2=$(( baseport + 1 ))
port3=$(( baseport + 2 ))
port4=$(( baseport + 3 ))
port5=$(( baseport + 4 ))
appname="$project"

num_instances=5   # or 2

scale=1      #  pgbench -s
#clients=64  #  pgbench -c
clients=1    #  pgbench -c
duration=1   #  how long to run: pgbench -T $duration
wait=1       #  seconds to sleep between consistency checks

# Remove instance directories left behind by a previous run.  All five slots
# are cleaned regardless of num_instances, so that switching from a 5-node to
# a 2-node run does not leave old data directories around.
for stale_dir in "$root_dir"/instance{1..5}; do
  if [[ -d "$stale_dir" ]]; then
    rm -rf -- "$stale_dir"
  fi
done

# If anything survived the removal, initdb would later be pointed at a
# non-empty directory; stop here with a message and a non-zero status.
for stale_dir in "$root_dir"/instance{1..5}; do
  if [[ -d "$stale_dir" ]]; then
    echo "cannot remove stale instance directory: $stale_dir" >&2
    exit 1
  fi
done

# devel_file=/tmp/bugs
# echo filterbug>$devel_file

# Create a fresh cluster for every instance and start its server in the
# background.  Instance n lives in $root_dir/instance<n> and listens on
# baseport + n - 1; each server writes its log to logfile.<port> in the
# instance directory.
for (( n = 1; n <= num_instances; n++ )); do
  instance=instance$n
  server_dir=$root_dir/$instance
  data_dir=$server_dir/data
  port=$(( baseport + n - 1 ))
  logfile=$server_dir/logfile.$port
# echo "-- $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file  #  $port"
#          $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file  &> /dev/null
  echo "-- $initdb --pgdata=$data_dir --encoding=UTF8                       #  $port"
  # initdb output is discarded, so its exit status is the only failure signal.
  if ! "$initdb" --pgdata="$data_dir" --encoding=UTF8 &> /dev/null; then
    echo "initdb failed for $data_dir (port $port)" >&2
  fi

  # The subshell plus '&' detaches the server from this script's job table.
  ( "$postgres" -D "$data_dir" -p "$port" \
    --wal_level=logical --logging_collector=on \
    --client_min_messages=warning \
    --log_directory="$server_dir" --log_filename="logfile.${port}" \
    --log_replication_commands=on & ) &> /dev/null
done

# The servers were started asynchronously; wait until each one accepts
# connections so the client commands that follow do not race against startup.
# Give up on an instance after 30 seconds and let the script continue, as it
# did before this check existed.
for (( n = 1; n <= num_instances; n++ )); do
  port=$(( baseport + n - 1 ))
  waited=0
  until "$BIN/pg_isready" -q -p "$port"; do
    if (( waited >= 30 )); then
      echo "server on port $port is not accepting connections after ${waited}s" >&2
      break
    fi
    sleep 1
    waited=$(( waited + 1 ))
  done
done

# dbsize=$(echo " $scale * 100000 " | bc )

# sleep 1

# Prepare the pgbench tables:
#   1. drop any existing pgbench tables on every running instance,
#   2. initialise pgbench on the primary (instance 1) and give pgbench_history
#      a serial primary key "hid",
#   3. copy the table definitions (no data) from the primary to instance 2.
# Steps 2 and 3 only run if every preceding step succeeded.
#
# The drop loop is driven by num_instances: connecting to an instance that was
# never started would fail and prevent pgbench from being initialised at all.
drop_pgbench_sql="
  drop table if exists pgbench_accounts;
  drop table if exists pgbench_branches;
  drop table if exists pgbench_tellers;
  drop table if exists pgbench_history;"

tables_dropped=1
for (( n = 1; n <= num_instances; n++ )); do
  if ! echo "$drop_pgbench_sql" | psql -qXp "$(( baseport + n - 1 ))"; then
    echo "could not drop pgbench tables on port $(( baseport + n - 1 ))" >&2
    tables_dropped=0
    break
  fi
done

if (( tables_dropped )); then
  pgbench -p "$port1" -qis "$scale" \
  && echo "alter table pgbench_history add column hid serial primary key;" \
    | psql -q1Xp "$port1" \
  && pg_dump -F c -p "$port1" \
       --exclude-table-data=pgbench_history  \
       --exclude-table-data=pgbench_accounts \
       --exclude-table-data=pgbench_branches \
       --exclude-table-data=pgbench_tellers  \
       -t pgbench_history -t pgbench_accounts \
       -t pgbench_branches -t pgbench_tellers \
    | pg_restore -1 -p "$port2" -d postgres \
  || echo "pgbench initialisation or schema copy to port $port2 failed" >&2
fi

# In the 5-node setup, instances 3, 4 and 5 also need the (empty) pgbench
# tables: dump the definitions from the primary without any table data and
# restore them into each of those instances in a single transaction.
if (( num_instances == 5 )); then
  schema_dump_opts=(
    -F c -p "$port1"
    --exclude-table-data=pgbench_history
    --exclude-table-data=pgbench_accounts
    --exclude-table-data=pgbench_branches
    --exclude-table-data=pgbench_tellers
    -t pgbench_history -t pgbench_accounts
    -t pgbench_branches -t pgbench_tellers
  )
  for target_port in "$port3" "$port4" "$port5"; do
    pg_dump "${schema_dump_opts[@]}" \
      | pg_restore -1 -p "$target_port" -d postgres
  done
fi
# Names for the four hops of the cascade.  pubN is created on the upstream
# node of hop N, subN on its downstream node.
pub1=pub_${port1}_to_$port2
sub1=pub_${port2}_from_$port1
pub2=pub_${port2}_to_$port3
sub2=pub_${port3}_from_$port2
pub3=pub_${port3}_to_$port4
sub3=pub_${port4}_from_$port3
pub4=pub_${port4}_to_$port5
sub4=pub_${port5}_from_$port4

# Create a publication containing the four pgbench tables.
#   $1 - publication name
#   $2 - port of the server that publishes
publish_pgbench_tables() {
  local pub_name=$1 pub_port=$2
  echo -ne "
create publication $pub_name;
alter  publication $pub_name add table pgbench_accounts;
alter  publication $pub_name add table pgbench_branches;
alter  publication $pub_name add table pgbench_tellers;
alter  publication $pub_name add table pgbench_history;" | psql -p $pub_port -aqtAX
}

# The primary always publishes; the intermediate nodes only in the 5-node run.
publish_pgbench_tables "$pub1" "$port1"

if (( num_instances == 5 )); then
  publish_pgbench_tables "$pub2" "$port2"
  publish_pgbench_tables "$pub3" "$port3"
  publish_pgbench_tables "$pub4" "$port4"
fi

# Create a subscription in disabled state and enable it right afterwards.
#   $1 - subscription name
#   $2 - publication to subscribe to
#   $3 - port of the upstream (publishing) server
#   $4 - port of the server on which the subscription is created
subscribe_to_upstream() {
  local sub_name=$1 pub_name=$2 upstream_port=$3 local_port=$4
  echo "
create subscription $sub_name connection 'port=$upstream_port application_name=$appname'
       publication  $pub_name with(enabled=false);
alter  subscription $sub_name enable;" | psql -p $local_port -aqtAX
}

# Downstream hops are wired up first (5-node run only); the hop from the
# primary to instance 2 is created last.
if (( num_instances == 5 )); then
  subscribe_to_upstream "$sub2" "$pub2" "$port2" "$port3"
  subscribe_to_upstream "$sub3" "$pub3" "$port3" "$port4"
  subscribe_to_upstream "$sub4" "$pub4" "$port4" "$port5"
fi

subscribe_to_upstream "$sub1" "$pub1" "$port1" "$port2"

# Generate load on the primary: print the pgbench command line, then run it.
bench_cmd=(pgbench -p "$port1" -c "$clients" -j 8 -T "$duration" -n postgres)
printf '%s\n' "-- ${bench_cmd[*]}    #  scale $scale"
"${bench_cmd[@]}"
printf '\n'

# Column header for the per-instance checksum lines printed afterwards.
printf '%s\n' \
  "       accounts branches tellers  history" \
  "       -------- -------- -------- --------"

# Poll all instances until every replica holds the same pgbench data as the
# primary (instance 1).
#
# Each round prints one line per instance:
#     <port>   <accounts> <branches> <tellers> <history>    <combined>
# where the first four fields are truncated md5 sums of the complete, ordered
# contents of the respective table and <combined> is a truncated md5 over
# those four values.  The round is "ok" only if every instance's combined
# value equals the primary's AND every query succeeded; otherwise "NOK" is
# reported and the check is repeated after $wait seconds.

# Print the first 8 hex digits of the md5 of a query's output on one server.
#   $1 - port of the server to query
#   $2 - SQL text
# Returns psql's exit status, so a failed connection or failed query is not
# mistaken for an (identical-looking) empty result.
table_md5() {
  echo "$2" | psql -qtAX -v ON_ERROR_STOP=1 -p "$1" | md5sum | cut -b1-8
  return "${PIPESTATUS[1]}"
}

sql_a="select * from pgbench_accounts        order by aid;"
sql_b="select * from pgbench_branches        order by bid;"
sql_t="select * from pgbench_tellers         order by tid;"
sql_h="select * from pgbench_history         order by hid;"

while true; do
  in_sync=1
  primary_sum=

  for (( n = 1; n <= num_instances; n++ )); do
    check_port=$(( baseport + n - 1 ))

    md5_a=$(table_md5 "$check_port" "$sql_a") || in_sync=0
    md5_b=$(table_md5 "$check_port" "$sql_b") || in_sync=0
    md5_t=$(table_md5 "$check_port" "$sql_t") || in_sync=0
    md5_h=$(table_md5 "$check_port" "$sql_h") || in_sync=0
    md5_all=$(echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-8)

    if (( n == 1 )); then
      primary_sum=$md5_all
    elif [[ "$md5_all" != "$primary_sum" ]]; then
      in_sync=0
    fi

    line="$check_port   $md5_a $md5_b $md5_t $md5_h    $md5_all "
    if (( n < num_instances )); then
      echo "$line"
    elif (( num_instances == 5 )); then
      # Last line of a 5-node round: the verdict is appended on the same line.
      echo -n "$line"
    else
      echo "$line"
      echo
    fi
  done

  if (( in_sync )); then
    if (( num_instances == 2 )); then
      echo "   ok (2)"
    else
      echo "   ok"
    fi
    break
  fi

  # not ready replicating yet, so:  report, wait, and loop
  echo "  NOK (${wait}s)                         [$project]   scale [$scale]"
  echo
  sleep "$wait"
done
echo



# Print the contents of a system catalog on one server, with psql's
# "(N rows)" footer filtered out.
#   $1 - catalog name
#   $2 - server port
show_catalog() {
  echo "select * from $1" | psql -qXd "$PGDATABASE" -p "$2" | grep -v row
}

# Instances 1 and 2: publications and subscriptions.
for report_port in "$port1" "$port2"; do
  show_catalog pg_publication  "$report_port"
  show_catalog pg_subscription "$report_port"
  echo
done

# Instances 3 to 5 (5-node run only): subscriptions only.
if (( num_instances == 5 )); then
  for report_port in "$port3" "$port4" "$port5"; do
    show_catalog pg_subscription "$report_port"
    echo
  done
fi

# The condition is always true; change it to leave the servers running.
if [[ 1 -eq 1 ]]; then

  # stop instances
  #
  # Every server started by this script is shut down with pg_ctl in
  # "immediate" mode, waiting for each shutdown to complete.
  for (( n = 1; n <= num_instances; n++ )); do
    instance=instance$n
    server_dir=$root_dir/$instance
    data_dir=$server_dir/data
    "$pg_ctl" stop -D "$data_dir" --mode=immediate --wait
  done

fi

#
#  delete everything
#
#if [[ 0 -eq 1 ]]
#then
#   echo "rm -rf /tmp/cascade/$project/instance*"
#         rm -rf /tmp/cascade/$project/instance*
#fi


Attachment: erik_5_cascade_run.log
Description: Binary data

Reply via email to