On Wed, 10 Aug 2022 at 04:05, Drouvot, Bertrand <bdrou...@amazon.com> wrote: > > Hi, > > On 8/9/22 6:00 PM, Greg Stark wrote: > > On Tue, 9 Aug 2022 at 06:19, Drouvot, Bertrand <bdrou...@amazon.com> wrote: > >> > >> What do you think about adding a function in core PG to provide such > >> functionality? (means being able to retrieve all the stats (+ eventually > >> add some filtering) without the need to connect to each database). > > I'm working on it myself too. I'll post a patch for discussion in a bit. > > Great! Thank you!
So I was adding the code to pgstat.c because I had thought there were some data types and/or static functions I needed. However you and Andres encouraged me to check again now. And indeed I was able, after fixing a couple of things, to make the code work entirely externally. This is definitely not polished and there are a couple of obvious things missing. But at the risk of embarrassment I've attached my WIP. Please be gentle :) I'll post the GitHub link in a bit when I've fixed up some meta info. I'm definitely not wedded to the idea of using callbacks; it was just the most convenient way to get started, especially when I was putting the main loop in pgstat.c. Ideally I do want to keep open the possibility of streaming the results out without buffering the whole set in memory. > Out of curiosity, would you be also interested by such a feature for > previous versions (that will not get the patch in) ? I always had trouble understanding the existing stats code so I was hoping the new code would make it easier. It seems to have worked but it's possible I'm wrong and it was always possible and the problem was always just me :) -- greg
/*-------------------------------------------------------------------------
 *
 * telemetry.c
 *	  Background worker that answers HTTP requests (via libmicrohttpd) with
 *	  a text dump produced by pg_stat_dump_stats_string().
 *
 * Most of this code was copied from pg_prewarm.c as a template.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>
#include <stdio.h>
#include <stdarg.h>

#include "access/relation.h"
#include "access/xact.h"
#include "catalog/pg_class.h"
#include "catalog/pg_type.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "storage/buf_internals.h"
#include "storage/dsm.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/datetime.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/resowner.h"

#include "telemetry.h"
#include "telemetry_pgstat.h"

PG_MODULE_MAGIC;

/*
 * We should already have included what we need to get uint64_t size_t fd_set
 * socklen_t and struct sockaddr so don't let microhttpd include extra stuff
 * for them
 */
#define MHD_PLATFORM_H
#include <microhttpd.h>

/*
 * microhttpd knobs of interest: MHD_USE_EPOLL, MHD_USE_AUTO,
 * MHD_OPTION_CONNECTION_LIMIT, MHD_OPTION_CONNECTION_TIMEOUT,
 * MHD_OPTION_EXTERNAL_LOGGER
 */

/* Upper bound, in milliseconds, on how long the worker sleeps per loop. */
#define TELEMETRY_MAX_WAIT_MS 10000

/* Background worker harness */
void		_PG_init(void);

/* Entry point run inside the background worker; looked up by name. */
PGDLLEXPORT void telemetry_main(Datum main_arg);

/* Actual internal background worker main entry point */
static void telemetry_start_worker(unsigned short port);

/* GUC variables. */
static int	telemetry_port = 9187;	/* TCP port to listen on for metrics */
static char *telemetry_listen_addresses;	/* TCP listen addresses */
static bool telemetry = true;	/* start worker automatically on default port */

static enum MHD_Result telemetry_handler(void *cls,
										 struct MHD_Connection *connection,
										 const char *url,
										 const char *method,
										 const char *version,
										 const char *upload_data,
										 size_t *upload_data_size,
										 void **con_cls);

static void telemetry_logger(void *arg, const char *fmt, va_list ap);

/*
 * Module load callback.
 *
 * Defines the telemetry.* GUCs and, when loaded through
 * shared_preload_libraries with telemetry.start_server enabled, registers
 * the background worker on telemetry.port.
 */
