On Wed, Dec 30, 2020 at 5:08 PM Peter Smith <smithpb2...@gmail.com> wrote:
> PSA my v9 WIP patch for the Solution1 which addresses some recent
> review comments, and other minor changes.

I did some tests using the test suite prepared by Erik Rijkers in [1] during the initial design of tablesync. Back then, they had seen some errors while doing multiple commits in the initial tablesync, so I reran the test script with the v9 patch applied on HEAD and found no errors.

The script runs pgbench, creates a pub/sub on a standby server, and replicates all of the pgbench tables to the standby. The contents of the tables are compared at the end of each run to make sure they are identical. I ran it for around 12 hours, and it worked without any errors.

I have attached the script I used.

regards,
Ajin Cherian
Fujitsu Australia

[1] - https://www.postgresql.org/message-id/93d02794068482f96d31b002e0eb248d%40xs4all.nl
#!/bin/bash
#
# Logical-replication stress test (pgbench + publication/subscription).
#
# Assumes both instances are already running, on port 6972 (master /
# publisher) and 6973 (replica / subscriber).  The pgbench tables are created
# on the master, copied to the replica with pg_dump | pg_restore and then
# truncated there, and a "for all tables" publication pub1 / subscription sub1
# is set up.  pgbench then runs on the master, after which the script polls
# until the md5 checksums of the four pgbench tables are identical on both
# instances, or gives up.
#
# Usage: $0 [scale [clients [duration [date_str [clean_only]]]]]
#   scale      pgbench scale factor; '_' characters are stripped (default 1)
#   clients    pgbench -c (default 1)
#   duration   pgbench -T, in seconds (default 60)
#   date_str   tag used in output file names (default: current timestamp)
#   clean_only if non-empty: only remove leftover pub/sub/slots, then exit
port1=6972
port2=6973
logfile1=$HOME/dataoss/logfile
logfile2=$HOME/dataoss2/logfile

# clear logs
echo > "$logfile1"
echo > "$logfile2"

logfile_ts=$( date +"%Y%m%d_%H_%M_%S" )

# Positional arguments; an empty argument also selects the default.
scale=${1:-1}
clients=${2:-1}
#INIT_WAIT=${3:-0}
duration=${3:-60}
date_str=${4:-$logfile_ts}
CLEAN_ONLY=${5:-}
threads=8
# threads=1

# NOTE(review): the *_version / *_commit_hash values are computed but never
# printed below.
master_version=$( psql postgres -qtAXc "show server_version" -p "$port1" )
replica_version=$( psql postgres -qtAXc "show server_version" -p "$port2" )
master_commit_hash=$( psql postgres -qtAXc "show server_version" -p "$port1" | cut -d _ -f 6 )
replica_commit_hash=$( psql postgres -qtAXc "show server_version" -p "$port2" | cut -d _ -f 6 )
master_start_time=$( psql postgres -qtAXc "select pg_postmaster_start_time()" -p "$port1" )
replica_start_time=$( psql postgres -qtAXc "select pg_postmaster_start_time()" -p "$port2" )
master_patch_md5=$(echo "select md5(comments) from information_schema.sql_features where feature_name = 'patch md5'" | psql postgres -qtAXp "$port1")
replica_patch_md5=$(echo "select md5(comments) from information_schema.sql_features where feature_name = 'patch md5'" | psql postgres -qtAXp "$port2")
master_s_c=$( psql postgres -qtAXc "show synchronous_commit" -p "$port1" )
replica_s_c=$( psql postgres -qtAXc "show synchronous_commit" -p "$port2" )
master_assert=$( psql postgres -qtAXc "show debug_assertions" -p "$port1" )
replica_assert=$( psql postgres -qtAXc "show debug_assertions" -p "$port2" )

