On 2017-12-20 06:27, Michael Paquier wrote:
On Wed, Dec 20, 2017 at 7:46 AM, Erik Rijkers <e...@xs4all.nl> wrote:

TRAP: FailedAssertion("!(TransactionIdPrecedesOrEquals(safeXid, snap->xmin))", File: "snapbuild.c", Line: 580)

Sorry, that was probably too terse, I should explain that a little.

After initing 50 instances, I set up and run a pgbench session in the master
session; the pgbench lines are:

  init: pgbench --port=6515 --quiet --initialize --scale=1 postgres
run: pgbench -M prepared -c 16 -j 8 -T 1 -P 1 -n postgres -- scale 1

The other instances then catch up. The whole run takes 5 minutes or so.

I vary scale, duration, and number of instances. I haven't had it fail in this way yet, but I mostly tried with a lower number of instances (up to 25 or
so).

Hm. Are you saying that it takes at least 50 cascading instances to
see the problem you are seeing? And that you are not seeing any
problems with a lower number of cascading instances? Are you enabling
hot_standby_feedback?

That sounds more definitive than I meant it, but yes: only now that I tried a higher number of instances did I see this. But it also often succeeds at up to 100 instances (100 is the highest I have tried).

These 50 instances were a logical replication chain, and hot_standby_feedback is off.

Overnight I ran the test that failed yesterday 80 times: all 80 succeeded. I am not sure what determines failure versus success.