void
_PG_init(void)
{
	DefineCustomIntVariable("telemetry.port",
							"TCP Port to serve metrics on by default",
							NULL,
							&telemetry_port,
							9187,
							1, 65535,
							PGC_SIGHUP,
							0,	/* flags */
							NULL, NULL, NULL	/* hooks */
		);

	DefineCustomStringVariable("telemetry.listen_addresses",
							   "TCP Listen Addresses to serve metrics on by default",
							   NULL,
							   &telemetry_listen_addresses,
							   "*",
							   PGC_SIGHUP,
							   GUC_LIST_INPUT,	/* flags */
							   NULL, NULL, NULL /* hooks */
		);

	if (!process_shared_preload_libraries_in_progress)
		return;

	/* can't define PGC_POSTMASTER variable after startup */
	DefineCustomBoolVariable("telemetry.start_server",
							 "Starts the telemetry worker on startup.",
							 NULL,
							 &telemetry,
							 true,
							 PGC_POSTMASTER,
							 0,
							 NULL, NULL, NULL);

	EmitWarningsOnPlaceholders("telemetry");

	/* Register telemetry worker, if enabled. */
	if (telemetry)
		telemetry_start_worker(telemetry_port);
}

/* Options handed to MHD_start_daemon() through MHD_OPTION_ARRAY. */
struct MHD_OptionItem mhd_ops[] = {
	{MHD_OPTION_EXTERNAL_LOGGER, (intptr_t) telemetry_logger, NULL},
	{MHD_OPTION_CONNECTION_LIMIT, 10, NULL},
	{MHD_OPTION_CONNECTION_TIMEOUT, 30, NULL},
	{MHD_OPTION_END, 0, NULL}
};

/*
 * Main entry point for the leader telemetry process.  This is invoked
 * *in* the background worker process.
 *
 * main_arg carries the TCP port to listen on.  The function returns when a
 * shutdown is requested, or early if the HTTP daemon cannot be started or
 * does not expose an epoll file descriptor.
 */
void
telemetry_main(Datum main_arg)
{
	/* XXX: Handle -1 or out of range values? */
	unsigned short port = DatumGetInt32(main_arg);
	WaitEventSet *waitset;
	WaitEvent	event;
	struct MHD_Daemon *daemon;
	const union MHD_DaemonInfo *mhd_info;

	/* Establish signal handlers; once that's done, unblock signals. */
	pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
	BackgroundWorkerUnblockSignals();

	daemon = MHD_start_daemon(MHD_USE_ERROR_LOG | MHD_USE_EPOLL /* flags */ ,
							  port,
							  NULL /* accept callback */ , NULL /* closure */ ,
							  telemetry_handler /* default handler */ , NULL /* closure */ ,
							  MHD_OPTION_ARRAY, mhd_ops,
							  MHD_OPTION_END);
	if (daemon == NULL)
	{
		/* Nothing below can work without a daemon; bail out cleanly. */
		elog(LOG, "telemetry couldn't start http daemon on port %u",
			 (unsigned int) port);
		return;
	}

	mhd_info = MHD_get_daemon_info(daemon, MHD_DAEMON_INFO_EPOLL_FD);
	if (mhd_info == NULL || mhd_info->epoll_fd <= 0)
	{
		/*
		 * XXX: a fallback that registers the individual descriptors from
		 * MHD_get_fdset() is not implemented.
		 */
		elog(LOG, "telemetry couldn't get mhd epoll fd");
		MHD_stop_daemon(daemon);
		return;
	}
	elog(LOG, "telemetry got %d as epoll file descriptor", mhd_info->epoll_fd);

	waitset = CreateWaitEventSet(TopMemoryContext, 3);
	AddWaitEventToSet(waitset, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
					  mhd_info->epoll_fd, NULL, NULL);
	AddWaitEventToSet(waitset, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
	AddWaitEventToSet(waitset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);

	/* wait for requests */
	while (!ShutdownRequestPending)
	{
		enum MHD_Result mhd_retval;
		unsigned long long mhd_timeout;
		long		timeout = TELEMETRY_MAX_WAIT_MS;

		/* In case of a SIGHUP, just reload the configuration. */
		if (ConfigReloadPending)
		{
			int			old_port = telemetry_port;

			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			if (old_port != telemetry_port)
			{
				;				/* TODO: restart web server on new port */
			}
		}

		mhd_retval = MHD_run(daemon);
		if (mhd_retval != MHD_YES)
			elog(LOG, "mhd_no from MHD_run()");

		/*
		 * Sleep no longer than MHD asks for, capped at the default.  The
		 * value has to come from mhd_timeout (the MHD_get_timeout() output),
		 * not from the local that is being assigned.
		 */
		mhd_retval = MHD_get_timeout(daemon, &mhd_timeout);
		if (mhd_retval == MHD_YES &&
			mhd_timeout > 0 && mhd_timeout < TELEMETRY_MAX_WAIT_MS)
			timeout = (long) mhd_timeout;

		elog(LOG, "telemetry worker waiting %0.3fs for requests",
			 (double) timeout / 1000);
		WaitEventSetWait(waitset, timeout, &event, 1, PG_WAIT_EXTENSION);

		/* Reset the latch, loop. */
		ResetLatch(MyLatch);
	}

	MHD_stop_daemon(daemon);
}

/*
 * SQL-callable function to launch telemetry.
 *
 * Takes the TCP port as an int4 argument and raises an error when it does
 * not fit a TCP port number, instead of silently truncating it.
 */
PG_FUNCTION_INFO_V1(telemetry_server_start);

Datum
telemetry_server_start(PG_FUNCTION_ARGS)
{
	int32		port = PG_GETARG_INT32(0);

	if (port < 1 || port > 65535)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("telemetry port %d is out of range", port),
				 errhint("The port must be between 1 and 65535.")));

	telemetry_start_worker((unsigned short) port);

	PG_RETURN_VOID();
}

