On Fri, Mar 1, 2024 at 2:06 AM Jeff Davis <pg...@j-davis.com> wrote:
> Added to March CF.
>
> I don't have an immediate use case in mind for this, so please drive
> that part of the discussion. I can't promise this for 17, but if the
> patch is simple enough and a quick consensus develops, then it's
> possible.
[pgss_001.v1.patch] adds a custom resource manager to the pg_stat_statements extension. The proposed patch is not a complete solution for pgss and may not work correctly with replication. The 020_crash.pl test simulates a server crash by killing a backend with SIGQUIT.

Without the rm_checkpoint hook, the server can restore pgss stats only from WAL written after the last CHECKPOINT; data that was added to WAL before that checkpoint is not replayed and is therefore lost. The rm_checkpoint hook allows saving shared memory data to disk at each checkpoint.

However, for pg_stat_statements it matters what kind of checkpoint is being performed. When the server shuts down, pgss deletes the temporary file of query texts; at any other checkpoint, deleting that file is unacceptable because the server keeps running and still needs it. To let the hook tell these cases apart, a flags parameter was added to the rm_checkpoint hook. The changes are presented in [rmgr_003.v2.patch].

--
Regards,
Daniil Anisimov
Postgres Professional: http://postgrespro.com
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 3e2f1d4a23..ad0a1d5134 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -44,8 +44,8 @@ /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ - { name, redo, desc, identify, startup, cleanup, mask, decode }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ + { name, redo, desc, identify, startup, cleanup, mask, decode, checkpoint }, RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" @@ -83,6 +83,22 @@ RmgrCleanup(void) } } +/* + * Checkpoint all resource managers. + */ +void +RmgrCheckpoint(int flags) +{ + for (int rmid = 0; rmid <= RM_MAX_ID; rmid++) + { + if (!RmgrIdExists(rmid)) + continue; + + if (RmgrTable[rmid].rm_checkpoint != NULL) + RmgrTable[rmid].rm_checkpoint(flags); + } +} + /* * Emit ERROR when we encounter a record with an RmgrId we don't * recognize. diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 20a5f86209..d21bf8ae24 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7357,6 +7357,9 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointSUBTRANS(); CheckPointMultiXact(); CheckPointPredicate(); + + RmgrCheckpoint(flags); + CheckPointBuffers(flags); /* Perform all queued up fsyncs */ diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 22f7351fdc..11ae1e7af4 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -28,7 +28,7 @@ * RmgrNames is an array of the built-in resource manager names, to make error * messages a bit nicer. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ name, static const char *const RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6b8c17bb4c..2bb5ba8c9f 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -32,7 +32,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ { name, desc, identify}, static const RmgrDescData RmgrDescTable[RM_N_BUILTIN_IDS] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index 3b6a497e1b..34ddc0210c 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ symname, typedef enum RmgrIds diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 78e6b908c6..0b03cc69be 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -24,26 +24,26 @@ * Changes to this list possibly need an XLOG_PAGE_MAGIC bump. 
*/ -/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) -PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) 
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode, checkpoint */ +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode, NULL) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode, NULL) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode, NULL) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode, NULL) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode, NULL) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, 
hash_mask, NULL, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode, NULL) diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index b88b24f0c1..220f45b0ae 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -356,11 +356,13 @@ typedef struct RmgrData void (*rm_mask) (char *pagedata, BlockNumber blkno); void (*rm_decode) (struct LogicalDecodingContext *ctx, struct XLogRecordBuffer *buf); + void (*rm_checkpoint) (int flags); } RmgrData; extern PGDLLIMPORT RmgrData RmgrTable[]; extern void RmgrStartup(void); extern void RmgrCleanup(void); +extern void RmgrCheckpoint(int flags); extern void RmgrNotFound(RmgrId rmid); extern void RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr);
diff --git a/contrib/pg_stat_statements/expected/wal.out b/contrib/pg_stat_statements/expected/wal.out index 34a2bf5b03..4b2220a96b 100644 --- a/contrib/pg_stat_statements/expected/wal.out +++ b/contrib/pg_stat_statements/expected/wal.out @@ -17,7 +17,7 @@ FROM pg_stat_statements ORDER BY query COLLATE "C"; --------------------------------------------------------------+-------+------+---------------------+-----------------------+--------------------- DELETE FROM pgss_wal_tab WHERE a > $1 | 1 | 1 | t | t | t INSERT INTO pgss_wal_tab VALUES(generate_series($1, $2), $3) | 1 | 10 | t | t | t - SELECT pg_stat_statements_reset() IS NOT NULL AS t | 1 | 1 | f | f | f + SELECT pg_stat_statements_reset() IS NOT NULL AS t | 1 | 1 | t | t | t SET pg_stat_statements.track_utility = FALSE | 1 | 0 | f | f | t UPDATE pgss_wal_tab SET b = $1 WHERE a > $2 | 1 | 3 | t | t | t (5 rows) diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c index 67cec865ba..d0220fd9eb 100644 --- a/contrib/pg_stat_statements/pg_stat_statements.c +++ b/contrib/pg_stat_statements/pg_stat_statements.c @@ -74,6 +74,10 @@ #include "utils/memutils.h" #include "utils/timestamp.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xloginsert.h" + PG_MODULE_MAGIC; /* Location of permanent stats file (valid when database is shut down) */ @@ -323,7 +327,6 @@ PG_FUNCTION_INFO_V1(pg_stat_statements_info); static void pgss_shmem_request(void); static void pgss_shmem_startup(void); -static void pgss_shmem_shutdown(int code, Datum arg); static void pgss_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate); static PlannedStmt *pgss_planner(Query *parse, @@ -370,6 +373,50 @@ static void fill_in_constant_lengths(JumbleState *jstate, const char *query, int query_loc); static int comp_location(const void *a, const void *b); +/* RMGR API */ +#define CUSTOMRMGR_ID RM_EXPERIMENTAL_ID +#define CUSTOMRMGR_NAME "pgss_rmgr" + 
+static void rmgr_redo(XLogReaderState *record); +static void rmgr_desc(StringInfo buf, XLogReaderState *record); +static const char *rmgr_identify(uint8 info); +static void rmgr_checkpoint(int flags); + +/* WAL record definitions */ +#define PGSS_XLOG_INSERT 0x00 +#define PGSS_XLOG_RESET 0x10 + +/* The necessary fields from pgssEntry */ +typedef struct pgssXLogInsert +{ + uint32 header; + pgssHashKey key; + Counters counters; + int encoding; + TimestampTz stats_since; + TimestampTz minmax_stats_since; + int query_len; + char qtext[FLEXIBLE_ARRAY_MEMBER]; +} pgssXLogInsert; + +/* The params of entry_reset() function */ +typedef struct pgssXLogReset +{ + uint32 header; + Oid userid; + uint64 queryid; + Oid dbid; + bool minmax_only; +} pgssXLogReset; + +/* RMGR data */ +const RmgrData pgss_rmgr = { + .rm_name = CUSTOMRMGR_NAME, + .rm_redo = rmgr_redo, + .rm_checkpoint = rmgr_checkpoint, + .rm_identify = rmgr_identify, + .rm_desc = rmgr_desc +}; /* * Module load callback @@ -457,6 +504,8 @@ _PG_init(void) MarkGUCPrefixReserved("pg_stat_statements"); + RegisterCustomRmgr(CUSTOMRMGR_ID, &pgss_rmgr); + /* * Install hooks. */ @@ -556,9 +605,12 @@ pgss_shmem_startup(void) /* * If we're in the postmaster (or a standalone backend...), set up a shmem * exit hook to dump the statistics to disk. + * + * Now we do it at CHECKPOINT. + * + *if (!IsUnderPostmaster) + * on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); */ - if (!IsUnderPostmaster) - on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); /* * Done if some other process already completed our initialization. @@ -720,108 +772,6 @@ fail: */ } -/* - * shmem_shutdown hook: Dump statistics into file. - * - * Note: we don't bother with acquiring lock, because there should be no - * other processes running when this is called. 
- */ -static void -pgss_shmem_shutdown(int code, Datum arg) -{ - FILE *file; - char *qbuffer = NULL; - Size qbuffer_size = 0; - HASH_SEQ_STATUS hash_seq; - int32 num_entries; - pgssEntry *entry; - - /* Don't try to dump during a crash. */ - if (code) - return; - - /* Safety check ... shouldn't get here unless shmem is set up. */ - if (!pgss || !pgss_hash) - return; - - /* Don't dump if told not to. */ - if (!pgss_save) - return; - - file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W); - if (file == NULL) - goto error; - - if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1) - goto error; - if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1) - goto error; - num_entries = hash_get_num_entries(pgss_hash); - if (fwrite(&num_entries, sizeof(int32), 1, file) != 1) - goto error; - - qbuffer = qtext_load_file(&qbuffer_size); - if (qbuffer == NULL) - goto error; - - /* - * When serializing to disk, we store query texts immediately after their - * entry data. Any orphaned query texts are thereby excluded. - */ - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - { - int len = entry->query_len; - char *qstr = qtext_fetch(entry->query_offset, len, - qbuffer, qbuffer_size); - - if (qstr == NULL) - continue; /* Ignore any entries with bogus texts */ - - if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 || - fwrite(qstr, 1, len + 1, file) != len + 1) - { - /* note: we assume hash_seq_term won't change errno */ - hash_seq_term(&hash_seq); - goto error; - } - } - - /* Dump global statistics for pg_stat_statements */ - if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1) - goto error; - - free(qbuffer); - qbuffer = NULL; - - if (FreeFile(file)) - { - file = NULL; - goto error; - } - - /* - * Rename file into place, so we atomically replace any old one. 
- */ - (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG); - - /* Unlink query-texts file; it's not needed while shutdown */ - unlink(PGSS_TEXT_FILE); - - return; - -error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", - PGSS_DUMP_FILE ".tmp"))); - free(qbuffer); - if (file) - FreeFile(file); - unlink(PGSS_DUMP_FILE ".tmp"); - unlink(PGSS_TEXT_FILE); -} - /* * Post-parse-analysis hook: mark query with a queryId */ @@ -1284,6 +1234,7 @@ pgss_store(const char *query, uint64 queryId, pgssEntry *entry; char *norm_query = NULL; int encoding = GetDatabaseEncoding(); + bool qtext_stored = false; Assert(query != NULL); @@ -1325,7 +1276,6 @@ pgss_store(const char *query, uint64 queryId, { Size query_offset; int gc_count; - bool stored; bool do_gc; /* @@ -1345,7 +1295,7 @@ pgss_store(const char *query, uint64 queryId, } /* Append new query text to file with only shared lock held */ - stored = qtext_store(norm_query ? norm_query : query, query_len, + qtext_stored = qtext_store(norm_query ? norm_query : query, query_len, &query_offset, &gc_count); /* @@ -1366,12 +1316,12 @@ pgss_store(const char *query, uint64 queryId, * This should be infrequent enough that doing it while holding * exclusive lock isn't a performance problem. */ - if (!stored || pgss->gc_count != gc_count) - stored = qtext_store(norm_query ? norm_query : query, query_len, + if (!qtext_stored || pgss->gc_count != gc_count) + qtext_stored = qtext_store(norm_query ? 
norm_query : query, query_len, &query_offset, NULL); /* If we failed to write to the text file, give up */ - if (!stored) + if (!qtext_stored) goto done; /* OK to create a new hashtable entry */ @@ -1486,6 +1436,38 @@ pgss_store(const char *query, uint64 queryId, SpinLockRelease(&e->mutex); } + /* Write entry to XLOG */ + if (pgss_save && !RecoveryInProgress()) + { + pgssXLogInsert *xlog_entry; + XLogRecPtr ptr; + + xlog_entry = palloc(sizeof(pgssXLogInsert)); + xlog_entry->header = PGSS_FILE_HEADER; + xlog_entry->key = entry->key; + xlog_entry->counters = entry->counters; + xlog_entry->encoding = entry->encoding; + xlog_entry->stats_since = entry->stats_since; + xlog_entry->minmax_stats_since = entry->minmax_stats_since; + + XLogBeginInsert(); + XLogRegisterData((char *) xlog_entry, offsetof(pgssXLogInsert, qtext)); + + /* Write the query text if need */ + if (qtext_stored) + { + xlog_entry->query_len = entry->query_len; + XLogRegisterData(norm_query ? norm_query : (char *) query, query_len); + } + else + xlog_entry->query_len = 0; + + XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT); + ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_INSERT); + XLogFlush(ptr); + pfree(xlog_entry); + } + done: LWLockRelease(pgss->lock); @@ -2676,6 +2658,27 @@ entry_reset(Oid userid, Oid dbid, uint64 queryid, bool minmax_only) LWLockAcquire(pgss->lock, LW_EXCLUSIVE); num_entries = hash_get_num_entries(pgss_hash); + /* Write entry to XLOG */ + if (pgss_save && !RecoveryInProgress()) + { + pgssXLogReset *xlrec; + XLogRecPtr ptr; + + xlrec = palloc0(sizeof(pgssXLogReset)); + xlrec->header = PGSS_FILE_HEADER; + xlrec->userid = userid; + xlrec->dbid = dbid; + xlrec->queryid = queryid; + xlrec->minmax_only = minmax_only; + + XLogBeginInsert(); + XLogRegisterData((char *) xlrec, sizeof(pgssXLogReset)); + XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT); + ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_RESET); + XLogFlush(ptr); + pfree(xlrec); + } + stats_reset = GetCurrentTimestamp(); if (userid != 0 && dbid != 0 
&& queryid != UINT64CONST(0)) @@ -3010,3 +3013,236 @@ comp_location(const void *a, const void *b) return pg_cmp_s32(l, r); } + +static void +rmgr_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + /* + * Because we did not restore records from storage, + * we also do not restore records from WAL. + */ + if (!pgss_save) + return; + + if (info == PGSS_XLOG_INSERT) + { + pgssXLogInsert *xlrec = (pgssXLogInsert *) XLogRecGetData(record); + pgssEntry *entry; + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + /* Safety check... */ + if (!pgss || !pgss_hash) + return; + + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + entry = (pgssEntry *) hash_search(pgss_hash, &xlrec->key, HASH_FIND, NULL); + + /* Create new entry, if not present */ + if (!entry) + { + Size query_offset; + bool stored; + char *query; + + Assert(xlrec->query_len > 0); + + query = (char *) xlrec->qtext; + + /* Append new query text to file */ + stored = qtext_store(query, xlrec->query_len, &query_offset, NULL); + + /* If we failed to write to the text file, give up */ + if (!stored) + { + LWLockRelease(pgss->lock); + return; + } + + /* OK to create a new hashtable entry */ + entry = entry_alloc(&xlrec->key, query_offset, xlrec->query_len, + xlrec->encoding, false); + + /* If needed, perform garbage collection */ + gc_qtexts(); + } + + /* Copy the necessary data from XLog record */ + entry->counters = xlrec->counters; + + entry->encoding = xlrec->encoding; + entry->stats_since = xlrec->stats_since; + entry->minmax_stats_since = xlrec->minmax_stats_since; + + LWLockRelease(pgss->lock); + } + else if (info == PGSS_XLOG_RESET) + { + pgssXLogReset *xlrec = (pgssXLogReset *) XLogRecGetData(record); + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + entry_reset(xlrec->userid, xlrec->dbid, xlrec->queryid, xlrec->minmax_only); + } +} + +static 
void +rmgr_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == PGSS_XLOG_INSERT) + { + pgssXLogInsert *xlrec = (pgssXLogInsert *) rec; + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT + ", toplevel: %d", + xlrec->key.userid, xlrec->key.dbid, xlrec->key.queryid, + (int) xlrec->key.toplevel); + } + else if (info == PGSS_XLOG_RESET) + { + pgssXLogReset *xlrec = (pgssXLogReset *) rec; + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT + ", minmax_only: %d", + xlrec->userid, xlrec->dbid, xlrec->queryid, + (int) xlrec->minmax_only); + } +} + +static const char * +rmgr_identify(uint8 info) +{ + if ((info & ~XLR_INFO_MASK) == PGSS_XLOG_INSERT) + return "INSERT"; + if ((info & ~XLR_INFO_MASK) == PGSS_XLOG_RESET) + return "RESET"; + + return NULL; +} + +static void +rmgr_checkpoint(int flags) +{ + FILE *file; + char *qbuffer = NULL; + Size qbuffer_size = 0; + HASH_SEQ_STATUS hash_seq; + int32 num_entries; + pgssEntry *entry; + + /* Safety check ... shouldn't get here unless shmem is set up. */ + if (!pgss || !pgss_hash) + return; + + /* Don't dump if told not to. */ + if (!pgss_save) + return; + + /* XXX: Can there be concurrent CHECKPOINTs? 
*/ + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W); + if (file == NULL) + goto error; + + if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1) + goto error; + if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1) + goto error; + num_entries = hash_get_num_entries(pgss_hash); + if (fwrite(&num_entries, sizeof(int32), 1, file) != 1) + goto error; + + qbuffer = qtext_load_file(&qbuffer_size); + if (qbuffer == NULL) + goto error; + + /* + * When serializing to disk, we store query texts immediately after their + * entry data. Any orphaned query texts are thereby excluded. + */ + hash_seq_init(&hash_seq, pgss_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + int len = entry->query_len; + char *qstr = qtext_fetch(entry->query_offset, len, + qbuffer, qbuffer_size); + + if (qstr == NULL) + continue; /* Ignore any entries with bogus texts */ + + if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 || + fwrite(qstr, 1, len + 1, file) != len + 1) + { + /* note: we assume hash_seq_term won't change errno */ + hash_seq_term(&hash_seq); + goto error; + } + } + + /* Dump global statistics for pg_stat_statements */ + if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1) + goto error; + + free(qbuffer); + qbuffer = NULL; + + if (FreeFile(file)) + { + file = NULL; + goto error; + } + + /* + * Rename file into place, so we atomically replace any old one. 
+ */ + (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG); + + /* Unlink query-texts file; it's not needed while shutdown */ + if (flags & CHECKPOINT_IS_SHUTDOWN) + unlink(PGSS_TEXT_FILE); + + LWLockRelease(pgss->lock); + return; + +error: + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + PGSS_DUMP_FILE ".tmp"))); + free(qbuffer); + if (file) + FreeFile(file); + unlink(PGSS_DUMP_FILE ".tmp"); + + if (flags & CHECKPOINT_IS_SHUTDOWN) + unlink(PGSS_TEXT_FILE); + + LWLockRelease(pgss->lock); +} diff --git a/contrib/pg_stat_statements/t/020_crash.pl b/contrib/pg_stat_statements/t/020_crash.pl new file mode 100644 index 0000000000..f33bb43d7d --- /dev/null +++ b/contrib/pg_stat_statements/t/020_crash.pl @@ -0,0 +1,80 @@ +# Copyright (c) 2023-2024, PostgreSQL Global Development Group + +# Tests for checking that pg_stat_statements contents are preserved +# across restarts. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('main'); +$node->init; +$node->append_conf('postgresql.conf', + q[ + shared_preload_libraries = 'pg_stat_statements' + restart_after_crash = 1 + ]); +$node->start; + +$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_statements'); + +# Without the CHECKPOINT hook, we won't see this query in pg_stat_statements +# after a server crash. +$node->safe_psql('postgres', 'CREATE TABLE t1 (a int)'); + +$node->safe_psql('postgres', 'CHECKPOINT'); +$node->safe_psql('postgres', 'SELECT a FROM t1'); + +is( $node->safe_psql( + 'postgres', + "SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query" + ), + "CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1", + 'pg_stat_statements populated'); + + +# Perform a server shutdown by killing the backend. 
+my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default); + +my ($killme_stdin, $killme_stdout, $killme_stderr) = ('', '', ''); +my $killme = IPC::Run::start( + [ + 'psql', '-X', '-qAt', '-v', 'ON_ERROR_STOP=1', '-f', '-', '-d', + $node->connstr('postgres') + ], + '<', + \$killme_stdin, + '>', + \$killme_stdout, + '2>', + \$killme_stderr, + $psql_timeout); + +$killme_stdin .= "SELECT pg_backend_pid();\n"; +ok( pump_until( + $killme, $psql_timeout, \$killme_stdout, qr/[[:digit:]]+[\r\n]$/m), + 'acquired pid for SIGQUIT'); +my $pid = $killme_stdout; +chomp($pid); + +my $ret = PostgreSQL::Test::Utils::system_log('pg_ctl', 'kill', 'QUIT', $pid); +is($ret, 0, "killed process with SIGQUIT"); + +$killme->finish; + +# Wait till server restarts +is($node->poll_query_until('postgres', undef, ''), + "1", "reconnected after SIGQUIT"); + +is( $node->safe_psql( + 'postgres', + "SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query" + ), + "CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1\nSELECT pg_backend_pid()", + 'pg_stat_statements data kept across the server crash'); + +$node->stop; + +done_testing();