echo "============================================================================"
echo "-- scale $scale clients $clients duration $duration CLEAN_ONLY=$CLEAN_ONLY"
echo "============================================================================"
echo "-- hostname: $( hostname -s )"
echo "-- timestamp: $date_str"
#echo -n "-- "; ps -ef f | grep 6972 | grep -Evw 'grep|xterm|screen|SCREEN' | less -iS
#echo -n "-- "; ps -ef f | grep 6973 | grep -Evw 'grep|xterm|screen|SCREEN' | less -iS
echo "-- master_start_time $master_start_time replica_start_time $replica_start_time"
if [[ "$master_patch_md5" == "$replica_patch_md5" ]]; then
  echo "-- master patch-md5 [$master_patch_md5]"
  echo "-- replica patch-md5 [$replica_patch_md5] (ok)"
else
  echo "-- master patch-md5 [$master_patch_md5] - replica patch-md5 NOT the same, bailing out"
  echo "-- replica patch-md5 [$replica_patch_md5] - replica patch-md5 NOT the same, bailing out"
  exit 1   # was 'exit -1', which bash turns into exit status 255
fi
echo "-- synchronous_commit, master [$master_s_c] replica [$replica_s_c]"
echo "-- master_assert [$master_assert] replica_assert [$replica_assert]"
echo "-- self md5 $( md5sum "$0" )"

unset PGSERVICEFILE PGSERVICE # PGPORT PGDATA PGHOST
export PGDATABASE=postgres
pgdata_master=$HOME/dataoss
pgdata_replica=$HOME/dataoss2

#######################################
# Display the 4 pgbench tables' accumulated content as md5s.
# a,b,t,h stand for: pgbench_accounts, -branches, -tellers, -history.
# Prints one line per instance (row counts + first 9 chars of each table's
# md5), then " ok" if the combined md5s of master and replica are identical,
# " NOK" otherwise.  The replica label is printed without a newline, so a
# match reads "... replica ok" -- the main loop greps for exactly that.
# Exits (the calling subshell) with status 1 if the master does not have
# exactly 4 pgbench_* tables.
#######################################
cb() {
  local port num_tables
  local md5_a md5_b md5_t md5_h cnt_a cnt_b cnt_t cnt_h
  # Distinct sentinels so the two sides can never "match" by accident.
  local -a md5_total=()
  md5_total[$port1]='-1'
  md5_total[$port2]='-2'

  num_tables=$( echo "select count(*) from pg_class where relkind = 'r' and relname ~ '^pgbench_'" | psql postgres -qtAXp "$port1" )
  if [[ $num_tables -ne 4 ]]; then
    # $outf is never set in this script; fall back to stderr so the message is
    # not lost to an "ambiguous redirect" (stdout is captured by the caller).
    echo "pgbench tables not 4 - exit" >> "${outf:-/dev/stderr}"
    exit 1
  fi

  for port in "$port1" "$port2"; do
    md5_a=$(echo "select * from pgbench_accounts order by aid" | psql postgres -qtAXp "$port" | md5sum | cut -b 1-9)
    md5_b=$(echo "select * from pgbench_branches order by bid" | psql postgres -qtAXp "$port" | md5sum | cut -b 1-9)
    md5_t=$(echo "select * from pgbench_tellers order by tid" | psql postgres -qtAXp "$port" | md5sum | cut -b 1-9)
    md5_h=$(echo "select * from pgbench_history order by hid" | psql postgres -qtAXp "$port" | md5sum | cut -b 1-9)
    cnt_a=$(echo "select count(*) from pgbench_accounts" | psql postgres -qtAXp "$port")
    cnt_b=$(echo "select count(*) from pgbench_branches" | psql postgres -qtAXp "$port")
    cnt_t=$(echo "select count(*) from pgbench_tellers" | psql postgres -qtAXp "$port")
    cnt_h=$(echo "select count(*) from pgbench_history" | psql postgres -qtAXp "$port")
    md5_total[$port]=$( echo "${md5_a} ${md5_b} ${md5_t} ${md5_h}" | md5sum )
    printf '%s a,b,t,h: %8d %6d %6d %6d' "$port" "$cnt_a" "$cnt_b" "$cnt_t" "$cnt_h"
    echo -n " $md5_a $md5_b $md5_t $md5_h"
    if [[ $port -eq $port1 ]]; then
      echo " master"
    elif [[ $port -eq $port2 ]]; then
      echo -n " replica"   # no newline: " ok"/" NOK" is appended below
    else
      echo " ERROR "
    fi
  done

  if [[ "${md5_total[$port1]}" == "${md5_total[$port2]}" ]]; then
    echo " ok"
  else
    echo " NOK"
  fi
}

