On 2018-11-01 01:29, Euler Taveira wrote:
On Wed, Feb 28, 2018 at 20:03, Euler Taveira
<eu...@timbira.com.br> wrote:
The attached patches add support for filtering rows in the publisher.


I ran pgbench over logical replication with a WHERE clause and could not get it to replicate correctly. Below is the output of the attached test program.


$ ./logrep_rowfilter.sh
--
/home/aardvark/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin.fast/initdb --pgdata=/tmp/cascade/instance1/data --encoding=UTF8 --pwfile=/tmp/bugs
--
/home/aardvark/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin.fast/initdb --pgdata=/tmp/cascade/instance2/data --encoding=UTF8 --pwfile=/tmp/bugs
--
/home/aardvark/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin.fast/initdb --pgdata=/tmp/cascade/instance3/data --encoding=UTF8 --pwfile=/tmp/bugs
sleep 3s
dropping old tables...
creating tables...
generating data...
100000 of 100000 tuples (100%) done (elapsed 0.09 s, remaining 0.00 s)
vacuuming...
creating primary keys...
done.
create publication pub_6515_to_6516;
alter publication pub_6515_to_6516 add table pgbench_accounts where (aid between 40000 and 60000-1) ; --> where 1
alter publication pub_6515_to_6516 add table pgbench_branches;
alter publication pub_6515_to_6516 add table pgbench_tellers;
alter publication pub_6515_to_6516 add table pgbench_history;
create publication pub_6516_to_6517;
alter publication pub_6516_to_6517 add table pgbench_accounts ; -- where (aid between 40000 and 60000-1) ; --> where 2
alter publication pub_6516_to_6517 add table pgbench_branches;
alter publication pub_6516_to_6517 add table pgbench_tellers;
alter publication pub_6516_to_6517 add table pgbench_history;

create subscription pub_6516_from_6515 connection 'port=6515 application_name=rowfilter'
       publication pub_6515_to_6516 with(enabled=false);
alter subscription pub_6516_from_6515 enable;
create subscription pub_6517_from_6516 connection 'port=6516 application_name=rowfilter'
       publication pub_6516_to_6517 with(enabled=false);
alter subscription pub_6517_from_6516 enable;
-- pgbench -p 6515 -c 16 -j 8 -T 5 -n postgres    #  scale 1
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 1
query mode: simple
number of clients: 16
number of threads: 8
duration: 5 s
number of transactions actually processed: 80
latency average = 1178.106 ms
tps = 13.581120 (including connections establishing)
tps = 13.597443 (excluding connections establishing)

       accounts  branches   tellers   history
       --------- --------- --------- ---------
6515   6546b1f0f 2d328ed28 7406473b0 7c1351523    e8c07347b
6516   6546b1f0f 2d328ed28 d41d8cd98 d41d8cd98    e7235f541
6517   f7c0791c8 d9c63e471 d41d8cd98 d41d8cd98    30892eea1   NOK

6515   6546b1f0f 2d328ed28 7406473b0 7c1351523    e8c07347b
6516   6546b1f0f 2d328ed28 7406473b0 5a54cf7c5    191ae1af3
6517   6546b1f0f 2d328ed28 7406473b0 5a54cf7c5    191ae1af3   NOK

6515   6546b1f0f 2d328ed28 7406473b0 7c1351523    e8c07347b
6516   6546b1f0f 2d328ed28 7406473b0 5a54cf7c5    191ae1af3
6517   6546b1f0f 2d328ed28 7406473b0 5a54cf7c5    191ae1af3   NOK

[...]

I let it run for about 10 minutes, but the md5 values of the pgbench_history table on ports 6516 and 6517 no longer change. This shows that the replicated table is, and remains, different from the original pgbench_history table on 6515.
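The checksum comparison above comes down to one step per table and port: run an ordered SELECT, pipe psql's unaligned, tuples-only output through md5sum and keep the first 9 hex digits. Here is a minimal sketch of that step as a reusable bash function. The name `table_md5` is hypothetical and is not part of the attached script.

```shell
# table_md5 PORT QUERY  (hypothetical helper)
# Prints the first 9 hex digits of the md5 of QUERY's output on the
# server listening on PORT, mirroring the per-table lines of the script.
table_md5() {
  local port=$1 query=$2
  echo "$query" | psql -qtAXp "$port" | md5sum | cut -b1-9
}
```

For example, `table_md5 6516 "select * from pgbench_history order by hid"` should print the same value as the history column for 6516 in the output above.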


When there is a WHERE clause, this *always* goes wrong.

Without a WHERE clause, all logical replication tests were OK. Perhaps the error is not in our patch but somewhere in logical replication itself.
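To narrow down where the history rows go astray, it can help to list the hid values that exist on the publisher but are absent downstream. This is a minimal sketch that assumes the hid lists have already been exported into plain files, one id per line. `missing_ids` is a hypothetical helper name.

```shell
# missing_ids LIST_A LIST_B  (hypothetical helper)
# Prints the ids in file LIST_A that do not appear in file LIST_B.
# Both inputs are sorted with the C collation because comm(1)
# requires consistently sorted input.
missing_ids() {
  LC_ALL=C comm -23 <(LC_ALL=C sort "$1") <(LC_ALL=C sort "$2")
}
```

The lists could be produced with, for example, `psql -qtAXp 6515 -c 'select hid from pgbench_history' > /tmp/h6515` (and the same for 6516), followed by `missing_ids /tmp/h6515 /tmp/h6516`. This only catches rows that are missing by hid. Rows that are present on both sides but have different contents would need a full-row comparison.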

Attached is the test program (it will need some tweaking of PATHs, PG variables (PGPASSFILE), etc.). This is the same program I used in March when you first posted a version of this patch, although the error is different.


thanks,


Erik Rijkers





#!/bin/bash

# postgres binary compiled with 
#
# 20181101
#   0001-Remove-unused-atttypmod-column-from-initial-table-sy.patch
#   0002-Store-number-of-tuples-in-WalRcvExecResult.patch
#   0003-Refactor-function-create_estate_for_relation.patch
#   0004-Rename-a-WHERE-node.patch
#   0005-Row-filtering-for-logical-replication.patch
#   0006-Print-publication-WHERE-condition-in-psql.patch
#   0007-Publication-where-condition-support-for-pg_dump.patch
#   0008-Debug-for-row-filtering.patch
#

unset PGDATABASE PGPORT PGSERVICE
export PGDATABASE=postgres

scale=1

root_dir=/tmp/cascade

BIN=$HOME/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin.fast

export PATH=$BIN:$PATH

  initdb=$BIN/initdb
postgres=$BIN/postgres
  pg_ctl=$BIN/pg_ctl
baseport=6515
 appname=rowfilter

if [[ -d $root_dir/instance1 ]]; then rm -rf $root_dir/instance1; fi
if [[ -d $root_dir/instance2 ]]; then rm -rf $root_dir/instance2; fi
if [[ -d $root_dir/instance3 ]]; then rm -rf $root_dir/instance3; fi
if [[ -d $root_dir/instance1 ]]; then exit 1; fi
if [[ -d $root_dir/instance2 ]]; then exit 1; fi
if [[ -d $root_dir/instance3 ]]; then exit 1; fi

devel_file=/tmp/bugs
echo filterbug>$devel_file

num_instances=3

for n in $(seq 1 "$num_instances")
do
  instance=instance$n
  server_dir=$root_dir/$instance
  data_dir=$server_dir/data
  port=$(( baseport + n - 1 ))
  logfile=$server_dir/logfile.$port
  echo "-- $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file "
           $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file  &> /dev/null
  ( $postgres  -D $data_dir -p $port \
    --wal_level=logical --logging_collector=on \
    --client_min_messages=warning \
    --log_directory=$server_dir --log_filename=logfile.${port} \
    --log_replication_commands=on & ) &> /dev/null
done 

echo "sleep 3s"
sleep 3

echo "
  drop table if exists pgbench_accounts;
  drop table if exists pgbench_branches;
  drop table if exists pgbench_tellers;
  drop table if exists pgbench_history;" | psql -qXp 6515 \
&& echo "
  drop table if exists pgbench_accounts;
  drop table if exists pgbench_branches;
  drop table if exists pgbench_tellers;
  drop table if exists pgbench_history;" | psql -qXp 6516 \
&& pgbench -p 6515 -qis $scale \
&& echo "alter table pgbench_history add column hid serial primary key;" \
  | psql -q1Xp 6515 && pg_dump -F c -p 6515 \
     --exclude-table-data=pgbench_history  \
     --exclude-table-data=pgbench_accounts \
     --exclude-table-data=pgbench_branches \
     --exclude-table-data=pgbench_tellers  \
     -t pgbench_history -t pgbench_accounts \
     -t pgbench_branches -t pgbench_tellers \
  | pg_restore -1 -p 6516 -d postgres

   where="where (aid between 40000 and 60000-1)"
  where2="where (aid between 40000 and 60000-1)"

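  # outcomment turns the WHERE clause of pub2 into an SQL comment, so that
  # pub2 publishes pgbench_accounts unfiltered (see "where 2" below)
  outcomment='; --'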

if [[ $num_instances -eq 3 ]]; then

  pg_dump -F c -p 6515 \
     --exclude-table-data=pgbench_history  \
     --exclude-table-data=pgbench_accounts \
     --exclude-table-data=pgbench_branches \
     --exclude-table-data=pgbench_tellers  \
     -t pgbench_history -t pgbench_accounts \
     -t pgbench_branches -t pgbench_tellers \
  | pg_restore -1 -p 6517 -d postgres
fi
pub1=pub_6515_to_6516 sub1=pub_6516_from_6515
pub2=pub_6516_to_6517 sub2=pub_6517_from_6516
echo "
create publication $pub1;
alter publication $pub1 add table pgbench_accounts $where ; --> where 1
alter publication $pub1 add table pgbench_branches;
alter publication $pub1 add table pgbench_tellers;
alter publication $pub1 add table pgbench_history;
" | psql -p 6515 -aqtAX


if [[ $num_instances -eq 3 ]]; then
  echo "
create publication $pub2;
alter publication $pub2 add table pgbench_accounts $outcomment $where2 ; --> where 2
alter publication $pub2 add table pgbench_branches;
alter publication $pub2 add table pgbench_tellers;
alter publication $pub2 add table pgbench_history;
  " | psql -p 6516 -aqtAX
fi

echo "
create subscription $sub1 connection 'port=6515 application_name=$appname'
       publication $pub1 with(enabled=false);
alter subscription $sub1 enable;" | psql -p 6516 -aqtAX

if [[ $num_instances -eq 3 ]]; then
echo "
create subscription $sub2 connection 'port=6516 application_name=$appname'
       publication $pub2 with(enabled=false);
alter subscription $sub2 enable;" | psql -p 6517 -aqtAX

fi

echo "-- pgbench -p 6515 -c 16 -j 8 -T 5 -n postgres    #  scale $scale"
         pgbench -p 6515 -c 16 -j 8 -T 5 -n postgres    #  scale $scale
echo

echo "       accounts  branches   tellers   history"
echo "       --------- --------- --------- ---------"

while true
do
  md5_6515=6515 md5_6516=6516 md5_6517=6517

  md5_a=$(echo "select * from pgbench_accounts $where order by aid" | psql -qtAXp 6515 | md5sum | cut -b1-9)
  md5_b=$(echo "select * from pgbench_branches order by bid" | psql -qtAXp 6515 | md5sum | cut -b1-9)
  md5_t=$(echo "select * from pgbench_tellers  order by tid" | psql -qtAXp 6515 | md5sum | cut -b1-9)
  md5_h=$(echo "select * from pgbench_history  order by hid" | psql -qtAXp 6515 | md5sum | cut -b1-9)
  md5_6515=$( echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
  echo      "6515   $md5_a $md5_b $md5_t $md5_h    $md5_6515 "

  md5_a=$(echo "select * from pgbench_accounts $where order by aid" | psql -qtAXp 6516 | md5sum | cut -b1-9)
  md5_b=$(echo "select * from pgbench_branches order by bid" | psql -qtAXp 6516 | md5sum | cut -b1-9)
  md5_t=$(echo "select * from pgbench_tellers  order by tid" | psql -qtAXp 6516 | md5sum | cut -b1-9)
  md5_h=$(echo "select * from pgbench_history  order by hid" | psql -qtAXp 6516 | md5sum | cut -b1-9)
  md5_6516=$( echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
  echo      "6516   $md5_a $md5_b $md5_t $md5_h    $md5_6516 "

  if [[ $num_instances -eq 3 ]]; then
  md5_a=$(echo "select * from pgbench_accounts $where order by aid" | psql -qtAXp 6517 | md5sum | cut -b1-9)
  md5_b=$(echo "select * from pgbench_branches order by bid" | psql -qtAXp 6517 | md5sum | cut -b1-9)
  md5_t=$(echo "select * from pgbench_tellers  order by tid" | psql -qtAXp 6517 | md5sum | cut -b1-9)
  md5_h=$(echo "select * from pgbench_history  order by hid" | psql -qtAXp 6517 | md5sum | cut -b1-9)
  md5_6517=$( echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
  echo -ne  "6517   $md5_a $md5_b $md5_t $md5_h    $md5_6517 "
  else
    echo
  fi

  if [[ "$md5_6515" == "$md5_6516" ]]
  then
    if [[ $num_instances -eq 2 ]]; then
      echo "   ok (2)"
      break
    elif [[ $num_instances -eq 3 ]]; then
      if [[ "$md5_6515" == "$md5_6517" ]]
      then
        echo "   ok"
        break
      fi
    fi
  fi
  echo "  NOK"
  echo
  sleep 10
done
echo

#  stop instances
for n in $(seq 1 "$num_instances")
do
  instance=instance$n
  server_dir=$root_dir/$instance
  data_dir=$server_dir/data
  port=$(( baseport + n - 1 ))
  logfile=$server_dir/logfile.$port
  $pg_ctl stop -D $data_dir --mode=immediate --wait
done

#  delete everything
echo "rm -rf /tmp/cascade/instance*"
      rm -rf /tmp/cascade/instance*
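The script waits a fixed `sleep 3` after starting the instances and starts pgbench right after enabling the subscriptions. Below is a sketch of two small helpers that could replace such fixed waits, so that a startup or initial-sync race can be ruled out. `retry_until` and `subs_ready` are hypothetical names. `pg_isready` and the `pg_subscription_rel` catalog (state `r` = ready) are standard PostgreSQL.

```shell
# retry_until TIMEOUT CMD [ARGS...]  (hypothetical helper)
# Re-runs CMD once per second until it exits 0 or TIMEOUT seconds
# have elapsed. Returns 0 on success and 1 on timeout.
retry_until() {
  local timeout=$1
  shift
  local deadline=$(( SECONDS + timeout ))
  until "$@"; do
    if (( SECONDS >= deadline )); then
      return 1
    fi
    sleep 1
  done
  return 0
}

# subs_ready PORT  (hypothetical helper)
# Succeeds when every table of every subscription on PORT has reached
# the ready state ('r') in pg_subscription_rel. If psql fails, the
# count is empty and the check fails, so the caller simply retries.
subs_ready() {
  local pending
  pending=$(psql -qtAXp "$1" -c "select count(*) from pg_subscription_rel where srsubstate <> 'r'")
  [[ $pending == 0 ]]
}
```

In the script, `retry_until 30 pg_isready -q -p "$port"` could stand in for `sleep 3`. Calling `retry_until 60 subs_ready 6516` and `retry_until 60 subs_ready 6517` before the pgbench run would make pgbench start only after the initial table copies have finished.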