/*
 * Start telemetry worker process.
 *
 * While shared_preload_libraries is being processed the worker is only
 * registered.  Otherwise a dynamic worker is registered and this function
 * waits for it to start, raising an error if registration or startup fails.
 */
static void
telemetry_start_worker(unsigned short port)
{
	BackgroundWorker worker;
	BackgroundWorkerHandle *handle;
	BgwHandleStatus status;
	pid_t		pid;

	MemSet(&worker, 0, sizeof(BackgroundWorker));
	worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_main_arg = Int32GetDatum(port);
	strcpy(worker.bgw_library_name, "telemetry");
	strcpy(worker.bgw_function_name, "telemetry_main");
	strcpy(worker.bgw_name, "telemetry server");
	strcpy(worker.bgw_type, "telemetry server");

	if (process_shared_preload_libraries_in_progress)
	{
		RegisterBackgroundWorker(&worker);
		return;
	}

	/* must set notify PID to wait for startup */
	worker.bgw_notify_pid = MyProcPid;

	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
				 errmsg("could not register background process"),
				 errhint("You may need to increase max_worker_processes.")));

	status = WaitForBackgroundWorkerStartup(handle, &pid);
	if (status != BGWH_STARTED)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
				 errmsg("could not start background process"),
				 errhint("More details may be available in the server log.")));
}

struct activity_stats stats;

/*
 * microhttpd access handler: answers every request with the stats dump.
 *
 * The dump is a palloc'd string.  MHD is asked to take its own copy
 * (MHD_RESPMEM_MUST_COPY) so that the string can be freed right here;
 * handing it over as MHD_RESPMEM_PERSISTENT would leak one full dump per
 * request in this long-lived process.
 *
 * Returns MHD_YES when a response was queued, MHD_NO otherwise.
 */
static enum MHD_Result
telemetry_handler(void *cls,
				  struct MHD_Connection *connection,
				  const char *url,
				  const char *method,
				  const char *version,
				  const char *upload_data,
				  size_t *upload_data_size,
				  void **con_cls)
{
	struct MHD_Response *mhd_response;
	enum MHD_Result mhd_retval;
	char	   *buf;

	elog(LOG, "telemetry %s request: %s", method, url);

	buf = pg_stat_dump_stats_string();
	mhd_response = MHD_create_response_from_buffer(strlen(buf), (void *) buf,
												   MHD_RESPMEM_MUST_COPY);
	pfree(buf);
	if (!mhd_response)
	{
		elog(LOG, "NULL response!");
		return MHD_NO;
	}

	mhd_retval = MHD_queue_response(connection, 200, mhd_response);
	MHD_destroy_response(mhd_response);
	if (mhd_retval != MHD_YES)
	{
		elog(LOG, "mhd_no from queue_response");
		return MHD_NO;
	}

	return MHD_YES;
}

/*
 * Logging callback installed via MHD_OPTION_EXTERNAL_LOGGER; forwards
 * microhttpd messages to the server log.
 *
 * Messages longer than the local buffer are logged truncated rather than
 * dropped, and a formatting failure (negative vsnprintf result) is reported
 * separately instead of being mistaken for an overflow.
 */
