At Thu, 20 Oct 2022 13:28:45 +0530, Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com> wrote in
> On Thu, Oct 20, 2022 at 3:10 AM Andres Freund <and...@anarazel.de> wrote:
> >
> > Hi,
> >
> > While reviewing
> > https://postgr.es/m/CAD21AoBe2o2D%3Dxyycsxw2bQOD%3DzPj7ETuJ5VYGN%3DdpoTiCMRJQ%40mail.gmail.com
> > I noticed that pg_recvlogical prints
> > "pg_recvlogical: error: unexpected termination of replication stream: "
> >
> > when signalled with SIGINT/TERM.
> >
> > Oddly enough, that looks to have "always" been the case, even though clearly
> > the code tried to make provisions for a different outcome.
> >
> >
> > It looks to me like all that's needed is to gate the block printing the
> > message with an !time_to_abort.
+1

> +1. How about emitting a message like its friend pg_receivewal, like
> the attached patch?

I'm not a fan of treating SIGINT as an error in this case. The code
calls prepareToTerminate() when time_to_abort is set, and everything
goes fine after that. So I think we should do the same thing after
receiving an interrupt. This also syncs the output file naturally as
part of the normal shutdown. I'm also not a fan of doing fsync on
error.

> > I also then noticed that we don't fsync the output file in cases of errors -
> > that seems wrong to me? Looks to me like that block should be moved till
> > after
> > the error:?
>
> How about something like the attached patch?

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 5f2e6af445..e33c204df0 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -55,6 +55,7 @@ static const char *plugin = "test_decoding";
 /* Global State */
 static int	outfd = -1;
 static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t interrupted = false;
 static volatile sig_atomic_t output_reopen = false;
 static bool output_isfile;
 static TimestampTz output_last_fsync = -1;
@@ -206,6 +207,7 @@ StreamLogicalLog(void)
 	char	   *copybuf = NULL;
 	TimestampTz last_status = -1;
 	int			i;
+	XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
 	PQExpBuffer query;
 
 	output_written_lsn = InvalidXLogRecPtr;
@@ -275,7 +277,6 @@ StreamLogicalLog(void)
 		int			bytes_written;
 		TimestampTz now;
 		int			hdr_len;
-		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
 
 		if (copybuf != NULL)
 		{
@@ -487,7 +488,7 @@ StreamLogicalLog(void)
 
 			if (endposReached)
 			{
-				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+				cur_record_lsn = InvalidXLogRecPtr;
 				time_to_abort = true;
 				break;
 			}
@@ -527,7 +528,6 @@ StreamLogicalLog(void)
 			 */
 			if (!flushAndSendFeedback(conn, &now))
 				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
 			time_to_abort = true;
 			break;
 		}
@@ -572,12 +572,14 @@ StreamLogicalLog(void)
 			/* endpos was exactly the record we just processed, we're done */
 			if (!flushAndSendFeedback(conn, &now))
 				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
 			time_to_abort = true;
 			break;
 		}
 	}
 
+	if (time_to_abort)
+		prepareToTerminate(conn, endpos, false, cur_record_lsn);
+
 	res = PQgetResult(conn);
 	if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -657,6 +659,7 @@ static void
 sigexit_handler(SIGNAL_ARGS)
 {
 	time_to_abort = true;
+	interrupted = true;
 }
 
 /*
@@ -1031,6 +1034,8 @@ prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr l
 	if (keepalive)
 		pg_log_info("end position %X/%X reached by keepalive",
 					LSN_FORMAT_ARGS(endpos));
+	else if (interrupted)
+		pg_log_info("interrupted after %X/%X", LSN_FORMAT_ARGS(lsn));
 	else
 		pg_log_info("end position %X/%X reached by WAL record at %X/%X",
 					LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
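For illustration only, here is a minimal, self-contained sketch of the control flow the patch aims for. It is a toy model, not pg_recvlogical code. Every client-requested exit from the streaming loop sets time_to_abort, the shutdown step runs once after the loop, and the interrupted flag only selects which message is reported. A loop that ends without time_to_abort set is the genuine "unexpected termination" case. The names stream_sketch, stop_reason and termination_reason, and the use of record indices instead of LSNs, are hypothetical assumptions. Real code also has to deal with the COPY protocol, feedback messages and a live connection.

```c
#include <signal.h>
#include <stdbool.h>

/* Flags set from the signal handler, declared as in the patch. */
static volatile sig_atomic_t time_to_abort = false;
static volatile sig_atomic_t interrupted = false;

/* Hypothetical stand-in for the message prepareToTerminate() would log. */
typedef enum
{
	STOP_ENDPOS_KEEPALIVE,
	STOP_INTERRUPTED,
	STOP_ENDPOS_RECORD,
	STOP_UNEXPECTED				/* stream ended without us asking */
} stop_reason;

static void
sigexit_handler(int signo)
{
	(void) signo;
	time_to_abort = true;
	interrupted = true;
}

/* Mirrors the if / else-if / else chain the patch adds. */
static stop_reason
termination_reason(bool keepalive)
{
	if (keepalive)
		return STOP_ENDPOS_KEEPALIVE;
	else if (interrupted)
		return STOP_INTERRUPTED;
	else
		return STOP_ENDPOS_RECORD;
}

/*
 * Simulate streaming nrecords records.  endpos is the index of the last
 * record to process (-1 for none); interrupt_at is the index at which
 * SIGINT arrives (-1 for never).  Returns the number of records processed
 * and stores why the loop stopped in *reason.
 */
static int
stream_sketch(int nrecords, int endpos, int interrupt_at, stop_reason *reason)
{
	int			processed = 0;

	time_to_abort = false;
	interrupted = false;
	signal(SIGINT, sigexit_handler);

	for (int i = 0; i < nrecords; i++)
	{
		if (i == interrupt_at)
			raise(SIGINT);		/* handler runs before raise() returns */
		if (time_to_abort)
			break;
		processed++;
		if (i == endpos)
		{
			time_to_abort = true;	/* no shutdown call inside the loop */
			break;
		}
	}

	/* Shutdown is decided once, after the loop, for every requested stop. */
	if (time_to_abort)
		*reason = termination_reason(false);
	else
		*reason = STOP_UNEXPECTED;
	return processed;
}
```

In this sketch, stream_sketch(10, -1, 5, &r) processes five records and reports STOP_INTERRUPTED rather than an error, while stream_sketch(10, -1, -1, &r) reaches the end of the input without time_to_abort set and reports STOP_UNEXPECTED.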