On Fri, Mar 1, 2024 at 2:06 AM Jeff Davis <pg...@j-davis.com> wrote:
> Added to March CF.
>
> I don't have an immediate use case in mind for this, so please drive
> that part of the discussion. I can't promise this for 17, but if the
> patch is simple enough and a quick consensus develops, then it's
> possible.

[pgss_001.v1.patch] adds a custom resource manager to the
pg_stat_statements extension. The proposed patch is not a complete solution
for pgss and may not work correctly with replication.

The 020_crash.pl test demonstrates server interruption by killing a
backend. Without rm_checkpoint hook, the server restores pgss stats only
after last CHECKPOINT. Data added to WAL before the checkpoint is not
restored.

The rm_checkpoint hook allows saving shared memory data to disk at each
checkpoint. However, for pg_stat_statements, it matters when the checkpoint
occurred. When the server shuts down, pgss deletes the temporary file of
query texts. In other cases, this is unacceptable.
To provide this capability, a flags parameter was added to the
rm_checkpoint hook. The changes are presented in [rmgr_003.v2.patch].

--
Regards,
Daniil Anisimov
Postgres Professional: http://postgrespro.com
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 3e2f1d4a23..ad0a1d5134 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -44,8 +44,8 @@
 
 
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
-	{ name, redo, desc, identify, startup, cleanup, mask, decode },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \
+	{ name, redo, desc, identify, startup, cleanup, mask, decode, checkpoint },
 
 RmgrData	RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
@@ -83,6 +83,22 @@ RmgrCleanup(void)
 	}
 }
 
+/*
+ * Checkpoint all resource managers.
+ */
+void
+RmgrCheckpoint(int flags)
+{
+	for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
+	{
+		if (!RmgrIdExists(rmid))
+			continue;
+
+		if (RmgrTable[rmid].rm_checkpoint != NULL)
+			RmgrTable[rmid].rm_checkpoint(flags);
+	}
+}
+
 /*
  * Emit ERROR when we encounter a record with an RmgrId we don't
  * recognize.
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 20a5f86209..d21bf8ae24 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7357,6 +7357,9 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 	CheckPointSUBTRANS();
 	CheckPointMultiXact();
 	CheckPointPredicate();
+
+	RmgrCheckpoint(flags);
+
 	CheckPointBuffers(flags);
 
 	/* Perform all queued up fsyncs */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 22f7351fdc..11ae1e7af4 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -28,7 +28,7 @@
  * RmgrNames is an array of the built-in resource manager names, to make error
  * messages a bit nicer.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \
   name,
 
 static const char *const RmgrNames[RM_MAX_ID + 1] = {
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6b8c17bb4c..2bb5ba8c9f 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -32,7 +32,7 @@
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
 
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \
 	{ name, desc, identify},
 
 static const RmgrDescData RmgrDescTable[RM_N_BUILTIN_IDS] = {
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index 3b6a497e1b..34ddc0210c 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
  * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
  * file format.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \
 	symname,
 
 typedef enum RmgrIds
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 78e6b908c6..0b03cc69be 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -24,26 +24,26 @@
  * Changes to this list possibly need an XLOG_PAGE_MAGIC bump.
  */
 
-/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode, checkpoint */
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode, NULL)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode, NULL)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode, NULL)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode, NULL)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode, NULL)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL, NULL)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode, NULL)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index b88b24f0c1..220f45b0ae 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -356,11 +356,13 @@ typedef struct RmgrData
 	void		(*rm_mask) (char *pagedata, BlockNumber blkno);
 	void		(*rm_decode) (struct LogicalDecodingContext *ctx,
 							  struct XLogRecordBuffer *buf);
+	void		(*rm_checkpoint) (int flags);
 } RmgrData;
 
 extern PGDLLIMPORT RmgrData RmgrTable[];
 extern void RmgrStartup(void);
 extern void RmgrCleanup(void);
+extern void RmgrCheckpoint(int flags);
 extern void RmgrNotFound(RmgrId rmid);
 extern void RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr);
 