static void
telemetry_logger(void *arg, const char *fmt, va_list ap)
{
	char		buf[256];
	int			retval;

	retval = vsnprintf(buf, sizeof(buf), fmt, ap);
	if (retval < 0)
		elog(LOG, "telemetry mhd log: could not format message");
	else if ((size_t) retval >= sizeof(buf))
		elog(LOG, "telemetry mhd log (truncated, needed %d bytes): %s",
			 retval, buf);
	else
		elog(LOG, "telemetry mhd log: %s", buf);
}
#include "postgres.h"

#include <unistd.h>
#include <math.h>
#include <float.h>
#include <time.h>

#include "access/transam.h"
#include "access/xact.h"
#include "lib/dshash.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "storage/shmem.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pgstat_internal.h"
#include "utils/timestamp.h"

/* SQL callable interface to return metrics */
Datum		pg_stat_dump_stats(PG_FUNCTION_ARGS);

/* C-level call for telemetry monitoring agent to call */
char	   *pg_stat_dump_stats_string(void);

/*
 * Callback invoked once per stats object by pgstat_dump_stats().
 *
 * "body" points at the kind-specific stats struct; "name" is non-NULL only
 * for objects identified by name.  "closure" is passed through unchanged.
 */
typedef void (*dump_stats_cb) (PgStat_Kind kind, const char *kind_name,
							   Oid dboid, Oid objoid, const char *name,
							   void *body, void *closure);

/*
 * Walk all statistics and hand each object to "callback".
 *
 * Fixed-numbered kinds are snapshotted first and reported from the local
 * snapshot; variable-numbered kinds are reported by iterating the shared
 * hash table.  pgstat_fetch_consistency is forced to "none" for the
 * duration of the walk and restored afterwards, including on error, so that
 * a caller's session setting is not clobbered.
 *
 * NOTE(review): entry bodies are read from shared memory without taking the
 * per-entry lock, so a concurrent flush could be observed half-applied --
 * confirm whether that is acceptable for monitoring output.
 */
static void
pgstat_dump_stats(dump_stats_cb callback, void *closure)
{
	int			saved_consistency = pgstat_fetch_consistency;
	struct timespec tstart = {0, 0};
	struct timespec tend = {0, 0};

	clock_gettime(CLOCK_MONOTONIC, &tstart);

	pgstat_assert_is_up();

	elog(NOTICE, "dumping stats contents");

	pgstat_fetch_consistency = PGSTAT_FETCH_CONSISTENCY_NONE;

	PG_TRY();
	{
		PgStat_Kind kind;
		dshash_seq_status hstat;
		PgStatShared_HashEntry *ps;

		/*
		 * XXX: The following could now be generalized to just iterate over
		 * pgstat_kind_infos instead of knowing about the different kinds of
		 * stats.
		 */
		for (kind = PGSTAT_KIND_FIRST_VALID; kind <= PGSTAT_KIND_LAST; kind++)
		{
			if (pgstat_get_kind_info(kind)->fixed_amount)
				pgstat_snapshot_fixed(kind);
		}

		callback(PGSTAT_KIND_ARCHIVER,
				 pgstat_get_kind_info(PGSTAT_KIND_ARCHIVER)->name,
				 InvalidOid, InvalidOid, NULL,
				 &pgStatLocal.snapshot.archiver, closure);
		callback(PGSTAT_KIND_BGWRITER,
				 pgstat_get_kind_info(PGSTAT_KIND_BGWRITER)->name,
				 InvalidOid, InvalidOid, NULL,
				 &pgStatLocal.snapshot.bgwriter, closure);
		callback(PGSTAT_KIND_CHECKPOINTER,
				 pgstat_get_kind_info(PGSTAT_KIND_CHECKPOINTER)->name,
				 InvalidOid, InvalidOid, NULL,
				 &pgStatLocal.snapshot.checkpointer, closure);
		for (int i = 0; i < SLRU_NUM_ELEMENTS; i++)
		{
			callback(PGSTAT_KIND_SLRU,
					 pgstat_get_kind_info(PGSTAT_KIND_SLRU)->name,
					 InvalidOid, InvalidOid, slru_names[i],
					 &pgStatLocal.snapshot.slru[i], closure);
		}
		callback(PGSTAT_KIND_WAL,
				 pgstat_get_kind_info(PGSTAT_KIND_WAL)->name,
				 InvalidOid, InvalidOid, NULL,
				 &pgStatLocal.snapshot.wal, closure);

		/*
		 * Walk through the stats entries
		 */
		dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
		while ((ps = dshash_seq_next(&hstat)) != NULL)
		{
			PgStatShared_Common *shstats;
			void	   *body;
			const PgStat_KindInfo *kind_info;

			CHECK_FOR_INTERRUPTS();

			/*
			 * We may have some "dropped" entries not yet removed, skip them.
			 * This is a legitimate state here, so it must not be asserted
			 * against.
			 */
			if (ps->dropped)
				continue;

			shstats = (PgStatShared_Common *) dsa_get_address(pgStatLocal.dsa, ps->body);
			body = pgstat_get_entry_data(ps->key.kind, shstats);
			kind_info = pgstat_get_kind_info(ps->key.kind);

			/* if not dropped the valid-entry refcount should exist */
			Assert(pg_atomic_read_u32(&ps->refcount) > 0);

			if (!kind_info->to_serialized_name)
			{
				callback(ps->key.kind, kind_info->name,
						 ps->key.dboid, ps->key.objoid, NULL,
						 body, closure);
			}
			else
			{
				/* stats entry identified by name on disk (e.g. replication slots) */
				NameData	name;

				kind_info->to_serialized_name(shstats, &name);
				callback(ps->key.kind, kind_info->name,
						 InvalidOid, InvalidOid, NameStr(name),
						 body, closure);
			}
		}
		dshash_seq_term(&hstat);
	}
	PG_FINALLY();
	{
		pgstat_fetch_consistency = saved_consistency;
	}
	PG_END_TRY();

	clock_gettime(CLOCK_MONOTONIC, &tend);
	elog(NOTICE, "pgstat_dump_stats took about %0.3f ms",
		 1000 * (((double) tend.tv_sec + 1.0e-9 * tend.tv_nsec) -
				 ((double) tstart.tv_sec + 1.0e-9 * tstart.tv_nsec)));
}