# function cb_new()
# {
#   # display the 4 pgbench tables' accumulated content as md5s
#   # a,b,t,h stand for: pgbench_accounts, -branches, -tellers, -history
#   md5_total_6972='-1'
#   md5_total_6973='-2'
#   num_tables=$( echo "select count(*) from pg_class where relkind = 'r' and relname ~ '^pgbench_'" | psql -qtAX )
#   if [[ $num_tables -ne 4 ]]
#   then
#     echo "pgbench tables not 4 - exit" >> $outf
#     exit
#   fi
#   for port in $port1 $port2
#   do
#     arr=$( echo " select count(*) from pgbench_accounts
#            union all select count(*) from pgbench_branches
#            union all select count(*) from pgbench_tellers
#            union all select count(*) from pgbench_history " | psql -qtAXp $port )
#     set -- $arr
#     cnt_a=$1
#     cnt_b=$2
#     cnt_t=$3
#     cnt_h=$4
#   # md5_a=$(echo "select * from pgbench_accounts order by aid"|psql -qtAXp$port|md5sum|cut -b 1-9)
#   # md5_a=$(echo "select sum(aid),sum(bid),sum(abalance),sum(length(filler)) from pgbench_accounts"|psql -qtAXp$port|md5sum|cut -b 1-9)
#     arr=$(echo "select array_to_string( array (
#       (select md5(cast(sum(aid)+sum(bid)+sum(abalance)+coalesce(sum(length(filler)),0) as text)) from pgbench_accounts a)
#       union all (select case when $cnt_b = 0 then md5('') else md5(cast( sum(bid)+sum(bbalance)+coalesce(sum(length(filler)),0) as text)) end from pgbench_branches b)
#       union all (select case when $cnt_t = 0 then md5('') else md5(cast( sum(tid)+sum(bid)+sum(tbalance)+coalesce(sum(length(filler)),0) as text)) end from pgbench_tellers t)
#       union all (select case when $cnt_h = 0 then md5('') else md5(cast(( sum(tid) + sum(bid) + sum(aid) + sum(delta) + coalesce( cast(sum(extract(epoch from mtime)) as bigint), 0)
#          + coalesce(sum(length(filler)), 0) + sum(hid) ) as text)) end from pgbench_history h)
#     ), ' ')"|psql -qtAXp $port )
#     set -- $arr
#     md5_a=$1 md5_b=$2 md5_t=$3 md5_h=$4
#   # md5_a='' md5_b=$1 md5_t=$2 md5_h=$3
#   # echo -ne "md5_b=[$md5_b]\nmd5_t=[$md5_t]\nmd5_h=[$md5_h]"
#   #
#   # md5_a=$(echo "select sum(aid),sum(bid),sum(abalance),sum(length(filler)) from pgbench_accounts"|psql -qtAXp$port|md5sum|cut -b 1-9)
#   # md5_b=$(echo "select * from pgbench_branches order by bid"|psql -qtAXp$port|md5sum|cut -b 1-9)
#   # md5_t=$(echo "select * from pgbench_tellers order by tid"|psql -qtAXp$port|md5sum|cut -b 1-9)
#   ## md5_h=$(echo "select * from pgbench_history order by hid"|psql -qtAXp$port|md5sum|cut -b 1-9)
#   # md5_h=$(echo "select sum(tid),sum(bid),sum(aid),sum(delta),cast(sum(extract(epoch from mtime)) as bigint),sum(length(filler)), sum(hid) from pgbench_history"|psql -qtAXp$port|md5sum|cut -b 1-9)
#   # cnt_a='' #$(echo "select count(*) from pgbench_accounts"|psql -qtAXp $port)
#   # cnt_b=$(echo "select count(*) from pgbench_branches"|psql -qtAXp $port)
#   # cnt_t=$(echo "select count(*) from pgbench_tellers" |psql -qtAXp $port)
#   # cnt_h=$(echo "select count(*) from pgbench_history" |psql -qtAXp $port)
#     md5_total[$port]=$( echo "${md5_a} ${md5_b} ${md5_t} ${md5_h}" | md5sum )
#     printf "$port a,b,t,h: %8d %6d %6d %6d" ${cnt_a} ${cnt_b} ${cnt_t} ${cnt_h}
#   # printf "$port a,b,t,h: ? %6d %6d %6d" ${cnt_b} ${cnt_t} ${cnt_h}
#     echo -n " ${md5_a:1:9} ${md5_b:1:9} ${md5_t:1:9} ${md5_h:1:9}"
#   # echo -n " ${md5_b:1:9} ${md5_t:1:9} ${md5_h:1:9}"
#     if [[ $port -eq $port1 ]]; then echo " master"
#     elif [[ $port -eq $port2 ]]; then echo -n " replica"
#     else echo " ERROR "
#     fi
#   done
#   if [[ "${md5_total[$port1]}" == "${md5_total[$port2]}" ]]
#   then
#     echo " ok"
#   else
#     echo " NOK"
#   fi
# }

