On 2021-03-10 17:08, Fujii Masao wrote:
On 2021/03/10 14:11, Masahiro Ikeda wrote:
On 2021-03-09 17:51, Fujii Masao wrote:
On 2021/03/05 8:38, Masahiro Ikeda wrote:
On 2021-03-05 01:02, Fujii Masao wrote:
On 2021/03/04 16:14, Masahiro Ikeda wrote:
On 2021-03-03 20:27, Masahiro Ikeda wrote:
On 2021-03-03 16:30, Fujii Masao wrote:
On 2021/03/03 14:33, Masahiro Ikeda wrote:
On 2021-02-24 16:14, Fujii Masao wrote:
On 2021/02/15 11:59, Masahiro Ikeda wrote:
On 2021-02-10 00:51, David G. Johnston wrote:
On Thu, Feb 4, 2021 at 4:45 PM Masahiro Ikeda
<ikeda...@oss.nttdata.com> wrote:
I pgindented the patches.
... <function>XLogWrite</function>, which is invoked during
an
<function>XLogFlush</function> request (see ...). This is
also
incremented by the WAL receiver during replication.
("which normally called" should be "which is normally
called" or
"which normally is called" if you want to keep true to the
original)
You missed adding the space before an opening
parenthesis here and
elsewhere (probably copy-paste)
is ether -> is either
"This parameter is off by default as it will repeatedly
query the
operating system..."
", because" -> "as"
Thanks, I fixed them.
wal_write_time and the sync items also need the note: "This
is also
incremented by the WAL receiver during replication."
I skipped changing it since I separated the stats for the WAL
receiver
in pg_stat_wal_receiver.
"The number of times it happened..." -> " (the tally of this
event is
reported in wal_buffers_full in....) This is undesirable
because ..."
Thanks, I fixed it.
I notice that the patch for WAL receiver doesn't require
explicitly
computing the sync statistics but does require computing the
write
statistics. This is because of the presence of
issue_xlog_fsync but
absence of an equivalent pg_xlog_pwrite. Additionally, I
observe that
the XLogWrite code path calls pgstat_report_wait_*() while
the WAL
receiver path does not. It seems technically
straight-forward to
refactor here to avoid the almost-duplicated logic in the
two places,
though I suspect there may be a trade-off for not adding
another
function call to the stack given the importance of WAL
processing
(though that seems marginalized compared to the cost of
actually
writing the WAL). Or, as Fujii noted, go the other way and
don't have
any shared code between the two but instead implement the
WAL receiver
one to use pg_stat_wal_receiver instead. In either case,
this
half-and-half implementation seems undesirable.
OK, as Fujii-san mentioned, I separated the WAL receiver
stats.
(v10-0002-Makes-the-wal-receiver-report-WAL-statistics.patch)
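For illustration, the "refactor to avoid the almost-duplicated logic" option discussed above could look roughly like the following standalone sketch. It is not the patch's code: timed_pwrite, track_io_timing_demo, demo_write_count and demo_write_time_us are hypothetical stand-ins for a shared helper, track_wal_io_timing and the WalStats counters, and it uses plain POSIX pwrite() and clock_gettime() instead of pg_pwrite() and the instr_time macros. Wait-event reporting is left out.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

/* Hypothetical stand-ins for track_wal_io_timing and the WalStats counters. */
static bool track_io_timing_demo = true;
static uint64_t demo_write_count = 0;
static uint64_t demo_write_time_us = 0;

/* Read the monotonic clock in microseconds. */
static uint64_t
now_us(void)
{
    struct timespec ts;

    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (uint64_t) ts.tv_sec * 1000000u + (uint64_t) ts.tv_nsec / 1000u;
}

/*
 * Write nbyte bytes at offset, count the call, and time it if timing is
 * enabled.  The pwrite() result is returned unchanged so that each caller
 * keeps its own error handling.
 */
static ssize_t
timed_pwrite(int fd, const void *buf, size_t nbyte, off_t offset)
{
    uint64_t start = 0;
    ssize_t written;

    assert(buf != NULL);

    if (track_io_timing_demo)
        start = now_us();

    written = pwrite(fd, buf, nbyte, offset);

    if (track_io_timing_demo)
        demo_write_time_us += now_us() - start;

    /* The call is counted whether or not timing is enabled. */
    demo_write_count++;

    return written;
}
```

Both the backend write path and the WAL receiver write path could then call the one helper, so the counter and the timing are maintained in a single place.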
Thanks for updating the patches!
I added the infrastructure code to communicate the WAL
receiver stats messages between the WAL receiver and the
stats collector, and
the stats for the WAL receiver are counted in
pg_stat_wal_receiver.
What do you think?
On second thought, this idea does not seem good, because those
stats are
accumulated across multiple walreceivers, whereas the other values in
pg_stat_wal_receiver relate only to the walreceiver
process running
at that moment. IOW, it seems strange that some values show
dynamic
stats and the others show collected stats, even though they
are in
the same view pg_stat_wal_receiver. Thoughts?
OK, I fixed it.
The stats collected in the WAL receiver are exposed in
the pg_stat_wal view in the v11 patch.
Thanks for updating the patches! I'm now reading 001 patch.
+ /* Check whether the WAL file was synced to disk right now
*/
+ if (enableFsync &&
+ (sync_method == SYNC_METHOD_FSYNC ||
+ sync_method == SYNC_METHOD_FSYNC_WRITETHROUGH ||
+ sync_method == SYNC_METHOD_FDATASYNC))
+ {
Isn't it better to make issue_xlog_fsync() return immediately
if enableFsync is off or sync_method is open_sync or
open_datasync,
to simplify the code further?
Thanks for the comments.
I added the above code in v12 patch.
+ /*
+ * Send WAL statistics only if WalWriterDelay has
elapsed to minimize
+ * the overhead in WAL-writing.
+ */
+ if (rc & WL_TIMEOUT)
+ pgstat_send_wal();
On second thought, this change means that it always takes
wal_writer_delay
before walwriter's WAL stats are sent after XLogBackgroundFlush()
is called.
For example, if wal_writer_delay is set to several seconds, some
values in
pg_stat_wal would needlessly be out of date for those
seconds.
So I'm thinking of withdrawing my previous comment; it's ok to
send
the stats every time after XLogBackgroundFlush() is called. Thoughts?
Thanks, I didn't notice that.
Although PGSTAT_STAT_INTERVAL is 500msec, wal_writer_delay's
default value is 200msec and it may be set to a shorter time.
Yeah, if wal_writer_delay is set to a very small value, there is a
risk
that the WAL stats are sent too frequently. I agree that's a
problem.
Why not add another way that checks the timestamp?
+ /*
+ * Don't send a message unless it's been at least
PGSTAT_STAT_INTERVAL
+ * msec since we last sent one
+ */
+ now = GetCurrentTimestamp();
+ if (TimestampDifferenceExceeds(last_report, now,
PGSTAT_STAT_INTERVAL))
+ {
+ pgstat_send_wal();
+ last_report = now;
+ }
+
Although I wondered whether it would be better to add the check code in
pgstat_send_wal(),
Agreed.
I didn't do so, to avoid checking PGSTAT_STAT_INTERVAL twice.
pgstat_send_wal() is invoked by pgstat_report_stat(), which already
checks
PGSTAT_STAT_INTERVAL.
I think that we can do that. What about the attached patch?
Thanks, I think that's better.
I forgot to remove an unused variable.
This is fixed in the attached v13 patch.
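The interval check discussed above, combined with a 'force' flag for the shutdown case, can be sketched as a small standalone function. This is only an illustration under assumptions: maybe_send_stats, pending_writes, sent_messages and last_report_ms are hypothetical names, timestamps are plain milliseconds passed in by the caller, and "sending" just bumps a counter instead of talking to the stats collector.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Mirrors the 500 msec PGSTAT_STAT_INTERVAL mentioned in the thread. */
#define REPORT_INTERVAL_MS 500

/* Hypothetical state: unsent counters and the time of the last report. */
static uint64_t pending_writes = 0;
static uint64_t sent_messages = 0;
static int64_t last_report_ms = 0;

/*
 * Send the pending counters unless a report was sent less than
 * REPORT_INTERVAL_MS ago.  With force = true the interval check is
 * skipped, which is what a process wants just before it exits.
 * Returns true if a message was "sent".
 */
static bool
maybe_send_stats(int64_t now_ms, bool force)
{
    assert(now_ms >= last_report_ms);

    /* Nothing accumulated since the last report: never send. */
    if (pending_writes == 0)
        return false;

    /* Too soon after the previous report, and the caller did not insist. */
    if (!force && now_ms - last_report_ms < REPORT_INTERVAL_MS)
        return false;

    sent_messages++;            /* stands in for the real send */
    pending_writes = 0;
    last_report_ms = now_ms;
    return true;
}
```

Keeping the check inside the send function means every caller gets the throttling for free, and only the exit path needs to pass force = true.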
Thanks for updating the patch!
+ w.wal_write,
+ w.wal_write_time,
+ w.wal_sync,
+ w.wal_sync_time,
Isn't it more natural to put wal_write_time and wal_sync_time next to
each other? That is, what about the following order of columns?
wal_write
wal_sync
wal_write_time
wal_sync_time
Yes, I fixed it.
- case SYNC_METHOD_OPEN:
- case SYNC_METHOD_OPEN_DSYNC:
- /* write synced it already */
- break;
IMO it's better to add Assert(false) here to ensure that we never
reach
this point, as follows. Thoughts?
+ case SYNC_METHOD_OPEN:
+ case SYNC_METHOD_OPEN_DSYNC:
+ /* not reachable */
+ Assert(false);
I agree.
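The two suggestions for issue_xlog_fsync() (return early when no explicit sync is needed, and assert that the open_sync-style cases are never reached afterwards) fit together as in the sketch below. It is a simplified stand-in, not the patched function: the DEMO_ enum values, demo_enable_fsync, demo_sync_method and demo_sync_count are hypothetical, and no real fsync() call is made.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical subset of the sync methods discussed in the thread. */
typedef enum
{
    DEMO_SYNC_FSYNC,
    DEMO_SYNC_FDATASYNC,
    DEMO_SYNC_OPEN,
    DEMO_SYNC_OPEN_DSYNC
} DemoSyncMethod;

static bool demo_enable_fsync = true;
static DemoSyncMethod demo_sync_method = DEMO_SYNC_FDATASYNC;
static unsigned long demo_sync_count = 0;

/* Returns true if an explicit sync was issued and counted. */
static bool
demo_issue_sync(void)
{
    /*
     * Quick exit: syncing is disabled, or the write itself already synced
     * the data (open_sync-style methods), so there is nothing to count.
     */
    if (!demo_enable_fsync ||
        demo_sync_method == DEMO_SYNC_OPEN ||
        demo_sync_method == DEMO_SYNC_OPEN_DSYNC)
        return false;

    switch (demo_sync_method)
    {
        case DEMO_SYNC_FSYNC:
        case DEMO_SYNC_FDATASYNC:
            /* a real implementation would call fsync()/fdatasync() here */
            break;
        case DEMO_SYNC_OPEN:
        case DEMO_SYNC_OPEN_DSYNC:
            /* not reachable: filtered out by the early return above */
            assert(false);
            break;
    }

    demo_sync_count++;
    return true;
}
```

With the early return in place, the counter and any timing code after the switch only ever run for methods that really issue a sync.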
Even when a backend exits, it sends the stats via
pgstat_beshutdown_hook().
On the other hand, walwriter doesn't do that. Shouldn't walwriter also
send
the stats at its exit? Otherwise some stats can fail to be
collected.
But ISTM that this issue already existed; for example,
checkpointer
doesn't call pgstat_send_bgwriter() at its exit, so is it overkill
to fix
this issue in this patch?
Thanks, I think it's better to do so.
I added the shutdown hook for the walwriter and the checkpointer in
the v14-0003 patch.
Thanks for 0003 patch!
Isn't it overkill to send the stats in the walwriter-exit-callback?
IMO we can
just send the stats only when ShutdownRequestPending is true in the
walwriter
main loop (maybe just before calling HandleMainLoopInterrupts()).
If we do this, we cannot send the stats when walwriter throws a FATAL
error.
But that's ok because a FATAL error in walwriter causes the server to
crash.
Thoughts?
Thanks for your comments!
Yes, I agree.
Also ISTM that we don't need to use the callback for that purpose in
checkpointer, for the same reason. That is, we can send the
stats
just after calling ShutdownXLOG(0, 0) in
HandleCheckpointerInterrupts().
Thoughts?
Yes, I think so too.
Since ShutdownXLOG() may create a restartpoint or a checkpoint,
it might generate WAL records.
I'm now not sure how useful these changes are. As far as I read
pgstat.c,
when shutdown is requested, the stats collector seems to exit even
when
there are outstanding stats messages. So if checkpointer and
walwriter send
the stats in their last cycles, those stats might not be collected.
On the other hand, I think that sending the stats in the last
cycles would
improve the situation a bit compared to now. So I'm inclined to apply those
changes...
I didn't notice that. I agree this is an important aspect.
I understand that there is a case in which the stats collector exits before the
checkpointer
or the walwriter exits, so some stats might not be collected.
IIUC the stats collector basically exits after checkpointer and
walwriter exit.
But there seems to be no guarantee that the stats collector processes
all the messages that other processes have sent during the shutdown of
the server.
Thanks, I understand the above postmaster behavior.
PMState manages the status, and after the checkpointer has exited, the
postmaster sends
the SIGQUIT signal to the stats collector if the shutdown mode is smart or
fast.
(IIUC, although the postmaster kills the walsender, the archiver and
the stats collector at the same time, that's ok because the walsender
and the archiver don't send stats to the stats collector now.)
But there might be a corner case in which stats sent by background
workers like
the checkpointer just before they exit are lost (although this is not
implemented yet.)
For example,
1. the checkpointer sends the stats before it exits
2. the stats collector receives the signal and breaks out of its loop before
processing the stats message from the checkpointer. In this case, the message
from step 1 is lost.
3. the stats collector writes the stats to the stats files and exits
Why don't we recheck that no incoming messages remain just before the 2nd
step?
(v17-0004-guarantee-to-collect-last-stats-messages.patch)
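The idea of draining whatever is still queued before exiting can be sketched on its own with a non-blocking datagram socket. This is an assumption-laden illustration, not the v17-0004 code: DemoMsg, process_one, drain_pending, make_nonblocking and demo_total are hypothetical, and a datagram socket is assumed so that each recv() returns one whole message. Unlike a plain "loop while the result is > 0", the sketch keeps draining past a malformed message and stops only when the queue is empty.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical fixed-size message: a size header plus one counter. */
typedef struct DemoMsg
{
    uint32_t size;
    uint32_t payload;
} DemoMsg;

typedef enum { DRAIN_EMPTY, DRAIN_IGNORED, DRAIN_PROCESSED } DrainResult;

static uint64_t demo_total = 0;   /* sum of all accepted payloads */

/* Put fd into non-blocking mode; returns 0 on success, -1 on failure. */
static int
make_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/* Try to receive and process one message without blocking. */
static DrainResult
process_one(int fd)
{
    DemoMsg msg;
    ssize_t len = recv(fd, &msg, sizeof(msg), 0);

    if (len < 0)
    {
        if (errno == EINTR)
            return DRAIN_IGNORED;   /* interrupted: let the caller retry */
        return DRAIN_EMPTY;         /* EAGAIN/EWOULDBLOCK or a hard error */
    }

    /* Ignore messages that are too short or whose header disagrees. */
    if ((size_t) len < sizeof(msg) || msg.size != (uint32_t) len)
        return DRAIN_IGNORED;

    demo_total += msg.payload;
    return DRAIN_PROCESSED;
}

/* Process everything already queued on fd; returns the number accepted. */
static int
drain_pending(int fd)
{
    int processed = 0;
    DrainResult r;

    assert(fd >= 0);

    while ((r = process_one(fd)) != DRAIN_EMPTY)
    {
        if (r == DRAIN_PROCESSED)
            processed++;
    }
    return processed;
}
```

A collector-style process would call drain_pending() once after leaving its main loop and before writing its final state, so messages that arrived just ahead of the shutdown signal are still accounted for.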
I measured the timing of the above on my Linux laptop using
v17-measure-timing.patch.
I don't have a strong opinion on handling this case, since the result
shows that receiving and processing
the messages takes very little time (less than 1 ms), while the stats
collector receives the shutdown
signal 5 msec (099 -> 104) after the checkpointer process exits.
```
1615421204.556 [checkpointer] DEBUG: received shutdown request signal
1615421208.099 [checkpointer] DEBUG: proc_exit(-1): 0 callbacks to make
# exit and send the messages
1615421208.099 [stats collector] DEBUG: process BGWRITER stats message
# receive and process the messages
1615421208.099 [stats collector] DEBUG: process WAL stats message
1615421208.104 [postmaster] DEBUG: reaping dead processes
1615421208.104 [stats collector] DEBUG: received shutdown request signal # receive shutdown request from the postmaster
```
Of course, there is another direction; we can improve the stats
collector so
that it guarantees to collect all the sent stats messages. But I'm
afraid
this change might be big.
For example, manage the background process status in shared
memory and
have the stats collector keep collecting stats until the other background
processes exit?
In my understanding, the statistics do not require high accuracy,
so it's ok to ignore them if the impact is not big.
If we were to guarantee high accuracy, other background processes such as
the autovacuum launcher
would also have to send the WAL stats, because they access the system
catalog and might generate
WAL records due to HOT updates, even though the possibility is low.
I think the impact is small because the period during which uncollected
stats are generated is
short compared to the time since startup. So it's ok to ignore the
remaining stats
when the process exits.
I agree that it's not worth changing lots of code to collect such
stats.
But if we can implement that very simply, isn't that better than
the current situation, since we may be able to collect more
accurate stats?
Yes, I agree.
I attached the patch to send the stats before the wal writer and the
checkpointer exit.
(v17-0001-send-stats-for-walwriter-when-shutdown.patch,
v17-0002-send-stats-for-checkpointer-when-shutdown.patch)
BTW, I found that BgWriterStats.m_timed_checkpoints is not counted in
ShutdownXLOG()
and we need to count it if we are to collect stats before it exits.
Maybe m_requested_checkpoints should be incremented in that case?
I think it should be incremented,
because ShutdownXLOG() invokes the functions with CHECKPOINT_IS_SHUTDOWN.
```
ShutdownXLOG()
    CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
    CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
```
I fixed it in v17-0002-send-stats-for-checkpointer-when-shutdown.patch.
In addition, I rebased the patch for WAL receiver.
(v17-0003-Makes-the-wal-receiver-report-WAL-statistics.patch)
Regards,
--
Masahiro Ikeda
NTT DATA CORPORATION
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 132df29aba..45c8531ac8 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -78,6 +78,9 @@ int WalWriterFlushAfter = 128;
#define LOOPS_UNTIL_HIBERNATE 50
#define HIBERNATE_FACTOR 25
+/* Prototypes for private functions */
+static void HandleWalWriterInterrupts(void);
+
/*
* Main entry point for walwriter process
*
@@ -242,7 +245,7 @@ WalWriterMain(void)
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
- HandleMainLoopInterrupts();
+ HandleWalWriterInterrupts();
/*
* Do what we're here for; then, if XLogBackgroundFlush() found useful
@@ -272,3 +275,34 @@ WalWriterMain(void)
WAIT_EVENT_WAL_WRITER_MAIN);
}
}
+
+/*
+ * Interrupt handler for the main loop of the WAL writer process.
+ */
+static void
+HandleWalWriterInterrupts(void)
+{
+ if (ProcSignalBarrierPending)
+ ProcessProcSignalBarrier();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ if (ShutdownRequestPending)
+ {
+ /*
+ * Force sending of the remaining WAL statistics to the stats collector
+ * at process exit.
+ *
+ * Since pgstat_send_wal is invoked with 'force' set to false in the
+ * main loop to avoid overloading the stats collector, there may be
+ * unsent stats counters for the WAL writer.
+ */
+ pgstat_send_wal(true);
+
+ proc_exit(0);
+ }
+}
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 3894f4a270..20454d4040 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -168,6 +168,7 @@ static bool IsCheckpointOnSchedule(double progress);
static bool ImmediateCheckpointRequested(void);
static bool CompactCheckpointerRequestQueue(void);
static void UpdateSharedMemoryConfig(void);
+static void pgstat_send_checkpointer(void);
/* Signal handlers */
static void ReqCheckpointHandler(SIGNAL_ARGS);
@@ -495,17 +496,8 @@ CheckpointerMain(void)
/* Check for archive_timeout and switch xlog files if necessary. */
CheckArchiveTimeout();
- /*
- * Send off activity statistics to the stats collector. (The reason
- * why we re-use bgwriter-related code for this is that the bgwriter
- * and checkpointer used to be just one process. It's probably not
- * worth the trouble to split the stats support into two independent
- * stats message types.)
- */
- pgstat_send_bgwriter();
-
- /* Send WAL statistics to the stats collector. */
- pgstat_report_wal();
+ /* Send the statistics for the checkpointer to the stats collector */
+ pgstat_send_checkpointer();
/*
* If any checkpoint flags have been set, redo the loop to handle the
@@ -572,8 +564,18 @@ HandleCheckpointerInterrupts(void)
* back to the sigsetjmp block above
*/
ExitOnAnyError = true;
- /* Close down the database */
+
+ /*
+ * Close down the database.
+ *
+ * Since ShutdownXLOG() creates a restartpoint or checkpoint and updates
+ * the statistics, increment the checkpoint request counter and send the
+ * statistics to the stats collector.
+ */
+ BgWriterStats.m_requested_checkpoints++;
ShutdownXLOG(0, 0);
+ pgstat_send_checkpointer();
+
/* Normal exit from the checkpointer is here */
proc_exit(0); /* done */
}
@@ -1335,3 +1337,22 @@ FirstCallSinceLastCheckpoint(void)
return FirstCall;
}
+
+/*
+ * Send the statistics for the checkpointer to the stats collector
+ */
+static void
+pgstat_send_checkpointer(void)
+{
+ /*
+ * Send off activity statistics to the stats collector. (The reason why
+ * we re-use bgwriter-related code for this is that the bgwriter and
+ * checkpointer used to be just one process. It's probably not worth the
+ * trouble to split the stats support into two independent stats message
+ * types.)
+ */
+ pgstat_send_bgwriter();
+
+ /* Send WAL statistics to the stats collector. */
+ pgstat_report_wal();
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 24c3dd32f8..7bad027162 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2534,7 +2534,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
Size nbytes;
Size nleft;
int written;
- instr_time start;
/* OK to write the page(s) */
from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
@@ -2544,28 +2543,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
errno = 0;
- /* Measure I/O timing to write WAL data */
- if (track_wal_io_timing)
- INSTR_TIME_SET_CURRENT(start);
-
- pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
- written = pg_pwrite(openLogFile, from, nleft, startoffset);
- pgstat_report_wait_end();
-
- /*
- * Increment the I/O timing and the number of times WAL data
- * were written out to disk.
- */
- if (track_wal_io_timing)
- {
- instr_time duration;
-
- INSTR_TIME_SET_CURRENT(duration);
- INSTR_TIME_SUBTRACT(duration, start);
- WalStats.m_wal_write_time += INSTR_TIME_GET_MICROSEC(duration);
- }
-
- WalStats.m_wal_write++;
+ written = XLogWriteFile(openLogFile, from, nleft, startoffset);
if (written <= 0)
{
@@ -2705,6 +2683,46 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
}
}
+/*
+ * Issue pg_pwrite to write an XLOG file.
+ *
+ * 'fd' is the file descriptor of the XLOG file to write to.
+ * 'buf' is the starting address of the buffer to write.
+ * 'nbyte' is the maximum number of bytes to write.
+ * 'offset' is the offset within the XLOG file to write at.
+ */
+int
+XLogWriteFile(int fd, const void *buf, size_t nbyte, off_t offset)
+{
+ int written;
+ instr_time start;
+
+ /* Measure I/O timing to write WAL data */
+ if (track_wal_io_timing)
+ INSTR_TIME_SET_CURRENT(start);
+
+ pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
+ written = pg_pwrite(fd, buf, nbyte, offset);
+ pgstat_report_wait_end();
+
+ /*
+ * Increment the I/O timing and the number of times WAL data were
+ * written out to disk.
+ */
+ if (track_wal_io_timing)
+ {
+ instr_time duration;
+
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ WalStats.m_wal_write_time += INSTR_TIME_GET_MICROSEC(duration);
+ }
+
+ WalStats.m_wal_write++;
+
+ return written;
+}
+
/*
* Record the LSN for an asynchronous transaction commit/abort
* and nudge the WALWriter if there is work for it to do.
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7810ee916c..3abd8ac93b 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -770,6 +770,9 @@ WalRcvDie(int code, Datum arg)
/* Ensure that all WAL records received are flushed to disk */
XLogWalRcvFlush(true);
+ /* Send WAL statistics to the stats collector before terminating */
+ pgstat_send_wal(true);
+
/* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
@@ -907,6 +910,12 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
XLogArchiveForceDone(xlogfname);
else
XLogArchiveNotify(xlogfname);
+
+ /*
+ * Send WAL statistics to the stats collector when finishing
+ * the current WAL segment file to avoid overloading it.
+ */
+ pgstat_send_wal(false);
}
recvFile = -1;
@@ -928,7 +937,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* OK to write the logs */
errno = 0;
- byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
+ byteswritten = XLogWriteFile(recvFile, buf, segbytes, (off_t) startoff);
+
if (byteswritten <= 0)
{
char xlogfname[MAXFNAMELEN];
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1e53d9d4ca..b345de8a28 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -290,6 +290,7 @@ extern bool XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
extern int XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
extern int XLogFileOpen(XLogSegNo segno);
+extern int XLogWriteFile(int fd, const void *buf, size_t nbyte, off_t offset);
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
extern XLogSegNo XLogGetLastRemovedSegno(void);
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 68eefb9722..86b8449193 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -329,6 +329,7 @@ static void pgstat_beshutdown_hook(int code, Datum arg);
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
Oid tableoid, bool create);
+static int pgstat_process_message(void);
static void pgstat_write_statsfiles(bool permanent, bool allDbs);
static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
@@ -4810,8 +4811,6 @@ pgstat_send_slru(void)
NON_EXEC_STATIC void
PgstatCollectorMain(int argc, char *argv[])
{
- int len;
- PgStat_Msg msg;
int wr;
WaitEvent event;
WaitEventSet *wes;
@@ -4896,158 +4895,10 @@ PgstatCollectorMain(int argc, char *argv[])
pgstat_write_statsfiles(false, false);
/*
- * Try to receive and process a message. This will not block,
- * since the socket is set to non-blocking mode.
- *
- * XXX On Windows, we have to force pgwin32_recv to cooperate,
- * despite the previous use of pg_set_noblock() on the socket.
- * This is extremely broken and should be fixed someday.
+ * Try to receive and process a message.
*/
-#ifdef WIN32
- pgwin32_noblock = 1;
-#endif
-
- len = recv(pgStatSock, (char *) &msg,
- sizeof(PgStat_Msg), 0);
-
-#ifdef WIN32
- pgwin32_noblock = 0;
-#endif
-
- if (len < 0)
- {
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
- break; /* out of inner loop */
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("could not read statistics message: %m")));
- }
-
- /*
- * We ignore messages that are smaller than our common header
- */
- if (len < sizeof(PgStat_MsgHdr))
- continue;
-
- /*
- * The received length must match the length in the header
- */
- if (msg.msg_hdr.m_size != len)
- continue;
-
- /*
- * O.K. - we accept this message. Process it.
- */
- switch (msg.msg_hdr.m_type)
- {
- case PGSTAT_MTYPE_DUMMY:
- break;
-
- case PGSTAT_MTYPE_INQUIRY:
- pgstat_recv_inquiry(&msg.msg_inquiry, len);
- break;
-
- case PGSTAT_MTYPE_TABSTAT:
- pgstat_recv_tabstat(&msg.msg_tabstat, len);
- break;
-
- case PGSTAT_MTYPE_TABPURGE:
- pgstat_recv_tabpurge(&msg.msg_tabpurge, len);
- break;
-
- case PGSTAT_MTYPE_DROPDB:
- pgstat_recv_dropdb(&msg.msg_dropdb, len);
- break;
-
- case PGSTAT_MTYPE_RESETCOUNTER:
- pgstat_recv_resetcounter(&msg.msg_resetcounter, len);
- break;
-
- case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
- pgstat_recv_resetsharedcounter(&msg.msg_resetsharedcounter,
- len);
- break;
-
- case PGSTAT_MTYPE_RESETSINGLECOUNTER:
- pgstat_recv_resetsinglecounter(&msg.msg_resetsinglecounter,
- len);
- break;
-
- case PGSTAT_MTYPE_RESETSLRUCOUNTER:
- pgstat_recv_resetslrucounter(&msg.msg_resetslrucounter,
- len);
- break;
-
- case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
- pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
- len);
- break;
-
- case PGSTAT_MTYPE_AUTOVAC_START:
- pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
- break;
-
- case PGSTAT_MTYPE_VACUUM:
- pgstat_recv_vacuum(&msg.msg_vacuum, len);
- break;
-
- case PGSTAT_MTYPE_ANALYZE:
- pgstat_recv_analyze(&msg.msg_analyze, len);
- break;
-
- case PGSTAT_MTYPE_ARCHIVER:
- pgstat_recv_archiver(&msg.msg_archiver, len);
- break;
-
- case PGSTAT_MTYPE_BGWRITER:
- pgstat_recv_bgwriter(&msg.msg_bgwriter, len);
- break;
-
- case PGSTAT_MTYPE_WAL:
- pgstat_recv_wal(&msg.msg_wal, len);
- break;
-
- case PGSTAT_MTYPE_SLRU:
- pgstat_recv_slru(&msg.msg_slru, len);
- break;
-
- case PGSTAT_MTYPE_FUNCSTAT:
- pgstat_recv_funcstat(&msg.msg_funcstat, len);
- break;
-
- case PGSTAT_MTYPE_FUNCPURGE:
- pgstat_recv_funcpurge(&msg.msg_funcpurge, len);
- break;
-
- case PGSTAT_MTYPE_RECOVERYCONFLICT:
- pgstat_recv_recoveryconflict(&msg.msg_recoveryconflict,
- len);
- break;
-
- case PGSTAT_MTYPE_DEADLOCK:
- pgstat_recv_deadlock(&msg.msg_deadlock, len);
- break;
-
- case PGSTAT_MTYPE_TEMPFILE:
- pgstat_recv_tempfile(&msg.msg_tempfile, len);
- break;
-
- case PGSTAT_MTYPE_CHECKSUMFAILURE:
- pgstat_recv_checksum_failure(&msg.msg_checksumfailure,
- len);
- break;
-
- case PGSTAT_MTYPE_REPLSLOT:
- pgstat_recv_replslot(&msg.msg_replslot, len);
- break;
-
- case PGSTAT_MTYPE_CONNECTION:
- pgstat_recv_connstat(&msg.msg_conn, len);
- break;
-
- default:
- break;
- }
+ if (pgstat_process_message() < 0)
+ break; /* If an error occurred, go out of inner loop */
} /* end of inner message-processing loop */
/* Sleep until there's something to do */
@@ -5077,6 +4928,21 @@ PgstatCollectorMain(int argc, char *argv[])
break;
} /* end of outer loop */
+ /*
+ * Try to receive and process remaining messages before the process exits.
+ *
+ * There is no guarantee that all messages were processed in the above
+ * loop, even though in smart or fast shutdown mode the postmaster sends
+ * SIGQUIT to the stats collector only after the other backend and
+ * background processes, which send their stats to the stats collector,
+ * have exited.
+ *
+ * For example, messages might be lost if the postmaster sends SIGQUIT
+ * to the stats collector while unprocessed messages are still queued
+ * on the socket.
+ */
+ while (pgstat_process_message() > 0);
+
/*
* Save the final stats to reuse at next startup.
*/
@@ -5225,6 +5091,169 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
return result;
}
+/*
+ * Try to receive and process a message. This will not block,
+ * since the socket is set to non-blocking mode.
+ *
+ * XXX On Windows, we have to force pgwin32_recv to cooperate,
+ * despite the previous use of pg_set_noblock() on the socket.
+ * This is extremely broken and should be fixed someday.
+ *
+ * Returns 1 if a message was processed, 0 if ignored, -1 if none was read.
+ */
+static int
+pgstat_process_message(void)
+{
+ int len;
+ PgStat_Msg msg;
+
+#ifdef WIN32
+ pgwin32_noblock = 1;
+#endif
+
+ len = recv(pgStatSock, (char *) &msg, sizeof(PgStat_Msg), 0);
+
+#ifdef WIN32
+ pgwin32_noblock = 0;
+#endif
+
+ if (len < 0)
+ {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+ return -1;
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("could not read statistics message: %m")));
+ }
+
+ /*
+ * We ignore messages that are smaller than our common header
+ */
+ if (len < sizeof(PgStat_MsgHdr))
+ return 0;
+
+ /*
+ * The received length must match the length in the header
+ */
+ if (msg.msg_hdr.m_size != len)
+ return 0;
+
+ /*
+ * O.K. - we accept this message. Process it.
+ */
+ switch (msg.msg_hdr.m_type)
+ {
+ case PGSTAT_MTYPE_DUMMY:
+ break;
+
+ case PGSTAT_MTYPE_INQUIRY:
+ pgstat_recv_inquiry(&msg.msg_inquiry, len);
+ break;
+
+ case PGSTAT_MTYPE_TABSTAT:
+ pgstat_recv_tabstat(&msg.msg_tabstat, len);
+ break;
+
+ case PGSTAT_MTYPE_TABPURGE:
+ pgstat_recv_tabpurge(&msg.msg_tabpurge, len);
+ break;
+
+ case PGSTAT_MTYPE_DROPDB:
+ pgstat_recv_dropdb(&msg.msg_dropdb, len);
+ break;
+
+ case PGSTAT_MTYPE_RESETCOUNTER:
+ pgstat_recv_resetcounter(&msg.msg_resetcounter, len);
+ break;
+
+ case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
+ pgstat_recv_resetsharedcounter(&msg.msg_resetsharedcounter,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_RESETSINGLECOUNTER:
+ pgstat_recv_resetsinglecounter(&msg.msg_resetsinglecounter,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_RESETSLRUCOUNTER:
+ pgstat_recv_resetslrucounter(&msg.msg_resetslrucounter,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
+ pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_AUTOVAC_START:
+ pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
+ break;
+
+ case PGSTAT_MTYPE_VACUUM:
+ pgstat_recv_vacuum(&msg.msg_vacuum, len);
+ break;
+
+ case PGSTAT_MTYPE_ANALYZE:
+ pgstat_recv_analyze(&msg.msg_analyze, len);
+ break;
+
+ case PGSTAT_MTYPE_ARCHIVER:
+ pgstat_recv_archiver(&msg.msg_archiver, len);
+ break;
+
+ case PGSTAT_MTYPE_BGWRITER:
+ pgstat_recv_bgwriter(&msg.msg_bgwriter, len);
+ break;
+
+ case PGSTAT_MTYPE_WAL:
+ pgstat_recv_wal(&msg.msg_wal, len);
+ break;
+
+ case PGSTAT_MTYPE_SLRU:
+ pgstat_recv_slru(&msg.msg_slru, len);
+ break;
+
+ case PGSTAT_MTYPE_FUNCSTAT:
+ pgstat_recv_funcstat(&msg.msg_funcstat, len);
+ break;
+
+ case PGSTAT_MTYPE_FUNCPURGE:
+ pgstat_recv_funcpurge(&msg.msg_funcpurge, len);
+ break;
+
+ case PGSTAT_MTYPE_RECOVERYCONFLICT:
+ pgstat_recv_recoveryconflict(&msg.msg_recoveryconflict,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_DEADLOCK:
+ pgstat_recv_deadlock(&msg.msg_deadlock, len);
+ break;
+
+ case PGSTAT_MTYPE_TEMPFILE:
+ pgstat_recv_tempfile(&msg.msg_tempfile, len);
+ break;
+
+ case PGSTAT_MTYPE_CHECKSUMFAILURE:
+ pgstat_recv_checksum_failure(&msg.msg_checksumfailure,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_REPLSLOT:
+ pgstat_recv_replslot(&msg.msg_replslot, len);
+ break;
+
+ case PGSTAT_MTYPE_CONNECTION:
+ pgstat_recv_connstat(&msg.msg_conn, len);
+ break;
+
+ default:
+ break;
+ }
+
+ return 1;
+}
/* ----------
* pgstat_write_statsfiles() -
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 24c3dd32f8..eed778b779 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2534,7 +2534,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
Size nbytes;
Size nleft;
int written;
- instr_time start;
/* OK to write the page(s) */
from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
@@ -2544,28 +2543,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
errno = 0;
- /* Measure I/O timing to write WAL data */
- if (track_wal_io_timing)
- INSTR_TIME_SET_CURRENT(start);
-
- pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
- written = pg_pwrite(openLogFile, from, nleft, startoffset);
- pgstat_report_wait_end();
-
- /*
- * Increment the I/O timing and the number of times WAL data
- * were written out to disk.
- */
- if (track_wal_io_timing)
- {
- instr_time duration;
-
- INSTR_TIME_SET_CURRENT(duration);
- INSTR_TIME_SUBTRACT(duration, start);
- WalStats.m_wal_write_time += INSTR_TIME_GET_MICROSEC(duration);
- }
-
- WalStats.m_wal_write++;
+ written = XLogWriteFile(openLogFile, from, nleft, startoffset);
if (written <= 0)
{
@@ -2705,6 +2683,46 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
}
}
+/*
+ * Issue pg_pwrite to write an XLOG file.
+ *
+ * 'fd' is the file descriptor of the XLOG file to write to.
+ * 'buf' is the starting address of the buffer to write.
+ * 'nbyte' is the maximum number of bytes to write.
+ * 'offset' is the offset within the XLOG file to write at.
+ */
+int
+XLogWriteFile(int fd, const void *buf, size_t nbyte, off_t offset)
+{
+ int written;
+ instr_time start;
+
+ /* Measure I/O timing to write WAL data */
+ if (track_wal_io_timing)
+ INSTR_TIME_SET_CURRENT(start);
+
+ pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
+ written = pg_pwrite(fd, buf, nbyte, offset);
+ pgstat_report_wait_end();
+
+ /*
+ * Increment the I/O timing and the number of times WAL data were written
+ * out to disk.
+ */
+ if (track_wal_io_timing)
+ {
+ instr_time duration;
+
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ WalStats.m_wal_write_time += INSTR_TIME_GET_MICROSEC(duration);
+ }
+
+ WalStats.m_wal_write++;
+
+ return written;
+}
+
/*
* Record the LSN for an asynchronous transaction commit/abort
* and nudge the WALWriter if there is work for it to do.
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 3894f4a270..20454d4040 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -168,6 +168,7 @@ static bool IsCheckpointOnSchedule(double progress);
static bool ImmediateCheckpointRequested(void);
static bool CompactCheckpointerRequestQueue(void);
static void UpdateSharedMemoryConfig(void);
+static void pgstat_send_checkpointer(void);
/* Signal handlers */
static void ReqCheckpointHandler(SIGNAL_ARGS);
@@ -495,17 +496,8 @@ CheckpointerMain(void)
/* Check for archive_timeout and switch xlog files if necessary. */
CheckArchiveTimeout();
- /*
- * Send off activity statistics to the stats collector. (The reason
- * why we re-use bgwriter-related code for this is that the bgwriter
- * and checkpointer used to be just one process. It's probably not
- * worth the trouble to split the stats support into two independent
- * stats message types.)
- */
- pgstat_send_bgwriter();
-
- /* Send WAL statistics to the stats collector. */
- pgstat_report_wal();
+ /* Send the statistics for the checkpointer to the stats collector */
+ pgstat_send_checkpointer();
/*
* If any checkpoint flags have been set, redo the loop to handle the
@@ -572,8 +564,18 @@ HandleCheckpointerInterrupts(void)
* back to the sigsetjmp block above
*/
ExitOnAnyError = true;
- /* Close down the database */
+
+ /*
+ * Close down the database.
+ *
+ * Since ShutdownXLOG() creates a restartpoint or checkpoint and updates
+ * the statistics, increment the requested-checkpoint counter and send the
+ * statistics to the stats collector.
+ */
+ BgWriterStats.m_requested_checkpoints++;
ShutdownXLOG(0, 0);
+ pgstat_send_checkpointer();
+
/* Normal exit from the checkpointer is here */
proc_exit(0); /* done */
}
@@ -1335,3 +1337,22 @@ FirstCallSinceLastCheckpoint(void)
return FirstCall;
}
+
+/*
+ * Send the statistics for the checkpointer to the stats collector
+ */
+static void
+pgstat_send_checkpointer(void)
+{
+ /*
+ * Send off activity statistics to the stats collector. (The reason why
+ * we re-use bgwriter-related code for this is that the bgwriter and
+ * checkpointer used to be just one process. It's probably not worth the
+ * trouble to split the stats support into two independent stats message
+ * types.)
+ */
+ pgstat_send_bgwriter();
+
+ /* Send WAL statistics to the stats collector. */
+ pgstat_report_wal();
+}
diff --git a/src/backend/postmaster/interrupt.c b/src/backend/postmaster/interrupt.c
index dd9136a942..a50e00f06b 100644
--- a/src/backend/postmaster/interrupt.c
+++ b/src/backend/postmaster/interrupt.c
@@ -108,5 +108,7 @@ SignalHandlerForShutdownRequest(SIGNAL_ARGS)
ShutdownRequestPending = true;
SetLatch(MyLatch);
+ elog(DEBUG3, "received shutdown request signal");
+
errno = save_errno;
}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 68eefb9722..28fc3a80f4 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -329,6 +329,7 @@ static void pgstat_beshutdown_hook(int code, Datum arg);
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
Oid tableoid, bool create);
+static int pgstat_process_message(void);
static void pgstat_write_statsfiles(bool permanent, bool allDbs);
static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
@@ -4810,8 +4811,6 @@ pgstat_send_slru(void)
NON_EXEC_STATIC void
PgstatCollectorMain(int argc, char *argv[])
{
- int len;
- PgStat_Msg msg;
int wr;
WaitEvent event;
WaitEventSet *wes;
@@ -4896,158 +4895,10 @@ PgstatCollectorMain(int argc, char *argv[])
pgstat_write_statsfiles(false, false);
/*
- * Try to receive and process a message. This will not block,
- * since the socket is set to non-blocking mode.
- *
- * XXX On Windows, we have to force pgwin32_recv to cooperate,
- * despite the previous use of pg_set_noblock() on the socket.
- * This is extremely broken and should be fixed someday.
+ * Try to receive and process a message.
*/
-#ifdef WIN32
- pgwin32_noblock = 1;
-#endif
-
- len = recv(pgStatSock, (char *) &msg,
- sizeof(PgStat_Msg), 0);
-
-#ifdef WIN32
- pgwin32_noblock = 0;
-#endif
-
- if (len < 0)
- {
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
- break; /* out of inner loop */
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("could not read statistics message: %m")));
- }
-
- /*
- * We ignore messages that are smaller than our common header
- */
- if (len < sizeof(PgStat_MsgHdr))
- continue;
-
- /*
- * The received length must match the length in the header
- */
- if (msg.msg_hdr.m_size != len)
- continue;
-
- /*
- * O.K. - we accept this message. Process it.
- */
- switch (msg.msg_hdr.m_type)
- {
- case PGSTAT_MTYPE_DUMMY:
- break;
-
- case PGSTAT_MTYPE_INQUIRY:
- pgstat_recv_inquiry(&msg.msg_inquiry, len);
- break;
-
- case PGSTAT_MTYPE_TABSTAT:
- pgstat_recv_tabstat(&msg.msg_tabstat, len);
- break;
-
- case PGSTAT_MTYPE_TABPURGE:
- pgstat_recv_tabpurge(&msg.msg_tabpurge, len);
- break;
-
- case PGSTAT_MTYPE_DROPDB:
- pgstat_recv_dropdb(&msg.msg_dropdb, len);
- break;
-
- case PGSTAT_MTYPE_RESETCOUNTER:
- pgstat_recv_resetcounter(&msg.msg_resetcounter, len);
- break;
-
- case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
- pgstat_recv_resetsharedcounter(&msg.msg_resetsharedcounter,
- len);
- break;
-
- case PGSTAT_MTYPE_RESETSINGLECOUNTER:
- pgstat_recv_resetsinglecounter(&msg.msg_resetsinglecounter,
- len);
- break;
-
- case PGSTAT_MTYPE_RESETSLRUCOUNTER:
- pgstat_recv_resetslrucounter(&msg.msg_resetslrucounter,
- len);
- break;
-
- case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
- pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
- len);
- break;
-
- case PGSTAT_MTYPE_AUTOVAC_START:
- pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
- break;
-
- case PGSTAT_MTYPE_VACUUM:
- pgstat_recv_vacuum(&msg.msg_vacuum, len);
- break;
-
- case PGSTAT_MTYPE_ANALYZE:
- pgstat_recv_analyze(&msg.msg_analyze, len);
- break;
-
- case PGSTAT_MTYPE_ARCHIVER:
- pgstat_recv_archiver(&msg.msg_archiver, len);
- break;
-
- case PGSTAT_MTYPE_BGWRITER:
- pgstat_recv_bgwriter(&msg.msg_bgwriter, len);
- break;
-
- case PGSTAT_MTYPE_WAL:
- pgstat_recv_wal(&msg.msg_wal, len);
- break;
-
- case PGSTAT_MTYPE_SLRU:
- pgstat_recv_slru(&msg.msg_slru, len);
- break;
-
- case PGSTAT_MTYPE_FUNCSTAT:
- pgstat_recv_funcstat(&msg.msg_funcstat, len);
- break;
-
- case PGSTAT_MTYPE_FUNCPURGE:
- pgstat_recv_funcpurge(&msg.msg_funcpurge, len);
- break;
-
- case PGSTAT_MTYPE_RECOVERYCONFLICT:
- pgstat_recv_recoveryconflict(&msg.msg_recoveryconflict,
- len);
- break;
-
- case PGSTAT_MTYPE_DEADLOCK:
- pgstat_recv_deadlock(&msg.msg_deadlock, len);
- break;
-
- case PGSTAT_MTYPE_TEMPFILE:
- pgstat_recv_tempfile(&msg.msg_tempfile, len);
- break;
-
- case PGSTAT_MTYPE_CHECKSUMFAILURE:
- pgstat_recv_checksum_failure(&msg.msg_checksumfailure,
- len);
- break;
-
- case PGSTAT_MTYPE_REPLSLOT:
- pgstat_recv_replslot(&msg.msg_replslot, len);
- break;
-
- case PGSTAT_MTYPE_CONNECTION:
- pgstat_recv_connstat(&msg.msg_conn, len);
- break;
-
- default:
- break;
- }
+ if (pgstat_process_message() < 0)
+ break; /* no more messages to read; leave the inner loop */
} /* end of inner message-processing loop */
/* Sleep until there's something to do */
@@ -5077,6 +4928,21 @@ PgstatCollectorMain(int argc, char *argv[])
break;
} /* end of outer loop */
+ /*
+ * Try to receive and process remaining messages before the process exits.
+ *
+ * There is no guarantee that all messages were processed in the above
+ * loop. In smart or fast shutdown mode the postmaster sends SIGQUIT to
+ * the stats collector only after the other backend and background
+ * processes, which send their stats to the stats collector, have exited,
+ * but some of their messages may still be waiting on the socket.
+ *
+ * For example, messages would be lost if the postmaster sent SIGQUIT to
+ * the stats collector while unprocessed messages remained and we exited
+ * without reading them.
+ */
+ while (pgstat_process_message() >= 0);
+
/*
* Save the final stats to reuse at next startup.
*/
@@ -5225,6 +5091,171 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
return result;
}
+/*
+ * Try to receive and process a message. This will not block,
+ * since the socket is set to non-blocking mode.
+ *
+ * XXX On Windows, we have to force pgwin32_recv to cooperate,
+ * despite the previous use of pg_set_noblock() on the socket.
+ * This is extremely broken and should be fixed someday.
+ *
+ * Returns 1 if a message was processed, 0 if it was ignored, and -1 if none could be read.
+ */
+static int
+pgstat_process_message(void)
+{
+ int len;
+ PgStat_Msg msg;
+
+#ifdef WIN32
+ pgwin32_noblock = 1;
+#endif
+
+ len = recv(pgStatSock, (char *) &msg, sizeof(PgStat_Msg), 0);
+
+#ifdef WIN32
+ pgwin32_noblock = 0;
+#endif
+
+ if (len < 0)
+ {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+ return -1;
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("could not read statistics message: %m")));
+ }
+
+ /*
+ * We ignore messages that are smaller than our common header
+ */
+ if (len < sizeof(PgStat_MsgHdr))
+ return 0;
+
+ /*
+ * The received length must match the length in the header
+ */
+ if (msg.msg_hdr.m_size != len)
+ return 0;
+
+ /*
+ * O.K. - we accept this message. Process it.
+ */
+ switch (msg.msg_hdr.m_type)
+ {
+ case PGSTAT_MTYPE_DUMMY:
+ break;
+
+ case PGSTAT_MTYPE_INQUIRY:
+ pgstat_recv_inquiry(&msg.msg_inquiry, len);
+ break;
+
+ case PGSTAT_MTYPE_TABSTAT:
+ pgstat_recv_tabstat(&msg.msg_tabstat, len);
+ break;
+
+ case PGSTAT_MTYPE_TABPURGE:
+ pgstat_recv_tabpurge(&msg.msg_tabpurge, len);
+ break;
+
+ case PGSTAT_MTYPE_DROPDB:
+ pgstat_recv_dropdb(&msg.msg_dropdb, len);
+ break;
+
+ case PGSTAT_MTYPE_RESETCOUNTER:
+ pgstat_recv_resetcounter(&msg.msg_resetcounter, len);
+ break;
+
+ case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
+ pgstat_recv_resetsharedcounter(&msg.msg_resetsharedcounter,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_RESETSINGLECOUNTER:
+ pgstat_recv_resetsinglecounter(&msg.msg_resetsinglecounter,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_RESETSLRUCOUNTER:
+ pgstat_recv_resetslrucounter(&msg.msg_resetslrucounter,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
+ pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_AUTOVAC_START:
+ pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
+ break;
+
+ case PGSTAT_MTYPE_VACUUM:
+ pgstat_recv_vacuum(&msg.msg_vacuum, len);
+ break;
+
+ case PGSTAT_MTYPE_ANALYZE:
+ pgstat_recv_analyze(&msg.msg_analyze, len);
+ break;
+
+ case PGSTAT_MTYPE_ARCHIVER:
+ pgstat_recv_archiver(&msg.msg_archiver, len);
+ break;
+
+ case PGSTAT_MTYPE_BGWRITER:
+ elog(DEBUG3, "process BGWRITER stats message");
+ pgstat_recv_bgwriter(&msg.msg_bgwriter, len);
+ break;
+
+ case PGSTAT_MTYPE_WAL:
+ elog(DEBUG3, "process WAL stats message");
+ pgstat_recv_wal(&msg.msg_wal, len);
+ break;
+
+ case PGSTAT_MTYPE_SLRU:
+ pgstat_recv_slru(&msg.msg_slru, len);
+ break;
+
+ case PGSTAT_MTYPE_FUNCSTAT:
+ pgstat_recv_funcstat(&msg.msg_funcstat, len);
+ break;
+
+ case PGSTAT_MTYPE_FUNCPURGE:
+ pgstat_recv_funcpurge(&msg.msg_funcpurge, len);
+ break;
+
+ case PGSTAT_MTYPE_RECOVERYCONFLICT:
+ pgstat_recv_recoveryconflict(&msg.msg_recoveryconflict,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_DEADLOCK:
+ pgstat_recv_deadlock(&msg.msg_deadlock, len);
+ break;
+
+ case PGSTAT_MTYPE_TEMPFILE:
+ pgstat_recv_tempfile(&msg.msg_tempfile, len);
+ break;
+
+ case PGSTAT_MTYPE_CHECKSUMFAILURE:
+ pgstat_recv_checksum_failure(&msg.msg_checksumfailure,
+ len);
+ break;
+
+ case PGSTAT_MTYPE_REPLSLOT:
+ pgstat_recv_replslot(&msg.msg_replslot, len);
+ break;
+
+ case PGSTAT_MTYPE_CONNECTION:
+ pgstat_recv_connstat(&msg.msg_conn, len);
+ break;
+
+ default:
+ break;
+ }
+
+ return 1;
+}
/* ----------
* pgstat_write_statsfiles() -
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 132df29aba..45c8531ac8 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -78,6 +78,9 @@ int WalWriterFlushAfter = 128;
#define LOOPS_UNTIL_HIBERNATE 50
#define HIBERNATE_FACTOR 25
+/* Prototypes for private functions */
+static void HandleWalWriterInterrupts(void);
+
/*
* Main entry point for walwriter process
*
@@ -242,7 +245,7 @@ WalWriterMain(void)
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
- HandleMainLoopInterrupts();
+ HandleWalWriterInterrupts();
/*
* Do what we're here for; then, if XLogBackgroundFlush() found useful
@@ -272,3 +275,34 @@ WalWriterMain(void)
WAIT_EVENT_WAL_WRITER_MAIN);
}
}
+
+/*
+ * Interrupt handler for the main loop of the WAL writer process.
+ */
+static void
+HandleWalWriterInterrupts(void)
+{
+ if (ProcSignalBarrierPending)
+ ProcessProcSignalBarrier();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ if (ShutdownRequestPending)
+ {
+ /*
+ * Force sending of any remaining WAL statistics to the stats collector
+ * at process exit.
+ *
+ * Since pgstat_send_wal() is invoked with 'force' set to false in the
+ * main loop to avoid overloading the stats collector, there may be
+ * unsent stats counters for the WAL writer.
+ */
+ pgstat_send_wal(true);
+
+ proc_exit(0);
+ }
+}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7810ee916c..3abd8ac93b 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -770,6 +770,9 @@ WalRcvDie(int code, Datum arg)
/* Ensure that all WAL records received are flushed to disk */
XLogWalRcvFlush(true);
+ /* Send WAL statistics to the stats collector before terminating */
+ pgstat_send_wal(true);
+
/* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
@@ -907,6 +910,12 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
XLogArchiveForceDone(xlogfname);
else
XLogArchiveNotify(xlogfname);
+
+ /*
+ * Send WAL statistics to the stats collector only when finishing the
+ * current WAL segment file, to avoid overloading the collector.
+ */
+ pgstat_send_wal(false);
}
recvFile = -1;
@@ -928,7 +937,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* OK to write the logs */
errno = 0;
- byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
+ byteswritten = XLogWriteFile(recvFile, buf, segbytes, (off_t) startoff);
+
if (byteswritten <= 0)
{
char xlogfname[MAXFNAMELEN];
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1e53d9d4ca..b345de8a28 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -290,6 +290,7 @@ extern bool XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
extern int XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
extern int XLogFileOpen(XLogSegNo segno);
+extern int XLogWriteFile(int fd, const void *buf, size_t nbyte, off_t offset);
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
extern XLogSegNo XLogGetLastRemovedSegno(void);