At Thu, 20 Oct 2022 13:28:45 +0530, Bharath Rupireddy 
<bharath.rupireddyforpostg...@gmail.com> wrote in 
> On Thu, Oct 20, 2022 at 3:10 AM Andres Freund <and...@anarazel.de> wrote:
> >
> > Hi,
> >
> > While reviewing
> > https://postgr.es/m/CAD21AoBe2o2D%3Dxyycsxw2bQOD%3DzPj7ETuJ5VYGN%3DdpoTiCMRJQ%40mail.gmail.com
> > I noticed that pg_recvlogical prints
> > "pg_recvlogical: error: unexpected termination of replication stream: "
> >
> > when signalled with SIGINT/TERM.
> >
> > Oddly enough, that looks to have "always" been the case, even though clearly
> > the code tried to make provisions for a different outcome.
> >
> >
> > It looks to me like all that's needed is to gate the block printing the
> > message with an !time_to_abort.

+1

> +1. How about emitting a message like its friend pg_receivewal, like
> the attached patch?

I'm not a fan of treating SIGINT as an error in this case. The code
already calls prepareToTerminate() on the paths that set time_to_abort,
and everything after that goes fine. So I think we should do the same
thing after receiving an interrupt. This also syncs the output file
naturally as part of a normal shutdown. I'm also not a fan of doing
fsync on error.

> > I also then noticed that we don't fsync the output file in cases of errors -
> > that seems wrong to me? Looks to me like that block should be moved till
> > after the error:?
> 
> How about something like the attached patch?

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 5f2e6af445..e33c204df0 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -55,6 +55,7 @@ static const char *plugin = "test_decoding";
 /* Global State */
 static int     outfd = -1;
 static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t interrupted = false;
 static volatile sig_atomic_t output_reopen = false;
 static bool output_isfile;
 static TimestampTz output_last_fsync = -1;
@@ -206,6 +207,7 @@ StreamLogicalLog(void)
        char       *copybuf = NULL;
        TimestampTz last_status = -1;
        int                     i;
+       XLogRecPtr      cur_record_lsn = InvalidXLogRecPtr;
        PQExpBuffer query;
 
        output_written_lsn = InvalidXLogRecPtr;
@@ -275,7 +277,6 @@ StreamLogicalLog(void)
                int                     bytes_written;
                TimestampTz now;
                int                     hdr_len;
-               XLogRecPtr      cur_record_lsn = InvalidXLogRecPtr;
 
                if (copybuf != NULL)
                {
@@ -487,7 +488,7 @@ StreamLogicalLog(void)
 
                        if (endposReached)
                        {
-                               prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+                               cur_record_lsn = InvalidXLogRecPtr;
                                time_to_abort = true;
                                break;
                        }
@@ -527,7 +528,6 @@ StreamLogicalLog(void)
                         */
                        if (!flushAndSendFeedback(conn, &now))
                                goto error;
-                       prepareToTerminate(conn, endpos, false, cur_record_lsn);
                        time_to_abort = true;
                        break;
                }
@@ -572,12 +572,14 @@ StreamLogicalLog(void)
                        /* endpos was exactly the record we just processed, we're done */
                        if (!flushAndSendFeedback(conn, &now))
                                goto error;
-                       prepareToTerminate(conn, endpos, false, cur_record_lsn);
                        time_to_abort = true;
                        break;
                }
        }
 
+       if (time_to_abort)
+               prepareToTerminate(conn, endpos, false, cur_record_lsn);
+
        res = PQgetResult(conn);
        if (PQresultStatus(res) == PGRES_COPY_OUT)
        {
@@ -657,6 +659,7 @@ static void
 sigexit_handler(SIGNAL_ARGS)
 {
        time_to_abort = true;
+       interrupted = true;
 }
 
 /*
@@ -1031,6 +1034,8 @@ prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr l
                if (keepalive)
                        pg_log_info("end position %X/%X reached by keepalive",
                                                LSN_FORMAT_ARGS(endpos));
+               else if (interrupted)
+                       pg_log_info("interrupted after %X/%X", LSN_FORMAT_ARGS(lsn));
                else
                        pg_log_info("end position %X/%X reached by WAL record at %X/%X",
                                                LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