#######################################
# Remove leftovers of a previous run: subscription sub1 on the replica,
# publication pub1 and replication slot 'sub1' on the master.  If anything is
# still left afterwards, dump all master WAL segments with pg_waldump into
# unclean.<date_str>.txt.bz2, sleep 60 s and exit the script with status 1.
# Arguments: $1 - label echoed first (identifies the call site)
#######################################
clean_pubsub() {
  local sub_count sub_repl_slot_count pub_count pub_repl_slot_count
  local uncleanf w

  echo "$1"
  sub_count=$( echo "select count(*) from pg_subscription" | psql postgres -qtAXp "$port2" )
  if [[ $sub_count -ne 0 ]]; then
    echo "sub_count -ne 0 : deleting sub1 (plain)"
    echo "drop subscription if exists sub1" | psql postgres -qXp "$port2"
  fi

  sub_repl_slot_count=$( echo "select count(*) from pg_replication_slots" | psql postgres -qtAXp "$port2" )
  if [[ $sub_repl_slot_count -ne 0 ]]; then
    echo "sub_repl_slot_count -ne 0 - deleting"
    # NOTE(review): slots are counted on the replica ($port2) but the drop is
    # issued on the master ($port1) -- confirm this is intended.
    echo "select pg_drop_replication_slot('sub1')" | psql postgres -Xp "$port1"
  fi

  pub_count=$( echo "select count(*) from pg_publication" | psql postgres -qtAXp "$port1" )
  if [[ $pub_count -ne 0 ]]; then
    echo "pub_count -ne 0 - deleting pub1"
    echo "drop publication if exists pub1" | psql postgres -qXp "$port1"
  fi

  pub_repl_slot_count=$( echo "select count(*) from pg_replication_slots" | psql postgres -qtAXp "$port1" )
  if [[ $pub_repl_slot_count -ne 0 ]]; then
    echo "pub_repl_slot_count -ne 0 - deleting (sub1)"
    echo "select pg_drop_replication_slot('sub1')" | psql postgres -qXp "$port1"
  fi

  # Re-check everything after the cleanup attempts.
  pub_count=$( echo "select count(*) from pg_publication" | psql postgres -qtAXp "$port1" )
  pub_repl_slot_count=$( echo "select count(*) from pg_replication_slots" | psql postgres -qtAXp "$port1" )
  sub_count=$( echo "select count(*) from pg_subscription" | psql postgres -qtAXp "$port2" )
  sub_repl_slot_count=$( echo "select count(*) from pg_replication_slots" | psql postgres -qtAXp "$port2" )

  if [[ $pub_count -ne 0 || $sub_count -ne 0 \
        || $pub_repl_slot_count -ne 0 || $sub_repl_slot_count -ne 0 ]]; then
    echo " pub_count $pub_count pub_repl_slot_count $pub_repl_slot_count sub_count $sub_count sub_repl_slot_count $sub_repl_slot_count "
    uncleanf=unclean.$date_str.txt.bz2
    echo "-- imperfect cleanup, pg_waldump to $uncleanf, waiting 60 s, then exit"
    for w in "$pgdata_master"/pg_wal/0*; do
      [[ -e "$w" ]] || continue   # glob matched nothing
      echo "--"
      echo "-- file: $w"
      echo "--"
      pg_waldump "$w"
    done 2>&1 | bzip2 > "$uncleanf"
    sleep 60
    exit 1
  fi
}