diff --git a/contrib/pg_stat_statements/expected/wal.out b/contrib/pg_stat_statements/expected/wal.out
index 34a2bf5b03..4b2220a96b 100644
--- a/contrib/pg_stat_statements/expected/wal.out
+++ b/contrib/pg_stat_statements/expected/wal.out
@@ -17,7 +17,7 @@ FROM pg_stat_statements ORDER BY query COLLATE "C";
 --------------------------------------------------------------+-------+------+---------------------+-----------------------+---------------------
  DELETE FROM pgss_wal_tab WHERE a > $1                        |     1 |    1 | t                   | t                     | t
  INSERT INTO pgss_wal_tab VALUES(generate_series($1, $2), $3) |     1 |   10 | t                   | t                     | t
- SELECT pg_stat_statements_reset() IS NOT NULL AS t           |     1 |    1 | f                   | f                     | f
+ SELECT pg_stat_statements_reset() IS NOT NULL AS t           |     1 |    1 | t                   | t                     | t
  SET pg_stat_statements.track_utility = FALSE                 |     1 |    0 | f                   | f                     | t
  UPDATE pgss_wal_tab SET b = $1 WHERE a > $2                  |     1 |    3 | t                   | t                     | t
 (5 rows)
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 67cec865ba..d0220fd9eb 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -74,6 +74,10 @@
 #include "utils/memutils.h"
 #include "utils/timestamp.h"
 
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xloginsert.h"
+
 PG_MODULE_MAGIC;
 
 /* Location of permanent stats file (valid when database is shut down) */
@@ -323,7 +327,6 @@ PG_FUNCTION_INFO_V1(pg_stat_statements_info);
 
 static void pgss_shmem_request(void);
 static void pgss_shmem_startup(void);
-static void pgss_shmem_shutdown(int code, Datum arg);
 static void pgss_post_parse_analyze(ParseState *pstate, Query *query,
 									JumbleState *jstate);
 static PlannedStmt *pgss_planner(Query *parse,
@@ -370,6 +373,50 @@ static void fill_in_constant_lengths(JumbleState *jstate, const char *query,
 									 int query_loc);
 static int	comp_location(const void *a, const void *b);
 
+/* RMGR API */
+#define CUSTOMRMGR_ID		RM_EXPERIMENTAL_ID
+#define CUSTOMRMGR_NAME		"pgss_rmgr"
+
+static void rmgr_redo(XLogReaderState *record);
+static void rmgr_desc(StringInfo buf, XLogReaderState *record);
+static const char *rmgr_identify(uint8 info);
+static void rmgr_checkpoint(int flags);
+
+/* WAL record definitions */
+#define PGSS_XLOG_INSERT	0x00
+#define PGSS_XLOG_RESET		0x10
+
+/* The necessary fields from pgssEntry */
+typedef struct pgssXLogInsert
+{
+	uint32		header;
+	pgssHashKey key;
+	Counters	counters;
+	int			encoding;
+	TimestampTz stats_since;
+	TimestampTz minmax_stats_since;
+	int			query_len;
+	char		qtext[FLEXIBLE_ARRAY_MEMBER];
+} pgssXLogInsert;
+
+/* The params of entry_reset() function */
+typedef struct pgssXLogReset
+{
+	uint32		header;
+	Oid			userid;
+	uint64		queryid;
+	Oid			dbid;
+	bool		minmax_only;
+} pgssXLogReset;
+
+/* RMGR data */
+const RmgrData pgss_rmgr = {
+	.rm_name = CUSTOMRMGR_NAME,
+	.rm_redo = rmgr_redo,
+	.rm_checkpoint = rmgr_checkpoint,
+	.rm_identify = rmgr_identify,
+	.rm_desc = rmgr_desc
+};
 
 /*
  * Module load callback
@@ -457,6 +504,8 @@ _PG_init(void)
 
 	MarkGUCPrefixReserved("pg_stat_statements");
 
+	RegisterCustomRmgr(CUSTOMRMGR_ID, &pgss_rmgr);
+
 	/*
 	 * Install hooks.
 	 */
@@ -556,9 +605,12 @@ pgss_shmem_startup(void)
 	/*
 	 * If we're in the postmaster (or a standalone backend...), set up a shmem
 	 * exit hook to dump the statistics to disk.
+	 *
+	 * Now we do it at CHECKPOINT.
+	 *
+	 *if (!IsUnderPostmaster)
+	 *	on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
 	 */
-	if (!IsUnderPostmaster)
-		on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
 
 	/*
 	 * Done if some other process already completed our initialization.
@@ -720,108 +772,6 @@ fail:
 	 */
 }
 