/*
 * Lots of work left to do here
 *
 * Prometheus expects double precision floats and all our counters are
 * integers so the best strategy is just to have them wrap at 2^53 to avoid
 * losing precision. Prometheus treats that as a reset so we trade absolute
 * value for precision but since they're counters that's fine.
 *
 * Our timestamps are 64-bit microseconds and Prometheus standard is to use
 * double precision in units of seconds. That loses precision for dates far in
 * the future and past but these stats just have things like the time of the
 * last failure.
 *
 * NOTE(review): lines are written as "name{labels}=value"; confirm that this
 * is the format the consumer expects.
 */
#define COUNTER_MASK ((1LL<<DBL_MANT_DIG)-1)

/*
 * Append one counter line to the StringInfo passed as "closure".
 *
 * The value is masked with COUNTER_MASK first.  "labels" may be NULL, in
 * which case no {...} part is written.
 */
static void
emit_counter(const char *name, const char *labels, PgStat_Counter counter,
			 void *closure)
{
	StringInfo	response = closure;
	int64		val = counter & COUNTER_MASK;

	/*
	 * Assert check that we are printing values that don't lose precision in
	 * doubles
	 */
	double		valf = (double) val;

	Assert((int64) valf == val);

	/* int64 is not necessarily "long": print through long long. */
	if (labels)
		appendStringInfo(response, "%s{%s}=%lld\n", name, labels, (long long) val);
	else
		appendStringInfo(response, "%s=%lld\n", name, (long long) val);
}

/*
 * Append one timestamp line to the StringInfo passed as "closure".
 *
 * The value is written as floating-point seconds, shifted from the
 * PostgreSQL epoch to the Unix epoch; a zero timestamp is written as NaN.
 */
static void
emit_timestamptz(const char *name, const char *labels, TimestampTz timestamp,
				 void *closure)
{
	StringInfo	response = closure;
	double		val;

	if (timestamp == 0)
		val = NAN;
	else
		val = (double) timestamp / 1000000 +
			SECS_PER_DAY * (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE);

	if (labels)
		appendStringInfo(response, "%s{%s}=%lf\n", name, labels, val);
	else
		appendStringInfo(response, "%s=%lf\n", name, val);
}

/*
 * Warn about an object that carries no identifying key at all.  Only called
 * for kinds where a key is expected.
 */