#######################################
# Dump the WAL segment each instance is currently writing (located via
# pg_current_wal_lsn()/pg_walfile_name()) through pg_waldump into
# waldump.<date_str>_<HHMM>.<nr>.<unchanged>.<walfile>.txt.bz2, where nr is
# 1 for the master and 2 for the replica.
# Globals read: port1 port2 pgdata_master pgdata_replica date_str unchanged
#######################################
wal_info_on_fail() {
  local now_str walfile1 walfile2 nr walf bz2f lines
  local -a walfs
  now_str=$(date +"%H%M")
  # pg_current_wal_location() never shipped in a release; PG 10 calls it
  # pg_current_wal_lsn().  (The replica connection also had a typo: ipostgres.)
  walfile1=$( echo "select pg_walfile_name(pg_current_wal_lsn())" | psql postgres -qtAXp "$port1" )
  walfile2=$( echo "select pg_walfile_name(pg_current_wal_lsn())" | psql postgres -qtAXp "$port2" )
  walfs=( "$pgdata_master/pg_wal/${walfile1}" "$pgdata_replica/pg_wal/${walfile2}" )

  for nr in 1 2; do
    walf=${walfs[nr - 1]}
    echo "-- parsing [$walf $nr]"
    if [[ -z "${walf##*/}" ]]; then
      echo "-- could not determine current WAL file for instance $nr, skipping"
      continue
    fi
    # Name the dump after this instance's own WAL file (was always walfile1).
    bz2f=waldump.${date_str}_${now_str}.$nr.$unchanged.${walf##*/}.txt.bz2
    echo "-- walf [$walf]"
    echo "-- bz2f [$bz2f]"
    lines=$( pg_waldump "$walf" 2>&1 | wc -l )
    echo "-- lines $lines"
    echo "-- (before) pg_waldump $walf"
    if [[ $lines -gt 0 ]]; then
      echo "-- file: $walf"
      echo "-- $lines lines"
      echo "-- pg_waldump $walf 2>&1 | bzip2 > $bz2f"
      pg_waldump "$walf" 2>&1 | bzip2 > "$bz2f"
    else
      echo "-- walfile_name [$walf]"
      echo "-- $lines lines"
    fi
  done
}

#######################################
# Show pg_subscription_rel and pg_replication_origin_status on the replica
# (subscriber) and pg_replication_slots on the master (publisher), as
# requested for diagnosing a failed run.
#######################################
table_info_on_fail() {
  echo "table pg_subscription_rel; table pg_replication_origin_status;" | psql postgres -aqX -p "$port2"   # on subscriber
  echo "table pg_replication_slots;" | psql postgres -aqX -p "$port1"                                      # on publisher
}

# invoke the function:
clean_pubsub "clean-at-start-call"
if [[ -n "$CLEAN_ONLY" ]]; then
  exit 0
fi

# logfile_ts=$( date +"%Y%m%d_%H%M_%s%N" )

# (Re)create the pgbench tables on the master, add a primary key to
# pgbench_history, and copy the four tables to the replica.  Bail out if any
# step fails: later results would be meaningless.
drop_tables_sql="drop table if exists pgbench_accounts;
drop table if exists pgbench_branches;
drop table if exists pgbench_tellers;
drop table if exists pgbench_history;"
if ! { echo "$drop_tables_sql" | psql postgres -qXp "$port1" \
    && echo "$drop_tables_sql" | psql postgres -qXp "$port2" \
    && pgbench -p "$port1" -qis "${scale//_/}" \
    && echo "
alter table pgbench_history add column hid serial primary key;
-- alter table pgbench_history replica identity full;
-- delete from pgbench_accounts where aid > 40;
" | psql postgres -q1Xp "$port1" \
    && pg_dump postgres -F c -p "$port1" \
         -t pgbench_history \
         -t pgbench_accounts \
         -t pgbench_branches \
         -t pgbench_tellers \
       | pg_restore -1 -p "$port2" -d postgres; }; then
  echo "-- setup (drop tables / pgbench -i / pg_dump | pg_restore) failed, bailing out"
  exit 1
fi

# echo "-- (no diffs expected... )"
# echo "$(cb)"
# cb_text0=$(cb)

# empty the tables on replica:
# echo " delete from pgbench_accounts; delete from pgbench_branches; delete from pgbench_tellers ; delete from pgbench_history ; " | psql -q -X -p $port2
echo "truncate pgbench_accounts,pgbench_branches,pgbench_tellers,pgbench_history ; " | psql postgres -qXp "$port2"

# echo "-- (pre-replication, diffs *are* expected... )"
# echo "$(cb)"

echo "create publication pub1 for all tables;" | psql postgres -p "$port1" -aqtAX
echo "create subscription sub1 connection 'host=localhost port=${port1} dbname=postgres' publication pub1 with (enabled = false);
alter subscription sub1 enable; " | psql postgres -p "$port2" -aqtAX

## if [[ ${INIT_WAIT//_/} -gt 0 ]]
## then
##   #echo "select 'ok' as ok, now() as ts, pid, state, usename, backend_start backend_xmin, state, sent_location application_name from pg_stat_replication" | psql -qXp $port1
##   echo "-- wait ${INIT_WAIT//_/}s"
##   sleep ${INIT_WAIT//_/}
## fi
#
# state1=$(echo "select string_agg(state,',') states from pg_stat_replication" | psql -qtAXp $port1 )
# startup_count=$(echo "select count(*) from pg_stat_replication where state = 'startup'" | psql -qtAXp $port1 )
#
# #while [[ ! "$state1" == "streaming" ]]
# while [[ "$startup_count" -gt "0" ]]
# do
#   echo "-- delay"
#   echo "select * from pg_stat_replication" | psql -qXp $port1
#   echo "-- delay, master not yet in 'streaming' state [$state1]"
#   sleep 1
#   startup_count=$(echo "select count(*) from pg_stat_replication where state = 'startup'" | psql -qtAXp $port1 )
#   #state1=$(echo "select state from pg_stat_replication" | psql -qtAXp $port1 )
# done
# echo "-- state ok"
# echo "select 'ok' as ok, now() as ts, pid, state, usename, backend_start backend_xmin, state, sent_location application_name from pg_stat_replication" | psql -qXp $port1

RUN_PGBENCH=1
if [[ $RUN_PGBENCH -eq 1 ]]; then
  # pgbench rejects a progress interval (-P) <= 0, which duration / 5 yields
  # for durations under 5 s; clamp it to at least 1 s.
  pseconds=$(( duration / 5 ))
  (( pseconds >= 1 )) || pseconds=1
  echo "-- pgbench -p $port1 -c $clients -j $threads -T $duration -P $pseconds -n -- scale $scale postgres"
  pgbench -p "$port1" -c "$clients" -j "$threads" -T "$duration" -P "$pseconds" -n # scale $scale postgres
else
  echo "-- not running pgbench..."
fi

# md5_a_primary=$(echo "select * from pgbench_accounts order by aid"|psql -qtAXp $port1|md5sum|cut -b 1-9)
# md5_b_primary=$(echo "select * from pgbench_branches order by bid"|psql -qtAXp $port1|md5sum|cut -b 1-9)
# md5_t_primary=$(echo "select * from pgbench_tellers order by tid"|psql -qtAXp $port1|md5sum|cut -b 1-9)
# md5_h_primary=$(echo "select * from pgbench_history order by hid"|psql -qtAXp $port1|md5sum|cut -b 1-9)
# cnt_a_primary=$(echo "select count(*) from pgbench_accounts" |psql -qtAXp $port1)
# cnt_b_primary=$(echo "select count(*) from pgbench_branches" |psql -qtAXp $port1)
# cnt_t_primary=$(echo "select count(*) from pgbench_tellers" |psql -qtAXp $port1)
# cnt_h_primary=$(echo "select count(*) from pgbench_history" |psql -qtAXp $port1)
# waiting1=$(( ${scale//_/} * 4 ))
# if [[ $waiting1 -gt 60 ]]
# then
#   waiting1=60
# fi
# echo "-- waiting ${waiting1}s... (scale * 4, or 60)"
# sleep $waiting1

waiting1=0
echo "-- waiting ${waiting1}s... (always)"
sleep "$waiting1"
wait_total=$waiting1

date +"%Y.%m.%d %H:%M:%S"
echo "-- getting md5 (cb)"
cb_text1=$(cb)
cb_text1_md5=$( echo "$cb_text1" | md5sum )
echo "${cb_text1} ${cb_text1_md5:1:9}"
date +"%Y.%m.%d %H:%M:%S"

# Poll every wait_chunk seconds until master and replica match.  Diagnostics
# are dumped once after 5 unchanged polls; give up after wait_max seconds or
# more than 20 consecutive unchanged polls.
loop_counter=0
wait_chunk=5
wait_max=3600
unchanged=0
while true; do
  if echo "$cb_text1" | grep -qw 'replica ok'; then
    echo "-- All is well."
    echo "-- ${wait_total} seconds total. scale $scale clients $clients -T $duration"
    break
  fi

  if [[ $unchanged -eq 5 ]]; then
    #now_str=$(date +"%M%S")
    #unfinishedf1=unfinished.${date_str}_${now_str}.1.$unchanged.txt.bz2
    #unfinishedf2=unfinished.${date_str}_${now_str}.2.$unchanged.txt.bz2
    #echo -ne "--\n-- pg_waldump to $unfinishedf1 (and replica)...\n--\n"
    #for w in $(ls -1 $pgdata_master/pg_wal/0* ); do echo -ne "--\n-- file: ${w}\n--\n"; pg_waldump $w; done 2>&1 | bzip2 > $unfinishedf1
    #for w in $(ls -1 $pgdata_replica/pg_wal/0* ); do echo -ne "--\n-- file: ${w}\n--\n"; pg_waldump $w; done 2>&1 | bzip2 > $unfinishedf2
    table_info_on_fail
    wal_info_on_fail
  fi

  waited_already=$(( loop_counter * wait_chunk ))
  if [[ $waited_already -gt $wait_max ]]; then
    echo "-- Not good, but breaking out of wait (waited more than ${wait_max} s)"
    table_info_on_fail   # was 'info_on_fail', which is not defined anywhere
    wal_info_on_fail
    echo "-- (wait_total ${wait_total} s)"
    break
  fi

  echo "-- wait another ${wait_chunk} s (total ${wait_total} s) (unchanged $unchanged)"
  sleep "$wait_chunk"
  wait_total=$(( wait_total + wait_chunk ))

  echo "-- getting md5 (cb)"
  cb_text1=$(cb)
  cb_text1_md5_new=$( echo "$cb_text1" | md5sum )
  if [[ "$cb_text1_md5_new" == "$cb_text1_md5" ]]; then
    unchanged=$(( unchanged + 1 ))
  else
    unchanged=0
  fi
  echo "${cb_text1} ${cb_text1_md5_new:1:9}"

  if [[ $unchanged -gt 20 ]]; then
    echo "-- Not good, but breaking out of wait ($unchanged times no change)"
    table_info_on_fail
    wal_info_on_fail
    echo "-- (wait_total ${wait_total} s)"
    break
  fi

  cb_text1_md5=$cb_text1_md5_new
  loop_counter=$(( loop_counter + 1 ))
done

# wait 20s, then invoke the cleanup function:
echo "-- waiting 20s, then end-cleaning"
sleep 20
clean_pubsub "clean-at-end-call"

# Keep both server logs, tagged ok/NOK by the final comparison.
# dest_dir='.'
dest_dir='logfiles'
_time_=$( date +"%H%M" )
mkdir -p "$dest_dir"
if echo "${cb_text1}" | grep -qw 'replica ok'; then
  result=ok
else
  result=NOK
fi
cp "$logfile1" "${dest_dir}/logrep.$date_str.1.${_time_}.scale_${scale}.clients_$clients.$result.log"
cp "$logfile2" "${dest_dir}/logrep.$date_str.2.${_time_}.scale_${scale}.clients_$clients.$result.log"