-/*
- * shmem_shutdown hook: Dump statistics into file.
- *
- * Note: we don't bother with acquiring lock, because there should be no
- * other processes running when this is called.
- */
-static void
-pgss_shmem_shutdown(int code, Datum arg)
-{
-	FILE	   *file;
-	char	   *qbuffer = NULL;
-	Size		qbuffer_size = 0;
-	HASH_SEQ_STATUS hash_seq;
-	int32		num_entries;
-	pgssEntry  *entry;
-
-	/* Don't try to dump during a crash. */
-	if (code)
-		return;
-
-	/* Safety check ... shouldn't get here unless shmem is set up. */
-	if (!pgss || !pgss_hash)
-		return;
-
-	/* Don't dump if told not to. */
-	if (!pgss_save)
-		return;
-
-	file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W);
-	if (file == NULL)
-		goto error;
-
-	if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
-		goto error;
-	if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1)
-		goto error;
-	num_entries = hash_get_num_entries(pgss_hash);
-	if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
-		goto error;
-
-	qbuffer = qtext_load_file(&qbuffer_size);
-	if (qbuffer == NULL)
-		goto error;
-
-	/*
-	 * When serializing to disk, we store query texts immediately after their
-	 * entry data.  Any orphaned query texts are thereby excluded.
-	 */
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
-	{
-		int			len = entry->query_len;
-		char	   *qstr = qtext_fetch(entry->query_offset, len,
-									   qbuffer, qbuffer_size);
-
-		if (qstr == NULL)
-			continue;			/* Ignore any entries with bogus texts */
-
-		if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 ||
-			fwrite(qstr, 1, len + 1, file) != len + 1)
-		{
-			/* note: we assume hash_seq_term won't change errno */
-			hash_seq_term(&hash_seq);
-			goto error;
-		}
-	}
-
-	/* Dump global statistics for pg_stat_statements */
-	if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
-		goto error;
-
-	free(qbuffer);
-	qbuffer = NULL;
-
-	if (FreeFile(file))
-	{
-		file = NULL;
-		goto error;
-	}
-
-	/*
-	 * Rename file into place, so we atomically replace any old one.
-	 */
-	(void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG);
-
-	/* Unlink query-texts file; it's not needed while shutdown */
-	unlink(PGSS_TEXT_FILE);
-
-	return;
-
-error:
-	ereport(LOG,
-			(errcode_for_file_access(),
-			 errmsg("could not write file \"%s\": %m",
-					PGSS_DUMP_FILE ".tmp")));
-	free(qbuffer);
-	if (file)
-		FreeFile(file);
-	unlink(PGSS_DUMP_FILE ".tmp");
-	unlink(PGSS_TEXT_FILE);
-}
-
 /*
  * Post-parse-analysis hook: mark query with a queryId
  */