(logical replication does the initial syncing of the instances one by one (sequentially) so it isn't as busy as expected; it just takes a long time)

I wrote a simple perl program to test logical replication (attached, FWIW), running:

./cascade.pl --instances=50 --scale=1 --clients=16 --threads=8 --duration=1 --repeats=3 --waiting=10

This cascade.pl program uses knowledge of my setup so probably won't run elsewhere as is but it shows how the failing test was done.


Erik
#!/usr/bin/env perl
#!/opt/perl-5.26/bin/perl
#!/home/aardvark/perl-5.27/bin/perl
use strict; use warnings;
use DBI; use Digest::MD5; use Tie::Comma; use Getopt::Long;
use IPC::Run qw/run start pump finish timeout/;
use Time::HiRes qw/tv_interval gettimeofday/;
use File::Temp qw/tempfile tempdir/;
use POSIX qw/strftime/;
# Port of the first instance; instance n listens on BASE_PORT + n.
use constant { BASE_PORT => 6515, };
# Site-specific layout: instances live under $ROOT_TMPDIR, PostgreSQL builds
# under $PG_STUFF_DIR/pg_installations/pgsql.<PGVERSION>/<BIN_DIR>.
our $USER         = 'aardvark';
# Per-port data and WAL directory paths (filled by get_PGDATA / add_instance).
our %pg_data_dir  = ();
our %pg_xlog_dir  = ();
our $ROOT_DIR     = "/home/$USER";
our $ROOT_TMPDIR  = $ROOT_DIR . "/tmp/cascade";
our $PG_STUFF_DIR = $ROOT_DIR . "/pg_stuff";
# 1 = use the "bin" build directory, 0 = "bin.fast"; main() sets 0 for --fast.
our $ASSERTIONS   = 1;
# Which installation (pgsql.<PGVERSION>) to test.
our $PGVERSION    = 
      "HEAD"
    # "REL_10_STABLE"
;
# Derived paths; settings() recomputes them after option parsing.
our $BIN_DIR      = $ASSERTIONS == 1 ? "bin" : "bin.fast" ;
our $POSTGRES     = $PG_STUFF_DIR .  "/pg_installations/pgsql.$PGVERSION/$BIN_DIR/postgres";
our $INITDB       = $PG_STUFF_DIR .  "/pg_installations/pgsql.$PGVERSION/$BIN_DIR/initdb";
our $PG_CTL       = $PG_STUFF_DIR .  "/pg_installations/pgsql.$PGVERSION/$BIN_DIR/pg_ctl";
our $DEVEL_FILE   = $PG_STUFF_DIR . '/.11devel'; # password  (same for 10, 11)
# Test outcome counters (NOTE(review): $num_NOK is reported but never
# incremented anywhere in this file).
our $num_ok  = 0;
our $num_NOK = 0;
# Parallel arrays indexed by instance number: project name, port, DBI handle.
our @prj = (); our @port = (); our @dbh  = ();
# Autoflush STDOUT so progress characters appear immediately.
$| = 1;
main();
exit;
# Recompute the installation-dependent paths from the current $ASSERTIONS
# and $PGVERSION. main() calls this after option parsing, because --fast
# may have changed $ASSERTIONS.
sub settings {
  $BIN_DIR = $ASSERTIONS == 1 ? "bin" : "bin.fast";
  my $install_bin = $PG_STUFF_DIR . "/pg_installations/pgsql.$PGVERSION/$BIN_DIR";
  $POSTGRES   = "$install_bin/postgres";
  $INITDB     = "$install_bin/initdb";
  $PG_CTL     = "$install_bin/pg_ctl";
  $DEVEL_FILE = $PG_STUFF_DIR . '/.11devel'; # password file (same for 10, 11)
}
# Entry point: parse the command line, bring up the requested number of
# instances, run the pgbench / logical-replication cascade test, then stop
# every instance.
sub main {
  print sprintf('-- perl ' . "%vd\n", $^V);

  # Defaults, overridden by the command-line options below.
  my %opt = (
    repeats   => 1,
    scale     => 1,
    clients   => 8,
    duration  => 1,
    instances => 3,
    waiting   => 5,
    threads   => 8,
    verbose   => 0,
    fast      => ($ASSERTIONS == 0 ? 1 : 0),
    help      => undef,
  );
  GetOptions(  "repeats=i"   => \$opt{repeats}
             , "scale=i"     => \$opt{scale}
             , "clients=i"   => \$opt{clients}
             , "threads=i"   => \$opt{threads}
             , "duration=i"  => \$opt{duration}
             , "instances=i" => \$opt{instances}
             , "waiting=i"   => \$opt{waiting}
             , "verbose"     => \$opt{verbose}
             , "fast"        => \$opt{fast}
             , "help"        => \$opt{help}
  ) or die("Error in command line arguments\n");

  if ($opt{help}) {
    usage();
    exit(0);
  }

  # --fast selects the non-assertion build; recompute the binary paths.
  $ASSERTIONS = $opt{fast} ? 0 : 1;
  settings();

  # Shared argument order of construct_cmdline (after the instance count)
  # and test_pgbench_derail.
  my @run_args = @opt{qw(scale clients threads duration repeats waiting verbose)};
  print "-- ", construct_cmdline($opt{instances}, @run_args), "\n";

  add_instance() for 1 .. $opt{instances};
  print "\n";
  test_pgbench_derail(@run_args);

  print "-- stopping instances ";
  stop_instances();
  print "\n";

  print "-- done.\n";
}
# Rebuild a ./cascade.pl command line from the effective option values so a
# run can be reproduced from its log. Without --verbose the result keeps a
# trailing space after the --waiting option.
sub construct_cmdline {
  my ($instances, $scale, $clients, $threads, $duration, $repeats, $waiting, $verbose) = @_;
  my @words = (
    './cascade.pl',
    "--instances=$instances",
    "--scale=$scale",
    "--clients=$clients",
    "--threads=$threads",
    "--duration=$duration",
    "--repeats=$repeats",
    "--waiting=$waiting",
    ($verbose ? '--verbose' : ''),
  );
  return join(' ', @words);
}
# Stop every instance registered by add_instance(), printing one dot per
# instance as progress.
sub stop_instances {
  my $idx = 0;
  while ($idx < @prj) {
    print ".";
    stop_instance($prj[$idx], $port[$idx]);
    $idx++;
  }
}
# Look for an existing instance directory under $ROOT_TMPDIR whose
# data/postgresql.conf sets the given port, so a cluster from a previous run
# can be reused instead of running initdb again.
#
# Returns the instance directory (the parent of data/) of the first match,
# or undef when no directory matches. Dies if $ROOT_TMPDIR cannot be opened.
#
# The match is on an uncommented "port = N" setting, which is the form
# action_instance('init') appends. It is not a bare substring search, which
# would also hit comments or longer numbers that merely contain the digits.
# The file is scanned in-process instead of spawning one grep per directory.
# A lexical loop variable is used so the caller's $_ is not clobbered.
sub readdir_for_port {
  my ($port) = @_;
  my $dir = $ROOT_TMPDIR;
  opendir(my $dh, $dir) || die "Can't open $dir: $!";
  my $dir_found;
  ENTRY:
  while (defined(my $entry = readdir $dh)) {
    next ENTRY if $entry =~ m/^[.]+$/;
    my $conf = "$dir/$entry/data/postgresql.conf";
    next ENTRY unless -e $conf;
    # Unreadable config: treat as "not this port" rather than dying.
    open(my $fh, '<', $conf) or next ENTRY;
    my $has_port = 0;
    while (my $line = <$fh>) {
      if ($line =~ m/^\s*port\s*=\s*\Q$port\E\b/) {
        $has_port = 1;
        last;
      }
    }
    close $fh;
    if ($has_port) {
      $dir_found = "$dir/$entry";
      last ENTRY;
    }
  }
  closedir $dh;
  return $dir_found;
}
# Register and bring up the next instance of the cascade.
#
# Instance n gets project name "projNN" and port BASE_PORT + n. An existing
# directory configured for that port (found by readdir_for_port) is reused.
# Otherwise a fresh cluster is created with init_instance() and the contrib
# extensions below are installed once. The instance is then started, and
# its DBI handle is appended to @dbh. For the first instance, the server
# description line (show_instance) is also printed.
sub add_instance {
  my $n       = scalar(@prj);
  my $project = sprintf("proj%02d", $n);
  my $port    = BASE_PORT + $n; # BASE_PORT is 6515
  # Progress line: "-- " before entries 0 and 1 (entry 0 ends with the
  # show_instance line, so entry 1 starts a new line), a space otherwise.
  print $n <= 1 ? "-- " : " ";
  print sprintf("%02d/%d", $n, $port);
  push(@prj,  $project);
  push(@port, $port);

  my $good_dir = readdir_for_port($port);
  my $is_new   = 0;
  if (defined $good_dir && -d $good_dir) {
    # Reuse the previous run's cluster.
    $pg_data_dir{ $port } = $good_dir . "/data";
    $pg_xlog_dir{ $port } = $good_dir . "/wal";
  }
  else {
    init_instance($project, $port);
    $is_new = 1;
  }
  start_instance($project, $port);
  my $dbh = connectdb($port);

  if ($is_new) {
    my $rc = $dbh->do( "
      create extension adminpack;
      create extension amcheck;
      create extension bloom;
      create extension btree_gin;
      create extension btree_gist;
      create extension citext;
      create extension cube;
      create extension file_fdw;
      create extension hstore;
      create extension intarray;
      create extension pg_stat_statements;
      create extension pgstattuple;
      create extension pg_trgm;
      create extension pgcrypto;
      create extension postgres_fdw;
    ");
    # do() returns undef on failure; report that explicitly instead of
    # interpolating undef (which warns and prints an empty value).
    if (!defined $rc || $rc ne "0E0") {
      print "installing modules returned [", ($rc // 'undef'), "]\n";
      sleep 3;
    }
  }

  push(@dbh, $dbh);
  if ($n == 0) {
    print " ";
    show_instance($dbh);
  }
}
# Print one line describing the server behind $dbh: its port, data
# directory, version() string and debug_assertions setting.
sub show_instance {
  my ($dbh) = @_;
  my $query = "select current_setting('port') || ' ' || current_setting('data_directory') || '  ' || version()
    || ', debug ' || current_setting('debug_assertions') ";
  my $row = $dbh->selectrow_arrayref($query);
  print $row->[0], "\n";
}
# Print the description line (see show_instance) of every connected
# instance, in instance order. The former `if (0)` debug branch was
# unreachable and has been dropped.
sub show_instances {
  for my $dbh (@dbh) {
    show_instance($dbh);
  }
}
# Run the cascade test (pgbench_derail2) $total times. After each run, print
# its duration, the running ok count (global $num_ok) and the running average
# duration; print the ok/NOK totals at the end when $total > 1.
sub test_pgbench_derail {

  my ($scale, $clients, $threads, $duration, $total, $waiting, $verbose) = @_;

  my $dur_total = 0;
  print "-- test: test_pgbench_derail (", $total, "x)\n";
  for my $n (1..$total) {
    print "-- start\n";
    my $t0 = [gettimeofday];
    pgbench_derail2( $scale, $clients, $threads, $duration, $total, $waiting, $verbose);
    my $d0 = tv_interval($t0, [gettimeofday]);
    $dur_total += $d0;
    print "-- end   ", sprintf("%6.3f", $d0), "    ok  $comma{$num_ok} (of $comma{$total})   avg "
                     , sprintf("%6.3f", $dur_total / $n), "\n";
    if ($num_NOK > 0) {
      print "                      NOK  $num_NOK\n";
    }
  }
  if ($total > 1) {
    printf "--                 ok %4d\n", $num_ok;
    printf "--                NOK %4d\n", $num_NOK;
  }
}
# Drop the four pgbench tables (where present) through $dbh, sending all
# four statements in one do() call.
sub pgbench_drop_tables {
  my $dbh = shift;
  my $sql = "drop table if exists pgbench_accounts;
             drop table if exists pgbench_branches;
             drop table if exists pgbench_tellers;
             drop table if exists pgbench_history; ";
  return $dbh->do($sql);
}
# Build the argv for `pgbench --initialize` against $dbname on $port at the
# given scale, and return it as an array reference.
sub pgbench_init_cmds {
  my ($port, $dbname, $scale) = @_;
  return [ 'pgbench', "--port=$port", '--quiet', '--initialize', "--scale=$scale", $dbname ];
}
# Run the pgbench initialisation argv in $rcmd (e.g. from pgbench_init_cmds),
# capturing and discarding its output. Then add a serial primary key column
# "hid" to pgbench_history; the checksum functions use hid as that table's
# ordering key.
sub pgbench_initialise {
  my ($dbh, $rcmd) = @_;
  my ($stdin, $stdout, $stderr);
  run($rcmd, \$stdin, \$stdout, \$stderr);
  return $dbh->do("alter table pgbench_history add column hid serial primary key;");
}
# Copy the definitions of the four pgbench tables, without their data, from
# $dbname1 on $port1 to $dbname2 on $port2 with `pg_dump -Fc | pg_restore -1`.
# Rows are expected to arrive later through logical replication. Any
# captured pg_restore output is printed.
sub pgbench_tables_dump_restore {
  my ($port1, $dbname1, $port2, $dbname2) = @_;
  my ($in, $out, $err) = (undef,undef,undef);
  run  [    'pg_dump'
          , '-Fc'
          , "-p$port1"
          , '--exclude-table-data=pgbench_history'
          , '--exclude-table-data=pgbench_accounts'
          , '--exclude-table-data=pgbench_branches'
          , '--exclude-table-data=pgbench_tellers'
          , '-tpgbench_history'
          , '-tpgbench_accounts'
          , '-tpgbench_branches'
          , '-tpgbench_tellers'
          , $dbname1
        ]
  , '|'
  ,     [   'pg_restore'
          , '-1'
          , '-p', $port2
          , '-d', $dbname2
        ]
  , \$in, \$out, \$err ;
  # $in only feeds stdin and may still be undef (as may the others if nothing
  # was captured). Normalise all three so chomp/print emit no
  # "uninitialized" warnings.
  for my $text ($in, $out, $err) {
    $text = '' unless defined $text;
    chomp $text;
  }
  print $in , $out , $err ;
}
# Run a pgbench workload (-M $protocol, -c $clients, -j $threads,
# -T $duration, -P $pseconds, -n) against $dbname on $port. Prints the
# command, the number of transactions processed, the tps figure, and the
# elapsed time right-aligned to line_length() columns. $scale is only
# echoed.
sub pgbench_run {

  my ($protocol,$clients,$threads,$duration,$pseconds,$port,$dbname, $scale) = @_;

  my ($in, $out, $err) = (undef, undef, undef);
  my $msg1 = "-- pgbench -M $protocol -c $clients -j $threads -T $duration -P $pseconds -n $dbname -- scale $scale";
  print $msg1;
  my $t0 = [gettimeofday];
  run ["pgbench", "-M$protocol", "-c$clients", "-j$threads", "-T$duration", "-P$pseconds", "-n", "-p$port", $dbname ]
       , \$in, \$out, \$err;
  $out = '' unless defined $out;
  chomp $out;
  my ($rows_processed) = $out =~ /actually processed: (\d+)/;
  # First "tps = ..." line of the summary: "(including connections
  # establishing)" on older pgbench, "(without initial connection time)" on
  # newer releases.
  my ($tps) = $out =~ /^tps = (\d+)[.]\d+ /m;
  # Placeholders keep the report readable if pgbench failed or its output
  # format changed.
  my $msg2 = "  change " . ($rows_processed // '?') . " rows, " . ($tps // '?') . " tps" ;
  print $msg2;
  # Clamp: a negative repeat count would only warn and print nothing.
  my $pad = line_length() - length($msg1 . $msg2);
  $pad = 0 if $pad < 0;
  print " " x $pad, sprintf("%7.3f\n", tv_interval($t0, [gettimeofday]) );
}
# Column width used to right-align timings: room for five digests plus 64.
sub line_length { my $digest_cols = 5 * md5_width(); return $digest_cols + 64; }
# Add text columns dummy1 .. dummy5 (default 'hah!') to pgbench_accounts
# through $dbh; presumably used to give a subscriber a table structure that
# differs from the publisher's. $port and $dbname are accepted but unused.
# Its only call in this file is commented out.
sub pgbench_change_subscriber_struct {
  my ($dbh, $port, $dbname) = @_;
  foreach my $i (1 .. 5) {
    $dbh->do("alter table pgbench_accounts add column dummy$i text default 'hah!'; ");
  }
}
# One iteration of the cascade test (called once per repeat by
# test_pgbench_derail):
#   1. drop all subscriptions, publications and pgbench tables on every
#      instance;
#   2. run `pgbench --initialize` against instance 0 and add the hid key to
#      pgbench_history (pgbench_initialise);
#   3. copy the table definitions, without data, to every other instance
#      (pg_dump | pg_restore);
#   4. chain the instances with logical replication (create_pubsub_cascade);
#   5. run a pgbench workload against instance 0;
#   6. poll until every instance's checksums equal instance 0's, then
#      increment the global $num_ok;
#   7. drop all subscriptions and publications again.
#
# $total is accepted but not used here. $waiting, when defined, is the number
# of seconds slept between polls; otherwise a value is derived from $scale.
# $verbose switches from one-character progress marks to per-instance
# detail lines.
#
# NOTE(review): the polling loop has no timeout and no failure exit. If the
# replicas never converge it loops forever, and this sub never increments
# $num_NOK.
sub pgbench_derail2 {

#   pgbench_derail2( $scale, $clients, $threads, $duration, $total, $waiting );
  my               ($scale, $clients, $threads, $duration, $total, $waiting, $verbose) = @_;

  my $rc;
  my $dbname = "postgres";
  # pgbench progress interval (-P): a fifth of the duration, at least 1 second.
  my $pseconds = int($duration / 5); if ($pseconds < 1) { $pseconds = 1; }
  # Seconds between polls: $waiting if defined, else a value scaled by $scale.
  my $wait     = defined $waiting ? $waiting :
                 $scale <=   5 ?  5 :
                 $scale <=  50 ? 20 :
                 $scale <= 100 ? 30 : 60 ;
  for my $n (0 .. scalar(@dbh)-1) { drop_subscriptions($dbh[$n]); }
  for my $n (0 .. scalar(@dbh)-1) { drop_publications ($dbh[$n]); }
  for (@dbh) { pgbench_drop_tables($_); }
  my ($rcmd) = pgbench_init_cmds($port[0], $dbname, $scale);
  my $init_msg = "-- " . join(" ", @$rcmd);
  print $init_msg;
  my $t0 = [gettimeofday];
  pgbench_initialise($dbh[0], $rcmd);
  print " "x(line_length() - length($init_msg)), sprintf("%7.3f\n", tv_interval($t0, [gettimeofday])  );
  # Combined on-disk size of the four pgbench tables on instance 0 (for the log).
  my $pg_size_pretty = $dbh[0]->selectrow_arrayref("select
      pg_size_pretty( pg_relation_size('pgbench_accounts')\n"
                . " + pg_relation_size('pgbench_branches')\n"
                . " + pg_relation_size('pgbench_tellers' )\n"
                . " + pg_relation_size('pgbench_history' )\n )")->[0] ;
  print "-- size ", $pg_size_pretty, "\n";

  # Give every other instance the same table definitions (no data).
  for my $n (1 .. scalar(@dbh)-1) {
      pgbench_tables_dump_restore($port[0], $dbname, $port[$n], $dbname);
  }

# pgbench_change_subscriber_struct($db2,$port2, $dbname2);
#
  create_pubsub_cascade($dbname);

  my $protocol = "prepared";
  pgbench_run($protocol,$clients,$threads,$duration,$pseconds,$port[0],$dbname,$scale);

  my $display_level = 0;
  if ( $verbose ) {
     $display_level = 1;
  }

  # NOTE(review): these six locals, and %seen below, are never read.
  my ($rdigest1, $rdigest2, $rdigest3, $rline1, $rline2, $rline3) = (\'1' , \'2', \'3', \'', \'', \'');
  my %seen = ();
  # Reference checksums: instance 0, the head of the cascade.
  my ($rdigest0, $rline0) = get_md5_digest_pgbench($dbh[0], $port[0], $dbname);
  print "\n", $$rline0, "\n";
  if ( $display_level  == 1) { print "\n"; }
  my $count = 0;
  my $limit = 1;
# my $t0 = [gettimeofday];
  # Poll. First a cheap check on the first $limit rows (in key order) of each
  # table; once that agrees, a full-table checksum of every instance.
  while (1) {
    $count++;
    if (1) {
      my %tops = ();
      # NOTE: (0, scalar(@dbh)-1) is a two-element list, not a range. Only
      # the first and the last instance are compared in this quick check.
      for my $n (0, scalar(@dbh)-1) {
        my $t0 = [gettimeofday];
        my ($rdigest_top, $rline_top) = get_md5_digest_pgbench_top($dbh[$n], $port[$n], $dbname, $limit);
        if ( $display_level  == 1 ) {
          # NOTE(review): both ternary branches print the same padding, and
          # $$rdigest0 is a full-table digest, so it is not comparable with a
          # top-$limit digest anyway.
          print "", $$rline_top, "   ", $$rdigest_top eq $$rdigest0 ? "          " : "          ";
          printf "  %7.3f", tv_interval($t0, [gettimeofday]);
        }
        if (! exists $tops{ $$rdigest_top }) {
          $tops{ $$rdigest_top } = 1;
        }
        my $num_keys = keys %tops;
        if ($num_keys > 1) {
          if ( $display_level == 1) { print "\n"; }
          last;
        }
        if ( $display_level == 1) {
          print "\n";
        }
        else {
          print "x";
          if    ($count % 50 == 0) { print "\n"; }
          elsif ($count % 10 == 0) { print " ";  }
        }
      }
      # Top rows still differ between first and last instance: wait, re-poll.
      my $num_keys = keys %tops;
      if ($num_keys > 1) {
        if ( $display_level == 1) { print "      ${wait}s..."; }
        sleep $wait;
        if ( $display_level == 1) { print " run\n"; }
        next;
      }
    }
    if ($display_level == 1) { print "\n"; }
    else                     { print "\nfull: "; }

    # Full checksum of every instance, compared against instance 0.
    my (@digest,@line)=((),());
    for my $n (0 .. scalar(@dbh)-1) {
      my $t0 = [gettimeofday];
      my ($rdigest, $rline) = get_md5_digest_pgbench($dbh[$n], $port[$n], $dbname);
      if ($display_level == 1) {
        print $$rline, "   ", $$rdigest eq $$rdigest0 ? "replica ok" : "       NOK";
        printf "  %7.3f\n", tv_interval($t0, [gettimeofday]);
      }
      else {
        print $$rdigest eq $$rdigest0 ? "o" : "n";
      }
      push(@digest, $$rdigest);
      push(@line  , $$rline);
      if (not exists $seen{ $$rdigest } ) {
        $seen{ $$rdigest } = 1;
      }
    }
    print "\n";
    my $NOK = 0;
    for my $dig (@digest) {
       if ($dig ne $$rdigest0) {
         $NOK = 1;
       }
    }
    # All instances match instance 0: count a success and stop polling.
    if ($NOK == 0) {
       $num_ok ++;
       print $line[ $#line ], "\n";
       print "-- All is well.\n";
       last;
    }
    else {
       print "-- Not good.\n";
    }
    sleep $wait;
  }
  # Tear down the replication chain for the next repeat.
  for my $n (0 .. scalar(@dbh)-1) { drop_subscriptions($dbh[$n]); }
  for my $n (0 .. scalar(@dbh)-1) { drop_publications ($dbh[$n]); }
}
# Drop every subscription that belongs to the database $dbh is connected to.
#
# pg_subscription is a cluster-wide catalog, but DROP SUBSCRIPTION only
# works for the current database. The list is therefore filtered by subdbid.
# It is read once rather than re-counted until empty: the old re-count loop
# never terminated when a drop failed, or when a subscription lived in
# another database. Names are quoted with quote_identifier; a failed drop is
# reported with warn and the remaining subscriptions are still tried.
sub drop_subscriptions {
  my ($dbh) = @_;
  my $names = $dbh->selectcol_arrayref(
      "select subname from pg_subscription"
    . " where subdbid = (select oid from pg_database where datname = current_database())" );
  for my $subname (@{ $names || [] }) {
    #print "-- drop subscription if exists $subname\n";
    my $ok = $dbh->do("drop subscription if exists " . $dbh->quote_identifier($subname));
    warn "-- could not drop subscription $subname\n" unless defined $ok;
  }
}
# Drop every publication in the database $dbh is connected to.
#
# The list is read once rather than re-counted until empty, so a failing
# DROP cannot make this loop forever. Names are quoted with
# quote_identifier; a failed drop is reported with warn and the remaining
# publications are still tried.
sub drop_publications {
  my ($dbh) = @_;
  my $names = $dbh->selectcol_arrayref("select pubname from pg_publication");
  for my $pubname (@{ $names || [] }) {
    #print "-- drop publication  if exists $pubname\n";
    my $ok = $dbh->do("drop publication if exists " . $dbh->quote_identifier($pubname));
    warn "-- could not drop publication $pubname\n" unless defined $ok;
  }
}
# Star topology: instance 0 publishes "pub1", and every other instance
# subscribes to it directly as "sub1". Not called by the main flow in this
# file; create_pubsub_cascade is used instead.
sub create_pubsub {
  my ($dbname) = @_;
  my $pubport = $port[0];
  create_publication($dbh[0], "pub1") if @dbh;
  for my $n (1 .. $#dbh) {
    create_subscription($dbh[$n], $dbname, "sub1", $port[$n], "pub1", $pubport);
  }
}
# Chain topology. Every instance except the first subscribes to its
# predecessor's publication, as "sub<n>_<port>" of "pub<n-1>_<port>".
# Every instance except the last (or the first, when it is the only one)
# publishes all its tables as "pub<n>_<port>". On middle instances the
# subscription is created before the publication.
sub create_pubsub_cascade {
  my ($dbname) = @_;
  my $last = $#dbh;
  for my $n (0 .. $last) {
    my $dbh    = $dbh[$n];
    my $myport = $port[$n];
    if ($n != 0) {
      my $upport = $port[$n - 1];
      my $upname = "pub" . ($n - 1) . "_${upport}";
      create_subscription( $dbh, $dbname, "sub${n}_${myport}", $myport, $upname, $upport );
    }
    if ($n == 0 || $n != $last) {
      create_publication( $dbh, "pub${n}_${myport}" );
    }
  }
}
# Quick convergence probe. For each pgbench table on $port, checksum only the
# first $limit rows in key order (get_md5_limit). Returns references to the
# combined MD5 hex digest and to a formatted display line. $dbh is accepted
# but unused.
#
# pgbench_history is checksummed over hid plus its pgbench columns (tid, bid,
# aid, delta, mtime, filler). The old list repeated hid and left out tid, so
# a tid difference went undetected.
sub get_md5_digest_pgbench_top {
  my ($dbh,$port,$dbname,$limit) = @_;
  my $md5_a_top = get_md5_limit($port, $dbname, "pgbench_accounts", "aid", ["aid","bid","abalance","filler"]                 , $limit );
  my $md5_b_top = get_md5_limit($port, $dbname, "pgbench_branches", "bid", ["bid","bbalance","filler"]                       , $limit );
  my $md5_t_top = get_md5_limit($port, $dbname, "pgbench_tellers" , "tid", ["tid","bid","tbalance","filler"]                 , $limit );
  my $md5_h_top = get_md5_limit($port, $dbname, "pgbench_history" , "hid", ["hid","tid","bid","aid","delta","mtime","filler"], $limit );
  my $ctx = Digest::MD5->new;
  $ctx->add("$md5_a_top $md5_b_top $md5_t_top $md5_h_top");
  my $digest = $ctx->hexdigest;
  my $line = sprintf("$port %11s %7s %7s %7s   %-s %-s %-s %-s   %s"
       , '', '', '', ''
       , $md5_a_top, $md5_b_top, $md5_t_top, $md5_h_top,  substr($digest,0,md5_width()) );
  (\$digest, \$line);
}
# Full-table checksum of the four pgbench tables on $port. Row counts come
# from $dbh; per-table digests come from get_md5 (via psql). Returns
# references to the combined MD5 hex digest and to a formatted display line
# (counts, per-table digests, short combined digest).
#
# pgbench_history is checksummed over hid plus its pgbench columns (tid, bid,
# aid, delta, mtime, filler). The old list repeated hid and left out tid, so
# a tid difference went undetected.
sub get_md5_digest_pgbench {
  my ($dbh,$port,$dbname) = @_;
  my $cnt_a = $dbh->selectrow_arrayref("select count(*) from pgbench_accounts")->[0];
  my $cnt_b = $dbh->selectrow_arrayref("select count(*) from pgbench_branches")->[0];
  my $cnt_t = $dbh->selectrow_arrayref("select count(*) from pgbench_tellers" )->[0];
  my $cnt_h = $dbh->selectrow_arrayref("select count(*) from pgbench_history" )->[0];
  my $md5_a = get_md5($port, $dbname, "pgbench_accounts", "aid", ["aid","bid","abalance","filler"]);
  my $md5_b = get_md5($port, $dbname, "pgbench_branches", "bid", ["bid","bbalance","filler"]);
  my $md5_t = get_md5($port, $dbname, "pgbench_tellers" , "tid", ["tid","bid","tbalance","filler"] );
  my $md5_h = get_md5($port, $dbname, "pgbench_history" , "hid", ["hid","tid","bid","aid","delta","mtime","filler"]);
  my $ctx = Digest::MD5->new;
  $ctx->add("$md5_a $md5_b $md5_t $md5_h");
  my $digest = $ctx->hexdigest;
  my $line = sprintf("$port %11s %7s %7s %7s   %-s %-s %-s %-s   %s"
       , $comma{ $cnt_a }, $comma{ $cnt_b }, $comma{ $cnt_t }, $comma{ $cnt_h }
       , $md5_a, $md5_b, $md5_t, $md5_h,  substr($digest,0,md5_width()) );
  (\$digest, \$line);
}
# Number of leading hex characters kept from each MD5 digest for display.
sub md5_width { my $hex_chars = 7; return $hex_chars; }
# Checksum one whole table on the instance at $port. The psql output of
# "select <cols> from $table order by $key" is written to
# cascade.<port>.<table>.md5 in the current directory; the file is left in
# place. The first md5_width() characters of md5sum of that file are
# returned.
sub get_md5 {
  my ($port, $dbname, $table, $key, $rcols) = @_;
  my $query   = "select " . join(",", @$rcols) . " from $table order by $key";
  my $outfile = join('.', 'cascade', $port, $table, 'md5');
  run [ 'echo', $query ],
      '|', [ 'psql', '-qtAXp', $port, '-d', $dbname ],
      '>', $outfile;
  my ($digest, $stderr);
  run [ 'md5sum', $outfile ],
      '|', [ 'cut', '-b', '1-' . md5_width() ],
      \$digest, \$stderr;
  chomp $digest;
  return $digest;
}
# Checksum only the first $limit rows (ordered by $key) of $table on $port.
# The pipeline is echo | psql | md5sum | cut; the first md5_width() hex
# characters are returned.
sub get_md5_limit {
  my ($port, $dbname, $table, $key, $rcols, $limit) = @_;
  my $columns  = join(",", @$rcols);
  my $query    = "select $columns from $table order by $key /*desc*/ limit $limit";
  my @pipeline = (
    [ 'echo', $query ],
    '|', [ 'psql', '-qtAXp', $port, '-d', $dbname ],
    '|', [ 'md5sum' ],
    '|', [ 'cut', '-b', '1-' . md5_width() ],
  );
  my ($digest, $stderr);
  run @pipeline, \$digest, \$stderr;
  chomp $digest;
  return $digest;
}
# Connect to database "postgres" on the local server at $port. Makes at most
# $max_tries attempts, pausing 2 seconds after each failure, and dies if none
# succeeds. On success it also sets $ENV{PGAPPNAME}, which child processes
# started later (psql, pgbench) inherit. Returns the DBI handle.
sub connectdb {
    my ($port) = @_;
    my $dbh;
    my ($tries, $max_tries) = (0, 5);
    while (!defined $dbh && $tries < $max_tries) {
      $tries++;
      # With RaiseError off (the DBI default), connect returns undef instead
      # of dying, so failure must be detected from the return value; eval
      # only guards against drivers that do die.
      $dbh = eval { DBI->connect( "dbi:Pg:port=$port;db=postgres;", undef, undef ) };
      next if defined $dbh;
      my $why = $@ || $DBI::errstr || 'unknown error';
      chomp $why;
      print "error while connecting (try $tries) to db postgres on port $port - $why\n";
      sleep 2 if $tries < $max_tries;
    }
    if (! defined $dbh) {
        die "error: \$dbh is invalid\n";
    }
    $ENV{ PGAPPNAME } = "cascade.pl:" . $port;
    return $dbh;
}
# Thin wrappers: each forwards ($project, $port) to action_instance() with a
# fixed action name.
sub init_instance            { return action_instance('init',         @_[0, 1]); }
sub start_instance           { return action_instance('start',        @_[0, 1]); }
sub start_instance_as_master { return action_instance('start master', @_[0, 1]); }
sub start_instance_as_slave  { return action_instance('start slave',  @_[0, 1]); }
sub stop_instance            { return action_instance('stop',         @_[0, 1]); }
# Perform one lifecycle action on the instance whose port is $port.
#
#   "init"  - run initdb: UTF8 encoding, $pwd_encr auth, password from
#             $DEVEL_FILE, WAL in the separate wal directory. Then append
#             "port = N" and "client_min_messages = warning" to the new
#             postgresql.conf.
#   "start" - launch $POSTGRES in the background with the options from
#             get_options(), poll pg_isready until it accepts connections,
#             then drop leftover subscriptions and publications.
#   "stop"  - pg_ctl --wait stop.
# Any other action, including "start master" and "start slave", only
# prints an "invalid action" message. $project is accepted but not used.
sub action_instance {
  my ($action, $project, $port) = @_;
  my ($data_dir, $xlog_dir) = get_PGDATA($port);
  my $pwd_encr       = 'scram-sha-256';  # 'md5';
  my $data_checksums = undef;            # '--data-checksums';

  if ($action eq "init") {
    my @initdb = (
      $INITDB,
      "--pgdata=$data_dir",
      "--encoding=UTF8",
      "--auth=$pwd_encr",
      "--pwfile=$DEVEL_FILE",
      "--waldir=$xlog_dir",
    );
    push(@initdb, "$data_checksums") if $data_checksums;
    my ($in, $out, $err);
    run \@initdb, \$in, \$out, \$err;
    if ($in ) { print ">", $in , "< (in)\n";  }
  # if ($out) { print ">", $out, "< (out)\n"; }
    if ($err) { print ">", $err, "< (err)\n"; }
    # Append this instance's port and message level to postgresql.conf.
    my $conf = $data_dir . "/postgresql.conf";
    open(my $fh, '>>', $conf) or die "error - could not open file '$conf' $!";
    print $fh "port = $port\n";
    print $fh "client_min_messages = warning\n";
    close $fh or die "error - could not close file '$conf' $!";
  }
  elsif ($action eq "start") {
    my @start = ($POSTGRES, "-D", "$data_dir", "-p", "$port");
    for my $o (@{ get_options($port) }) {
      # Bare "name=value" options become "--name=value"; anything that
      # already starts with '-' is passed through unchanged. (Previously the
      # latter case pushed an empty string instead of the option.)
      push(@start, substr($o, 0, 1) eq '-' ? $o : '--' . $o);
    }
    my ($in, $out, $err) = (undef,undef,undef);
    # Background launch: the harness is deliberately not finished here, so
    # the server keeps running after this sub returns.
    start \@start, \$in, \$out, \$err;
    if ($in ) { print ">", $in , "< (in)\n";  }
    if ($out) { print "" , $out, " (out)\n"; }
    if ($err) { print ">", $err, "< (err)\n"; }

    #  run pg_isready till the database is available:
    while ( 1 ) {
      run ['pg_isready', "--timeout=10", "-p$port", "-dpostgres" ], \$in, \$out, \$err;
      chomp $out;
      if ($in ) { print ">", $in , "< (in)\n";  }
     #if ($out) { print "" , $out, " (out)\n"; }
      if ($err) { print ">", $err, "< (err)\n"; }
      last if (index($out, 'accepting connections' ) >= 0);
      sleep 1;
    }
    # Remove replication objects left over from a previous run.
    my $dbh = connectdb($port);
    drop_subscriptions($dbh);
    drop_publications($dbh);
    $dbh->disconnect;
  }
  elsif ($action eq "stop") {
    my @stopdb = ($PG_CTL, "--pgdata=$data_dir", "--wait", "stop");
    my ($in, $out, $err) = (undef,undef,undef);
    run \@stopdb, \$in, \$out, \$err;
    if ($in ) { print ">", $in , "< (in)\n";  }
   #if ($out) { print "", $out, "\n"; }
    if ($err) { print ">", $err, "< (err)\n"; }
  }
  else {
    print "-- invalid action [$action]\n";
  }
}
# Create publication $pubname FOR ALL TABLES through $dbh; returns the
# result of DBI's do().
sub create_publication {
  my ($dbh, $pubname) = @_;
  my $sql = "create publication $pubname for all tables;";
  return $dbh->do($sql);
}
# Create subscription $subname (initially disabled) on $dbh. It connects to
# $dbname on port $pubport for publication $pubname, with application_name
# "casc:<subport><<pubport>" and slot name "<subname>_<subport>". It is then
# enabled in a separate statement. Returns the result of the ALTER's do();
# the CREATE's result is not checked.
sub create_subscription {
  my ($dbh, $dbname, $subname, $subport, $pubname, $pubport) = @_;
  my $appname = "casc:$subport<$pubport";
  my $indent  = ' ' x 12;
  my $sql = join("\n",
    "create subscription $subname",
    "${indent}connection 'port=$pubport dbname=${dbname} application_name=${appname}'",
    "${indent}publication $pubname with(enabled=false, slot_name=${subname}_${subport});",
  );
  $dbh->do($sql);
  return $dbh->do("alter subscription $subname enable;");
}
# Print the command-line help. The defaults listed must match the
# initial values in main(): --instances defaults to 3, and --verbose and
# --fast are off unless given.
sub usage {
  print '
  cascade.pl usage:
  GetOptions(  "repeats=i"   => \$repeats  # 1
             , "scale=i"     => \$scale    # 1
             , "clients=i"   => \$clients  # 8
             , "threads=i"   => \$threads  # 8
             , "duration=i"  => \$duration # 1
             , "instances=i" => \$num_proj # 3
             , "waiting=i"   => \$waiting  # 5
             , "verbose"     => \$verbose  # off
             , "fast"        => \$fast     # off (use the "bin.fast" build)
             , "help"        => \$help
  )

 example:

 ./cascade.pl --instances=6 --scale=1 --clients=64 --duration=1 --repeats=1
';
}
# Return ($data_dir, $wal_dir) for the instance on $port. On first use for a
# port, a new directory "<port>XXXXXX" is created under $ROOT_TMPDIR
# (File::Temp tempdir, never cleaned up). Its data and wal subpaths are
# recorded in %pg_data_dir / %pg_xlog_dir; the subdirectories themselves are
# not created here.
sub get_PGDATA {
  my ($port) = @_;
  unless (exists $pg_data_dir{$port}) {
    my $base = tempdir( CLEANUP => 0, DIR => "${ROOT_TMPDIR}", TEMPLATE => $port . 'XXXXXX' );
    $pg_data_dir{$port} = "$base/data";
    $pg_xlog_dir{$port} = "$base/wal";
  }
  return ($pg_data_dir{$port}, $pg_xlog_dir{$port});
}
# Role-specific aliases; both currently return get_options($port) unchanged.
sub get_master_options { return get_options($_[0]); }
sub get_slave_options  { return get_options($_[0]); }
# Server settings for the instance on $port, as a reference to a list of
# "name=value" strings (action_instance turns each into "--name=value"):
# logical WAL, the logging collector writing logfile.<port> into the
# instance directory, replication-command logging, and autovacuum off.
#
# Disabled tuning knobs that may be needed for larger cascades:
#   max_wal_senders=40 (publisher), max_replication_slots=40 (both sides),
#   max_worker_processes=42, max_logical_replication_workers=40,
#   max_sync_workers_per_subscription=6 (subscriber), max_connections=320.
sub get_options {
  my ($port) = @_;
  my ($data_dir) = get_PGDATA($port);
  my $server_dir = "$data_dir/..";
  return [
    "wal_level=logical",
    "logging_collector=on",
    "log_directory=$server_dir",
    "log_filename=logfile.${port}",
    "log_replication_commands=on",
    "autovacuum=off",
  ];
}

Reply via email to