static void
warn_if_keyless(const char *kind_name, Oid dboid, Oid objoid, const char *name)
{
	if (dboid == InvalidOid && objoid == InvalidOid && name == NULL)
		elog(WARNING, "stats for \"%s\" with no key?!", kind_name);
}

/*
 * dump_stats_cb implementation that renders one stats object as text lines
 * appended to the StringInfo passed as "closure".
 *
 * The label string built for an object is freed before returning so that
 * repeated dumps in a long-lived process do not accumulate memory.
 */
static void
dump_stats_callback(PgStat_Kind kind, const char *kind_name,
					Oid dboid, Oid objoid, const char *name,
					void *body, void *closure)
{
	char	   *labels = NULL;

	switch (kind)
	{
			/* stats for variable-numbered objects */
		case PGSTAT_KIND_DATABASE:
			{
				PgStat_StatDBEntry *stats = body;

				/* db=0 represents the background workers apparently? */
				if (dboid != InvalidOid)
					labels = psprintf("db=\"%u\"", dboid);

				emit_counter("xact_commit", labels, stats->n_xact_commit, closure);
				emit_counter("xact_rollback", labels, stats->n_xact_rollback, closure);
				emit_counter("blks_fetched", labels, stats->n_blocks_fetched, closure);
				emit_counter("blks_hit", labels, stats->n_blocks_hit, closure);
				emit_counter("tup_returned", labels, stats->n_tuples_returned, closure);
				emit_counter("tup_fetched", labels, stats->n_tuples_fetched, closure);
				emit_counter("tup_inserted", labels, stats->n_tuples_inserted, closure);
				emit_counter("tup_updated", labels, stats->n_tuples_updated, closure);
				emit_counter("tup_deleted", labels, stats->n_tuples_deleted, closure);
				emit_timestamptz("last_autovac_time", labels, stats->last_autovac_time, closure);

				/*
				 * XXX: Why do we do this??  The individual conflict counters
				 * (tablespace, lock, snapshot, bufferpin, startup_deadlock)
				 * are only exported as their sum.
				 */
				emit_counter("conflicts", labels,
							 (stats->n_conflict_tablespace +
							  stats->n_conflict_lock +
							  stats->n_conflict_snapshot +
							  stats->n_conflict_bufferpin +
							  stats->n_conflict_startup_deadlock),
							 closure);
				emit_counter("temp_files", labels, stats->n_temp_files, closure);
				emit_counter("temp_bytes", labels, stats->n_temp_bytes, closure);
				emit_counter("deadlocks", labels, stats->n_deadlocks, closure);
				emit_counter("checksum_failures", labels, stats->n_checksum_failures, closure);
				emit_timestamptz("last_checksum_failure", labels, stats->last_checksum_failure, closure);
				emit_counter("blk_read_time", labels, stats->n_block_read_time, closure);
				emit_counter("blk_write_time", labels, stats->n_block_write_time, closure);
				/* n_sessions counts sessions, not currently connected backends */
				emit_counter("sessions", labels, stats->n_sessions, closure);
				emit_counter("session_time", labels, stats->total_session_time, closure);
				emit_counter("active_time", labels, stats->total_active_time, closure);
				emit_counter("idle_in_transaction_time", labels, stats->total_idle_in_xact_time, closure);
				emit_counter("sessions_abandoned", labels, stats->n_sessions_abandoned, closure);
				emit_counter("sessions_fatal", labels, stats->n_sessions_fatal, closure);
				emit_counter("sessions_killed", labels, stats->n_sessions_killed, closure);
				/* XXX: stat_reset_timestamp is not emitted yet */
				break;
			}
		case PGSTAT_KIND_RELATION:
			{
				PgStat_StatTabEntry *stats = body;

				warn_if_keyless(kind_name, dboid, objoid, name);

				/* db=0 represents the shared tables apparently? */
				if (dboid == InvalidOid)
					labels = psprintf("oid=\"%u\"", objoid);
				else
					labels = psprintf("db=\"%u\",oid=\"%u\"", dboid, objoid);

				emit_counter("numscans", labels, stats->numscans, closure);
				emit_counter("tuples_returned", labels, stats->tuples_returned, closure);
				emit_counter("tuples_fetched", labels, stats->tuples_fetched, closure);
				emit_counter("tuples_inserted", labels, stats->tuples_inserted, closure);
				emit_counter("tuples_updated", labels, stats->tuples_updated, closure);
				emit_counter("tuples_deleted", labels, stats->tuples_deleted, closure);
				emit_counter("tuples_hot_updated", labels, stats->tuples_hot_updated, closure);
				emit_counter("n_live_tuples", labels, stats->n_live_tuples, closure);
				emit_counter("n_dead_tuples", labels, stats->n_dead_tuples, closure);
				emit_counter("changes_since_analyze", labels, stats->changes_since_analyze, closure);
				emit_counter("inserts_since_vacuum", labels, stats->inserts_since_vacuum, closure);
				emit_counter("blocks_fetched", labels, stats->blocks_fetched, closure);
				emit_counter("blocks_hit", labels, stats->blocks_hit, closure);
				emit_timestamptz("vacuum_timestamp", labels, stats->vacuum_timestamp, closure);
				emit_counter("vacuum_count", labels, stats->vacuum_count, closure);
				emit_timestamptz("autovac_vacuum_timestamp", labels, stats->autovac_vacuum_timestamp, closure);
				emit_counter("autovac_vacuum_count", labels, stats->autovac_vacuum_count, closure);
				emit_timestamptz("analyze_timestamp", labels, stats->analyze_timestamp, closure);
				emit_counter("analyze_count", labels, stats->analyze_count, closure);
				emit_timestamptz("autovac_analyze_timestamp", labels, stats->autovac_analyze_timestamp, closure);
				emit_counter("autovac_analyze_count", labels, stats->autovac_analyze_count, closure);
				break;
			}
		case PGSTAT_KIND_FUNCTION:
			{
				PgStat_StatFuncEntry *stats = body;

				warn_if_keyless(kind_name, dboid, objoid, name);
				labels = psprintf("db=\"%u\",oid=\"%u\"", dboid, objoid);

				emit_counter("calls", labels, stats->f_numcalls, closure);
				emit_counter("total_time", labels, stats->f_total_time, closure);
				emit_counter("self_time", labels, stats->f_self_time, closure);
				break;
			}
		case PGSTAT_KIND_REPLSLOT:
			{
				PgStat_StatReplSlotEntry *stats = body;

				warn_if_keyless(kind_name, dboid, objoid, name);
				labels = psprintf("slot_name=\"%s\"", name);

				emit_counter("spill_txns", labels, stats->spill_txns, closure);
				emit_counter("spill_count", labels, stats->spill_count, closure);
				emit_counter("spill_bytes", labels, stats->spill_bytes, closure);
				emit_counter("stream_txns", labels, stats->stream_txns, closure);
				emit_counter("stream_count", labels, stats->stream_count, closure);
				emit_counter("stream_bytes", labels, stats->stream_bytes, closure);
				emit_counter("total_txns", labels, stats->total_txns, closure);
				emit_counter("total_bytes", labels, stats->total_bytes, closure);
				/* XXX: stat_reset_timestamp is not emitted yet */
				break;
			}
		case PGSTAT_KIND_SUBSCRIPTION:
			{
				PgStat_StatSubEntry *stats = body;

				warn_if_keyless(kind_name, dboid, objoid, name);
				labels = psprintf("db=\"%u\",oid=\"%u\"", dboid, objoid);

				emit_counter("apply_error_count", labels, stats->apply_error_count, closure);
				emit_counter("sync_error_count", labels, stats->sync_error_count, closure);
				/* XXX: stat_reset_timestamp is not emitted yet */
				break;
			}

			/*
			 * stats for fixed-numbered objects -- these are actually invoked
			 * explicitly from dump_stats on the snapshot objects rather than
			 * through the dshash iteration.  They have no key by design, so
			 * no "no key" warning is issued for them.
			 */
		case PGSTAT_KIND_ARCHIVER:
			{
				PgStat_ArchiverStats *stats = body;

				emit_counter("archived_count", labels, stats->archived_count, closure);
				/* last_archived_wal */
				emit_timestamptz("last_archived_timestamp", labels, stats->last_archived_timestamp, closure);
				emit_counter("failed_count", labels, stats->failed_count, closure);
				/* last_failed_wal */
				emit_timestamptz("last_failed_timestamp", labels, stats->last_failed_timestamp, closure);
				/* XXX: stat_reset_timestamp is not emitted yet */
				break;
			}
		case PGSTAT_KIND_BGWRITER:
			{
				PgStat_BgWriterStats *stats = body;

				emit_counter("buffers_clean", labels, stats->buf_written_clean, closure);
				emit_counter("maxwritten_clean", labels, stats->maxwritten_clean, closure);
				emit_counter("buffers_alloc", labels, stats->buf_alloc, closure);
				/* XXX: stat_reset_timestamp is not emitted yet */
				break;
			}
		case PGSTAT_KIND_CHECKPOINTER:
			{
				PgStat_CheckpointerStats *stats = body;

				emit_counter("checkpoints_timed", labels, stats->timed_checkpoints, closure);
				emit_counter("checkpoints_req", labels, stats->requested_checkpoints, closure);
				emit_counter("checkpoint_write_time", labels, stats->checkpoint_write_time, closure);
				emit_counter("checkpoint_sync_time", labels, stats->checkpoint_sync_time, closure);
				emit_counter("buffers_checkpoint", labels, stats->buf_written_checkpoints, closure);
				emit_counter("buffers_backend", labels, stats->buf_written_backend, closure);
				emit_counter("buffers_backend_fsync", labels, stats->buf_fsync_backend, closure);
				break;
			}
		case PGSTAT_KIND_SLRU:
			{
				PgStat_SLRUStats *stats = body;

				labels = psprintf("slru=\"%s\"", name);

				emit_counter("blks_zeroed", labels, stats->blocks_zeroed, closure);
				emit_counter("blks_hit", labels, stats->blocks_hit, closure);
				emit_counter("blks_read", labels, stats->blocks_read, closure);
				emit_counter("blks_written", labels, stats->blocks_written, closure);
				emit_counter("blks_exists", labels, stats->blocks_exists, closure);
				emit_counter("flushes", labels, stats->flush, closure);
				emit_counter("truncates", labels, stats->truncate, closure);
				/* XXX: stat_reset_timestamp is not emitted yet */
				break;
			}
		case PGSTAT_KIND_WAL:
			{
				PgStat_WalStats *stats = body;

				emit_counter("wal_records", labels, stats->wal_records, closure);
				emit_counter("wal_fpi", labels, stats->wal_fpi, closure);
				emit_counter("wal_bytes", labels, stats->wal_bytes, closure);
				emit_counter("wal_buffers_full", labels, stats->wal_buffers_full, closure);
				emit_counter("wal_write", labels, stats->wal_write, closure);
				emit_counter("wal_sync", labels, stats->wal_sync, closure);
				emit_counter("wal_write_time", labels, stats->wal_write_time, closure);
				emit_counter("wal_sync_time", labels, stats->wal_sync_time, closure);
				/* XXX: stat_reset_timestamp is not emitted yet */
				break;
			}

		case PGSTAT_KIND_INVALID:
			elog(WARNING, "Invalid stats object type found");
			break;
	}

	if (labels != NULL)
		pfree(labels);
}