@@ -1284,6 +1234,7 @@ pgss_store(const char *query, uint64 queryId,
 	pgssEntry  *entry;
 	char	   *norm_query = NULL;
 	int			encoding = GetDatabaseEncoding();
+	bool		qtext_stored = false;
 
 	Assert(query != NULL);
 
@@ -1325,7 +1276,6 @@ pgss_store(const char *query, uint64 queryId,
 	{
 		Size		query_offset;
 		int			gc_count;
-		bool		stored;
 		bool		do_gc;
 
 		/*
@@ -1345,7 +1295,7 @@ pgss_store(const char *query, uint64 queryId,
 		}
 
 		/* Append new query text to file with only shared lock held */
-		stored = qtext_store(norm_query ? norm_query : query, query_len,
+		qtext_stored = qtext_store(norm_query ? norm_query : query, query_len,
 							 &query_offset, &gc_count);
 
 		/*
@@ -1366,12 +1316,12 @@ pgss_store(const char *query, uint64 queryId,
 		 * This should be infrequent enough that doing it while holding
 		 * exclusive lock isn't a performance problem.
 		 */
-		if (!stored || pgss->gc_count != gc_count)
-			stored = qtext_store(norm_query ? norm_query : query, query_len,
+		if (!qtext_stored || pgss->gc_count != gc_count)
+			qtext_stored = qtext_store(norm_query ? norm_query : query, query_len,
 								 &query_offset, NULL);
 
 		/* If we failed to write to the text file, give up */
-		if (!stored)
+		if (!qtext_stored)
 			goto done;
 
 		/* OK to create a new hashtable entry */
@@ -1486,6 +1436,38 @@ pgss_store(const char *query, uint64 queryId,
 		SpinLockRelease(&e->mutex);
 	}
 
+	/* Write entry to XLOG */
+	if (pgss_save && !RecoveryInProgress())
+	{
+		pgssXLogInsert *xlog_entry;
+		XLogRecPtr	ptr;
+
+		xlog_entry = palloc(sizeof(pgssXLogInsert));
+		xlog_entry->header = PGSS_FILE_HEADER;
+		xlog_entry->key = entry->key;
+		xlog_entry->counters = entry->counters;
+		xlog_entry->encoding = entry->encoding;
+		xlog_entry->stats_since = entry->stats_since;
+		xlog_entry->minmax_stats_since = entry->minmax_stats_since;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) xlog_entry, offsetof(pgssXLogInsert, qtext));
+
+		/* Write the query text if need */
+		if (qtext_stored)
+		{
+			xlog_entry->query_len = entry->query_len;
+			XLogRegisterData(norm_query ? norm_query : (char *) query, query_len);
+		}
+		else
+			xlog_entry->query_len = 0;
+
+		XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
+		ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_INSERT);
+		XLogFlush(ptr);
+		pfree(xlog_entry);
+	}
+
 done:
 	LWLockRelease(pgss->lock);
 
@@ -2676,6 +2658,27 @@ entry_reset(Oid userid, Oid dbid, uint64 queryid, bool minmax_only)
 	LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
 	num_entries = hash_get_num_entries(pgss_hash);
 
+	/* Write entry to XLOG */
+	if (pgss_save && !RecoveryInProgress())
+	{
+		pgssXLogReset *xlrec;
+		XLogRecPtr	ptr;
+
+		xlrec = palloc0(sizeof(pgssXLogReset));
+		xlrec->header = PGSS_FILE_HEADER;
+		xlrec->userid = userid;
+		xlrec->dbid = dbid;
+		xlrec->queryid = queryid;
+		xlrec->minmax_only = minmax_only;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) xlrec, sizeof(pgssXLogReset));
+		XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
+		ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_RESET);
+		XLogFlush(ptr);
+		pfree(xlrec);
+	}
+
 	stats_reset = GetCurrentTimestamp();
 
 	if (userid != 0 && dbid != 0 && queryid != UINT64CONST(0))
@@ -3010,3 +3013,236 @@ comp_location(const void *a, const void *b)
 
 	return pg_cmp_s32(l, r);
 }
