use strict;
use warnings;
use DBI; 
use IO::Pipe;
use Storable;
use Data::Dumper;



# Reset the test table: empty foo, then seed rows 0..9999 with a count of zero.
# The database handle is confined to this bare block, so it has gone out of
# scope by the time the worker processes are forked further down.
{
  my $setup_dbh = DBI->connect(
    "dbi:Pg:", "", "",
    { AutoCommit => 1, RaiseError => 1, PrintError => 0 },
  );
  $setup_dbh->do("truncate foo");

  # Insert every row inside one explicit transaction.
  my $insert = $setup_dbh->prepare("insert into foo (index, count) values (?,0)");
  $setup_dbh->begin_work();
  for my $index (0 .. 9999) {
    $insert->execute($index);
  }
  $setup_dbh->commit();
};
warn "init";

# Parent side: one { pipe => read handle, pid => child pid } entry per worker.
# Each child empties its copy, so a non-empty list identifies the parent.
my @child_pipe;
# Child side: write end of the pipe leading back to the parent.
my $pipe_up;

# Fork the workers. The count is $ARGV[0] when that is a positive number,
# otherwise 4.
foreach (1.. ((@ARGV and $ARGV[0]>0) ? $ARGV[0] : 4)) {
    # Direct method call instead of indirect object syntax; fail loudly if the
    # pipe cannot be created rather than calling methods on undef later.
    my $pipe = IO::Pipe->new() or die "pipe failed: $!";
    defined (my $fork = fork) or die "fork failed: $!";
    if ($fork) {
      # Parent: keep the read end together with the child's pid.
      push @child_pipe, {pipe => $pipe->reader(), pid => $fork};
    } else {
      # Child: keep only the write end, discard the read handles inherited
      # from the parent, and stop forking.
      $pipe_up=$pipe->writer();
      @child_pipe=();
      last;
    };
};


# Parent process only (children have emptied @child_pipe): collect every
# worker's report, wait for the database to accept connections again, then
# compare the table contents against the confirmed increments.
if (@child_pipe) {
  my %in_flight;  # items whose last update was never confirmed to a worker
  my %count;      # item => total confirmed increments across all workers

  ### harvest children data, which consists of the in-flight item, plus a hash
  ### with the counts of all confirmed-committed items
  local $/;  # kept from the original; the Storable read below is binary
  foreach my $handle ( @child_pipe ) {
    my $data=Storable::fd_retrieve($handle->{pipe});
    close $handle->{pipe} or die "$$ closing child failed with bang $!, and question $?";
    my $pid =waitpid $handle->{pid}, 0 ;
    die "$$: my child $pid exited with non-zero status $?" if $?;

    # A worker that exited cleanly must have delivered a report; without it
    # the comparison below would silently under-count.
    die "$$: no report received from child $handle->{pid}" unless defined $data;

    # The in-flight slot is undef when the worker had no pending update;
    # only record real items so undef is never used as a hash key.
    $in_flight{$data->[0]} = undef if defined $data->[0];
    while (my ($k,$v)=each %{$data->[1]}) {
       $count{$k}+=$v;
    };
  };

  # Poll once per second, for up to 300 attempts, until a connection succeeds.
  my $dbh;
  foreach (1..300) { 
       sleep 1;
       $dbh = DBI->connect("dbi:Pg:", "", "", {AutoCommit => 1, RaiseError=>0,PrintError=>0});
       last if $dbh;
  };
  die "Database didn't recover after 5 minutes" unless $dbh;
  $dbh->{RaiseError} = 1;

  warn "sum is ", $dbh->selectrow_array("select sum(count) from foo"), "\n";
  warn "count is ", $dbh->selectrow_array("select count(*) from foo"), "\n";

  # Report every row whose stored count differs from the confirmed total,
  # noting whether a worker had an unconfirmed update pending on it.
  my $dat = $dbh->selectall_arrayref("select index, count from foo");
  foreach my $row (@$dat) {
    my ($index, $db_count) = @$row;
    # Rows no worker touched have no %count entry; treat that as zero quietly.
    no warnings 'uninitialized';
    if ($db_count != $count{$index}) {
      warn "For $index, $db_count != $count{$index}", exists $in_flight{$index} ? " in flight" : "";
    }
    delete $count{$index};
  };
  # Anything still in %count refers to an item that is not in the table.
  warn "Left over in %count" if %count;

  exit;
};



# Worker (child) process: repeatedly increment random rows of foo, then send
# the tally of confirmed increments plus any unconfirmed item to the parent.
my $dbh = DBI->connect("dbi:Pg:", "", "", {AutoCommit => 1, RaiseError=>1,PrintError=>0,PrintWarn=>0});

my %h; # how many times each item has been incremented and confirmed
my $i; # in-flight item which is not reported to have been committed

# RaiseError makes any database failure die, which ends the loop. The error is
# deliberately not rethrown so the tallies gathered so far can still be
# reported (presumably the server going away is the expected way out).
eval {
  # $dbh->do("SET SESSION synchronous_commit = false");
  my $sth=$dbh->prepare("update foo set count=count+1 where index=?");
  foreach (1..1e6) {
    $i=int rand(10_000);
#warn "$$ $i";
    $sth->execute($i);
    # Only reached when execute returned without dying, so the update counts
    # as confirmed and the item is no longer in flight.
    $h{$i}++;
    undef $i;
  };
};

# Send [in-flight item or undef, \%counts] to the parent. A failed send must
# not pass silently, or the parent would compare against missing tallies.
Storable::nstore_fd([$i,\%h],$pipe_up)
  or die "$$: failed to send results to parent: $!";
close $pipe_up or die "$! $?";