/*
 * SQL-callable debugging entry point: builds the dump, appends a few test
 * values exercising the counter wrap-around, logs everything at WARNING
 * level and returns the length of the generated text.
 */
PG_FUNCTION_INFO_V1(pg_stat_dump_stats);

Datum
pg_stat_dump_stats(PG_FUNCTION_ARGS)
{
	StringInfo	response = makeStringInfo();

	pgstat_dump_stats(&dump_stats_callback, response);

	/*
	 * Test overflow behaviour, the emitted values should be distinct and
	 * wrap around to 0
	 */
	emit_counter("test_value", "value=0", 0, response);
	emit_counter("test_value", "value=1<<53-2", (1LL << 53) - 2, response);
	emit_counter("test_value", "value=1<<53-1", (1LL << 53) - 1, response);
	emit_counter("test_value", "value=1<<53", 1LL << 53, response);

	elog(WARNING, "stats:\n%s", response->data);
	elog(WARNING, "DBL_MANT_DIG=%d COUNTER_MASK=%llx",
		 DBL_MANT_DIG, (unsigned long long) COUNTER_MASK);

	PG_RETURN_INT32(response->len);
}

/*
 * C-level entry point: returns the dump as a NUL-terminated palloc'd string.
 *
 * The StringInfo header lives on the stack, so the returned buffer is the
 * only allocation handed to the caller, who may pfree() it when done.
 */
char *
pg_stat_dump_stats_string(void)
{
	StringInfoData response;

	initStringInfo(&response);
	pgstat_dump_stats(&dump_stats_callback, &response);
	return response.data;
}