+
+static void
+rmgr_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	/*
+	 * Because we did not restore records from storage,
+	 * we also do not restore records from WAL.
+	 */
+	if (!pgss_save)
+		return;
+
+	if (info == PGSS_XLOG_INSERT)
+	{
+		pgssXLogInsert *xlrec = (pgssXLogInsert *) XLogRecGetData(record);
+		pgssEntry  *entry;
+
+		if (xlrec->header != PGSS_FILE_HEADER)
+		{
+			elog(WARNING, "Skip the inconsistent WAL record");
+			return;
+		}
+
+		/* Safety check... */
+		if (!pgss || !pgss_hash)
+			return;
+
+		LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
+
+		entry = (pgssEntry *) hash_search(pgss_hash, &xlrec->key, HASH_FIND, NULL);
+
+		/* Create new entry, if not present */
+		if (!entry)
+		{
+			Size		query_offset;
+			bool		stored;
+			char	   *query;
+
+			Assert(xlrec->query_len > 0);
+
+			query = (char *) xlrec->qtext;
+
+			/* Append new query text to file */
+			stored = qtext_store(query, xlrec->query_len, &query_offset, NULL);
+
+			/* If we failed to write to the text file, give up */
+			if (!stored)
+			{
+				LWLockRelease(pgss->lock);
+				return;
+			}
+
+			/* OK to create a new hashtable entry */
+			entry = entry_alloc(&xlrec->key, query_offset, xlrec->query_len,
+								xlrec->encoding, false);
+
+			/* If needed, perform garbage collection */
+			gc_qtexts();
+		}
+
+		/* Copy the necessary data from XLog record */
+		entry->counters = xlrec->counters;
+
+		entry->encoding = xlrec->encoding;
+		entry->stats_since = xlrec->stats_since;
+		entry->minmax_stats_since = xlrec->minmax_stats_since;
+
+		LWLockRelease(pgss->lock);
+	}
+	else if (info == PGSS_XLOG_RESET)
+	{
+		pgssXLogReset *xlrec = (pgssXLogReset *) XLogRecGetData(record);
+
+		if (xlrec->header != PGSS_FILE_HEADER)
+		{
+			elog(WARNING, "Skip the inconsistent WAL record");
+			return;
+		}
+
+		entry_reset(xlrec->userid, xlrec->dbid, xlrec->queryid, xlrec->minmax_only);
+	}
+}
+
+static void
+rmgr_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == PGSS_XLOG_INSERT)
+	{
+		pgssXLogInsert *xlrec = (pgssXLogInsert *) rec;
+
+		if (xlrec->header != PGSS_FILE_HEADER)
+		{
+			elog(WARNING, "Skip the inconsistent WAL record");
+			return;
+		}
+
+		appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT
+						 ", toplevel: %d",
+						 xlrec->key.userid, xlrec->key.dbid, xlrec->key.queryid,
+						 (int) xlrec->key.toplevel);
+	}
+	else if (info == PGSS_XLOG_RESET)
+	{
+		pgssXLogReset *xlrec = (pgssXLogReset *) rec;
+
+		if (xlrec->header != PGSS_FILE_HEADER)
+		{
+			elog(WARNING, "Skip the inconsistent WAL record");
+			return;
+		}
+
+		appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT
+						 ", minmax_only: %d",
+						 xlrec->userid, xlrec->dbid, xlrec->queryid,
+						 (int) xlrec->minmax_only);
+	}
+}
+
+static const char *
+rmgr_identify(uint8 info)
+{
+	if ((info & ~XLR_INFO_MASK) == PGSS_XLOG_INSERT)
+		return "INSERT";
+	if ((info & ~XLR_INFO_MASK) == PGSS_XLOG_RESET)
+		return "RESET";
+
+	return NULL;
+}
+
+static void
+rmgr_checkpoint(int flags)
+{
+	FILE	   *file;
+	char	   *qbuffer = NULL;
+	Size		qbuffer_size = 0;
+	HASH_SEQ_STATUS hash_seq;
+	int32		num_entries;
+	pgssEntry  *entry;
+
+	/* Safety check ... shouldn't get here unless shmem is set up. */
+	if (!pgss || !pgss_hash)
+		return;
+
+	/* Don't dump if told not to. */
+	if (!pgss_save)
+		return;
+
+	/* XXX: Can there be concurrent CHECKPOINTs? */
+	LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
+
+	file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W);
+	if (file == NULL)
+		goto error;
+
+	if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
+		goto error;
+	if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1)
+		goto error;
+	num_entries = hash_get_num_entries(pgss_hash);
+	if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
+		goto error;
+
+	qbuffer = qtext_load_file(&qbuffer_size);
+	if (qbuffer == NULL)
+		goto error;
+
+	/*
+	 * When serializing to disk, we store query texts immediately after their
+	 * entry data.  Any orphaned query texts are thereby excluded.
+	 */
+	hash_seq_init(&hash_seq, pgss_hash);
+	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		int			len = entry->query_len;
+		char	   *qstr = qtext_fetch(entry->query_offset, len,
+									   qbuffer, qbuffer_size);
+
+		if (qstr == NULL)
+			continue;			/* Ignore any entries with bogus texts */
+
+		if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 ||
+			fwrite(qstr, 1, len + 1, file) != len + 1)
+		{
+			/* note: we assume hash_seq_term won't change errno */
+			hash_seq_term(&hash_seq);
+			goto error;
+		}
+	}
+
+	/* Dump global statistics for pg_stat_statements */
+	if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
+		goto error;
+
+	free(qbuffer);
+	qbuffer = NULL;
+
+	if (FreeFile(file))
+	{
+		file = NULL;
+		goto error;
+	}
+
+	/*
+	 * Rename file into place, so we atomically replace any old one.
+	 */
+	(void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG);
+
+	/* Unlink query-texts file; it's not needed while shutdown */
+	if (flags & CHECKPOINT_IS_SHUTDOWN)
+		unlink(PGSS_TEXT_FILE);
+
+	LWLockRelease(pgss->lock);
+	return;
+
+error:
+	ereport(LOG,
+			(errcode_for_file_access(),
+			 errmsg("could not write file \"%s\": %m",
+					PGSS_DUMP_FILE ".tmp")));
+	free(qbuffer);
+	if (file)
+		FreeFile(file);
+	unlink(PGSS_DUMP_FILE ".tmp");
+
+	if (flags & CHECKPOINT_IS_SHUTDOWN)
+		unlink(PGSS_TEXT_FILE);
+
+	LWLockRelease(pgss->lock);
+}
diff --git a/contrib/pg_stat_statements/t/020_crash.pl b/contrib/pg_stat_statements/t/020_crash.pl
new file mode 100644
index 0000000000..f33bb43d7d
--- /dev/null
+++ b/contrib/pg_stat_statements/t/020_crash.pl
@@ -0,0 +1,80 @@
+# Copyright (c) 2023-2024, PostgreSQL Global Development Group
+
+# Tests for checking that pg_stat_statements contents are preserved
+# across restarts.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('main');
+$node->init;
+$node->append_conf('postgresql.conf',
+	q[
+		shared_preload_libraries = 'pg_stat_statements'
+		restart_after_crash = 1
+	]);
+$node->start;
+
+$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_statements');
+
+# Without the CHECKPOINT hook, we won't see this query in pg_stat_statements
+# after a server crash.
+$node->safe_psql('postgres', 'CREATE TABLE t1 (a int)');
+
+$node->safe_psql('postgres', 'CHECKPOINT');
+$node->safe_psql('postgres', 'SELECT a FROM t1');
+
+is( $node->safe_psql(
+		'postgres',
+		"SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query"
+	),
+	"CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1",
+	'pg_stat_statements populated');
+
+
+# Perform a server shutdown by killing the backend.
+my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+my ($killme_stdin, $killme_stdout, $killme_stderr) = ('', '', '');
+my $killme = IPC::Run::start(
+	[
+		'psql', '-X', '-qAt', '-v', 'ON_ERROR_STOP=1', '-f', '-', '-d',
+		$node->connstr('postgres')
+	],
+	'<',
+	\$killme_stdin,
+	'>',
+	\$killme_stdout,
+	'2>',
+	\$killme_stderr,
+	$psql_timeout);
+
+$killme_stdin .= "SELECT pg_backend_pid();\n";
+ok( pump_until(
+		$killme, $psql_timeout, \$killme_stdout, qr/[[:digit:]]+[\r\n]$/m),
+	'acquired pid for SIGQUIT');
+my $pid = $killme_stdout;
+chomp($pid);
+
+my $ret = PostgreSQL::Test::Utils::system_log('pg_ctl', 'kill', 'QUIT', $pid);
+is($ret, 0, "killed process with SIGQUIT");
+
+$killme->finish;
+
+# Wait till server restarts
+is($node->poll_query_until('postgres', undef, ''),
+	"1", "reconnected after SIGQUIT");
+
+is( $node->safe_psql(
+		'postgres',
+		"SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query"
+	),
+	"CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1\nSELECT pg_backend_pid()",
+	'pg_stat_statements data kept across the server crash');
+
+$node->stop;
+
+done_testing();

Reply via email to