On Wed, 10 Aug 2022 at 04:05, Drouvot, Bertrand <bdrou...@amazon.com> wrote:
>
> Hi,
>
> On 8/9/22 6:00 PM, Greg Stark wrote:
> > On Tue, 9 Aug 2022 at 06:19, Drouvot, Bertrand <bdrou...@amazon.com> wrote:
> >>
> >> What do you think about adding a function in core PG to provide such
> >> functionality? (means being able to retrieve all the stats (+ eventually
> >> add some filtering) without the need to connect to each database).
> > I'm working on it myself too. I'll post a patch for discussion in a bit.
>
> Great! Thank you!

So I was adding the code to pgstat.c because I had thought there were
some data types I needed and/or static functions I needed. However you
and Andres encouraged me to check again now. And indeed I was able,
after fixing a couple things, to make the code work entirely
externally.

This is definitely not polished and there are a couple of obvious things
missing. But at the risk of embarrassment I've attached my WIP. Please
be gentle :) I'll post the GitHub link in a bit when I've fixed up
some meta info.

I'm definitely not wedded to the idea of using callbacks, it was just
the most convenient way to get started, especially when I was putting
the main loop in pgstat.c.  Ideally I do want to keep open the
possibility of streaming the results out without buffering the whole
set in memory.

> Out of curiosity, would you be also interested by such a feature for
> previous versions (that will not get the patch in) ?

I always had trouble understanding the existing stats code so I was
hoping the new code would make it easier. It seems to have worked but
it's possible I'm wrong and it was always possible and the problem was
always just me :)


-- 
greg
/*-------------------------------------------------------------------------
 *
 * telemetry.c
 *
 * Background worker that serves PostgreSQL statistics over HTTP using
 * libmicrohttpd.
 *
 * Most of this code was copied from pg_prewarm.c as a template.
 *
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <unistd.h>
#include <stdio.h>
#include <stdarg.h>

#include "access/relation.h"
#include "access/xact.h"
#include "catalog/pg_class.h"
#include "catalog/pg_type.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "storage/buf_internals.h"
#include "storage/dsm.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/datetime.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/resowner.h"


#include "telemetry.h"
#include "telemetry_pgstat.h"

PG_MODULE_MAGIC;


/* We should already have included everything needed for uint64_t, size_t,
   fd_set, socklen_t and struct sockaddr, so define MHD_PLATFORM_H to stop
   microhttpd from including extra platform headers for them. */
#define MHD_PLATFORM_H
#include <microhttpd.h>

/* MHD_USE_EPOLL */
/* MHD_USE_AUTO */
/* MHD_OPTION_CONNECTION_LIMIT */
/* MHD_OPTION_CONNECTION_TIMEOUT */
/* MHD_OPTION_EXTERNAL_LOGGER */



/* Background worker harness: module load callback, defined below */
void		_PG_init(void);

/*
 * Registers the telemetry background worker (statically while
 * shared_preload_libraries is being processed, dynamically otherwise).
 * The worker's actual main function is telemetry_main().
 */
static void telemetry_start_worker(unsigned short port);

/* GUC variables. */
/* telemetry.port: TCP port to listen on for metrics (default 9187) */
static int telemetry_port = 9187; /* TCP port to listen on for metrics */
/*
 * telemetry.listen_addresses: TCP listen addresses.
 * NOTE(review): defined as a GUC but not read anywhere in the visible code.
 */
static char *telemetry_listen_addresses; /* TCP listen addresses */
/* telemetry.start_server: whether _PG_init() registers the worker at startup */
static bool telemetry = true; /* start worker automatically on default port */


/*
 * Request handler passed to MHD_start_daemon(); defined further down.
 * The parameter list is the libmicrohttpd access-handler signature.
 */
static enum MHD_Result
telemetry_handler(void *cls,
		  struct MHD_Connection *connection,
		  const char *url,
		  const char *method,
		  const char *version,
		  const char *upload_data,
		  size_t *upload_data_size,
		  void **con_cls);

/*
 * Module load callback.
 *
 * Always defines the reloadable settings telemetry.port and
 * telemetry.listen_addresses.  Only while shared_preload_libraries is being
 * processed does it additionally define the postmaster-level switch
 * telemetry.start_server and, when that switch is on, register the worker
 * on the configured port.
 */
void
_PG_init(void)
{
    DefineCustomIntVariable("telemetry.port",
                            "TCP Port to serve metrics on by default",
                            NULL,               /* no long description */
                            &telemetry_port,
                            9187,               /* boot value */
                            1,                  /* minimum */
                            65535,              /* maximum */
                            PGC_SIGHUP,
                            0,                  /* flags */
                            NULL, NULL, NULL);  /* check/assign/show hooks */

    DefineCustomStringVariable("telemetry.listen_addresses",
                               "TCP Listen Addresses to serve metrics on by default",
                               NULL,               /* no long description */
                               &telemetry_listen_addresses,
                               "*",                /* boot value */
                               PGC_SIGHUP,
                               GUC_LIST_INPUT,     /* flags */
                               NULL, NULL, NULL);  /* check/assign/show hooks */

    if (process_shared_preload_libraries_in_progress)
    {
        /* can't define PGC_POSTMASTER variable after startup */
        DefineCustomBoolVariable("telemetry.start_server",
                                 "Starts the telemetry worker on startup.",
                                 NULL,
                                 &telemetry,
                                 true,
                                 PGC_POSTMASTER,
                                 0,
                                 NULL, NULL, NULL);

        EmitWarningsOnPlaceholders("telemetry");

        /* Register telemetry worker, if enabled. */
        if (telemetry)
            telemetry_start_worker(telemetry_port);
    }
}


/* Log callback handed to libmicrohttpd through mhd_ops; defined below. */
static void telemetry_logger(void *arg, const char *fmt, va_list ap);

/*
 * Option list passed to MHD_start_daemon() via MHD_OPTION_ARRAY in
 * telemetry_main().  Each item is { option, integer value, pointer value };
 * the list is terminated by MHD_OPTION_END.
 */
struct MHD_OptionItem mhd_ops[] = {
    /* route libmicrohttpd's log output to telemetry_logger (no closure) */
    { MHD_OPTION_EXTERNAL_LOGGER, (intptr_t)telemetry_logger, NULL },
    /* connection limit of 10 */
    { MHD_OPTION_CONNECTION_LIMIT, 10, NULL },
    /* connection timeout of 30 -- presumably seconds; confirm in MHD docs */
    { MHD_OPTION_CONNECTION_TIMEOUT, 30, NULL },
    { MHD_OPTION_END, 0, NULL }
};


/*
 * Main entry point for the leader telemetry process. This is invoked
 * *in* the background worker process.
 *
 * Starts a libmicrohttpd daemon on the port carried in main_arg, adds the
 * daemon's epoll descriptor to a WaitEventSet together with the process
 * latch and postmaster-death detection, and then alternates between
 * MHD_run() and waiting on that set until a shutdown is requested.
 *
 * main_arg: the TCP port, packed with Int32GetDatum() by
 * telemetry_start_worker().
 *
 * The function returns early (after logging) when the daemon cannot be
 * started or does not expose an epoll descriptor; there is no select()-based
 * fallback.
 */

void
telemetry_main(Datum main_arg)
{
    /* Upper bound, in milliseconds, on a single wait for activity. */
    const long max_wait_ms = 10000;

    /* Handle -1 or out of range values? */
    unsigned short port = DatumGetInt32(main_arg);

    WaitEventSet *waitset;
    WaitEvent event;

    struct MHD_Daemon *daemon;
    enum MHD_Result mhd_retval;
    const union MHD_DaemonInfo *mhd_info;
    unsigned long long mhd_timeout;
    long timeout;

    /* Establish signal handlers; once that's done, unblock signals. */
    pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
    pqsignal(SIGHUP, SignalHandlerForConfigReload);
    pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    BackgroundWorkerUnblockSignals();

    daemon = MHD_start_daemon(MHD_USE_ERROR_LOG | MHD_USE_EPOLL /* flags */,
                              port,
                              NULL /* accept callback */, NULL /* closure */,
                              telemetry_handler /* default handler */, NULL /* closure */,
                              MHD_OPTION_ARRAY, mhd_ops,
                              MHD_OPTION_END);
    if (daemon == NULL)
    {
        /* Nothing else below is usable without a daemon. */
        elog(LOG, "telemetry couldn't start mhd daemon on port %d", (int) port);
        return;
    }

    mhd_info = MHD_get_daemon_info(daemon, MHD_DAEMON_INFO_EPOLL_FD);
    if (mhd_info == NULL || mhd_info->epoll_fd <= 0)
    {
        elog(LOG, "telemetry couldn't get mhd epoll fd");
        /* Don't leave the listening socket behind on the way out. */
        MHD_stop_daemon(daemon);
        return;
    }
    elog(LOG, "telemetry got %d as epoll file descriptor", mhd_info->epoll_fd);

    waitset = CreateWaitEventSet(TopMemoryContext, 3);
    AddWaitEventToSet(waitset, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
                      mhd_info->epoll_fd, NULL, NULL);
    AddWaitEventToSet(waitset, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
    AddWaitEventToSet(waitset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);

    /* wait for requests */
    while (!ShutdownRequestPending)
    {
        /* In case of a SIGHUP, just reload the configuration. */
        if (ConfigReloadPending)
        {
            int old_port = telemetry_port;

            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
            if (old_port != telemetry_port)
            {
                ;   /* restart web server on new port */
            }
        }

        mhd_retval = MHD_run(daemon);
        if (mhd_retval != MHD_YES)
            elog(LOG, "mhd_no from MHD_run()");

        /*
         * Use libmicrohttpd's own timeout when it reports a positive one
         * shorter than our cap; otherwise wait for the cap.
         */
        mhd_retval = MHD_get_timeout(daemon, &mhd_timeout);
        if (mhd_retval == MHD_YES &&
            mhd_timeout > 0 &&
            mhd_timeout < (unsigned long long) max_wait_ms)
            timeout = (long) mhd_timeout;
        else
            timeout = max_wait_ms;

        elog(LOG, "telemetry worker waiting %0.3fs for requests", (double)timeout/1000);
        WaitEventSetWait(waitset, timeout, &event, 1, PG_WAIT_EXTENSION);
        /* Reset the latch, loop. */
        ResetLatch(MyLatch);
    }

    MHD_stop_daemon(daemon);
}


/*
 * SQL-callable function to launch telemetry.
 */
PG_FUNCTION_INFO_V1(telemetry_server_start);
Datum
telemetry_server_start(PG_FUNCTION_ARGS)
{
    unsigned short port = PG_GETARG_INT32(0);

    telemetry_start_worker(port);
    PG_RETURN_VOID();
}


/*
 * Start telemetry worker process.
 *
 * Fills in a BackgroundWorker description whose entry point is
 * telemetry_main() in the "telemetry" library, passing the port as the
 * worker's main argument.  While shared_preload_libraries is being processed
 * the worker is only registered; otherwise it is launched dynamically and
 * this function waits for it to start, raising an ERROR if registration or
 * startup fails.
 */
static void
telemetry_start_worker(unsigned short port)
{
    BackgroundWorker bgw;
    BackgroundWorkerHandle *bgw_handle;
    pid_t worker_pid;

    MemSet(&bgw, 0, sizeof(BackgroundWorker));

    strcpy(bgw.bgw_name, "telemetry server");
    strcpy(bgw.bgw_type, "telemetry server");
    strcpy(bgw.bgw_library_name, "telemetry");
    strcpy(bgw.bgw_function_name, "telemetry_main");
    bgw.bgw_main_arg = Int32GetDatum(port);
    bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
    bgw.bgw_start_time = BgWorkerStart_ConsistentState;

    if (process_shared_preload_libraries_in_progress)
    {
        /* Static registration; nothing to wait for here. */
        RegisterBackgroundWorker(&bgw);
    }
    else
    {
        /* must set notify PID to wait for startup */
        bgw.bgw_notify_pid = MyProcPid;

        if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                     errmsg("could not register background process"),
                     errhint("You may need to increase max_worker_processes.")));

        if (WaitForBackgroundWorkerStartup(bgw_handle, &worker_pid) != BGWH_STARTED)
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                     errmsg("could not start background process"),
                     errhint("More details may be available in the server log.")));
    }
}


struct activity_stats stats;

static enum MHD_Result
telemetry_handler(void *cls,
		  struct MHD_Connection *connection,
		  const char *url,
		  const char *method,
		  const char *version,
		  const char *upload_data,
		  size_t *upload_data_size,
		  void **con_cls)
{
    struct MHD_Response *mhd_response;
    enum MHD_Result mhd_retval;
    char *buf;

    elog(LOG, "telemetry %s request: %s", method, url);

    buf = pg_stat_dump_stats_string();

    mhd_response = MHD_create_response_from_buffer(strlen(buf), (void*)buf, MHD_RESPMEM_PERSISTENT);
    if (!mhd_response) {
	elog(LOG, "NULL response!");
	return MHD_NO;
    }

    mhd_retval = MHD_queue_response(connection, 200, mhd_response);
    MHD_destroy_response(mhd_response);

    if (mhd_retval != MHD_YES) {
	elog(LOG, "mhd_no from queue_response");
	return MHD_NO;
    }

    return MHD_YES;
}


/*
 * Log callback installed through MHD_OPTION_EXTERNAL_LOGGER: formats the
 * library's printf-style message and forwards it to the server log.
 *
 * arg: closure pointer from the option list (unused).
 * fmt, ap: printf-style format and its arguments.
 *
 * Messages longer than the local buffer are logged truncated, together with
 * the size that would have been needed, rather than being discarded.  A
 * formatting failure (negative return) is reported separately; without the
 * explicit check the negative value would be converted to a huge unsigned
 * number by the size comparison.
 */
static void telemetry_logger(void *arg, const char *fmt, va_list ap)
{
    char buf[256];
    int retval;

    retval = vsnprintf(buf, sizeof(buf), fmt, ap);
    if (retval < 0) {
        elog(LOG, "telemetry mhd log: could not format message");
        return;
    }

    if ((size_t) retval >= sizeof(buf)) {
        /* vsnprintf still NUL-terminated what fit, so it is safe to print. */
        elog(LOG, "telemetry mhd log (truncated, needed %d bytes): %s", retval, buf);
    } else {
        elog(LOG, "telemetry mhd log: %s", buf);
    }
}


#include "postgres.h"

#include <unistd.h>

#include "access/transam.h"
#include "access/xact.h"
#include "lib/dshash.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "storage/shmem.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pgstat_internal.h"
#include "utils/timestamp.h"
#include <math.h>
#include <float.h>


/*
 * SQL callable interface to return metrics.  As implemented below it logs
 * the generated text at WARNING level and returns its length.
 */
Datum
pg_stat_dump_stats(PG_FUNCTION_ARGS);

/*
 * C-level call for telemetry monitoring agent to call.  Returns the
 * generated metrics text as a NUL-terminated string.
 */ 
char *
pg_stat_dump_stats_string(void);

/*
 * Callback invoked by pgstat_dump_stats() once per statistics object.
 *
 *	kind, kind_name - the object's stats kind and that kind's name
 *	dboid, objoid   - key of the object; InvalidOid where not applicable
 *	name            - object name for kinds identified by name (SLRUs,
 *	                  replication slots), otherwise NULL
 *	body            - pointer to the kind-specific stats struct
 *	closure         - opaque pointer passed through from the caller
 */
typedef void (*dump_stats_cb)(PgStat_Kind kind, const char *kind_name, Oid dboid, Oid objoid, const char *name, void *body, void *closure);


/*
 * Hand every statistics object to "callback", one call per object.
 *
 * The fixed-numbered kinds (archiver, bgwriter, checkpointer, each SLRU and
 * WAL) are reported first, from snapshots rebuilt on every call.  After that
 * the shared hash table is scanned and each live variable-numbered entry is
 * reported.
 *
 *	callback - invoked as described at dump_stats_cb
 *	closure  - opaque pointer passed through to every callback invocation
 *
 * pgstat_fetch_consistency is forced to PGSTAT_FETCH_CONSISTENCY_NONE for
 * the duration of the call so the fixed snapshots are refreshed, and is put
 * back afterwards (also on error) so that a session calling this through SQL
 * keeps its own setting.
 *
 * Emits a NOTICE when starting and another with the elapsed time.
 */
static void
pgstat_dump_stats(dump_stats_cb callback, void *closure)
{
	PgStat_Kind kind;
	dshash_seq_status hstat;
	PgStatShared_HashEntry *ps;
	int			saved_fetch_consistency = pgstat_fetch_consistency;

	struct timespec tstart={0,0}, tend={0,0};
	clock_gettime(CLOCK_MONOTONIC, &tstart);

	pgstat_assert_is_up();

	elog(NOTICE, "dumping stats contents");

	pgstat_fetch_consistency = PGSTAT_FETCH_CONSISTENCY_NONE;

	PG_TRY();
	{
		/*
		 * XXX: The following could now be generalized to just iterate over
		 * pgstat_kind_infos instead of knowing about the different kinds of
		 * stats.
		 */
		for (kind = PGSTAT_KIND_FIRST_VALID; kind <= PGSTAT_KIND_LAST; kind++)
		{
			if (pgstat_get_kind_info(kind)->fixed_amount)
				pgstat_snapshot_fixed(kind);
		}
		callback(PGSTAT_KIND_ARCHIVER, pgstat_get_kind_info(PGSTAT_KIND_ARCHIVER)->name, InvalidOid, InvalidOid, NULL, &pgStatLocal.snapshot.archiver, closure);
		callback(PGSTAT_KIND_BGWRITER, pgstat_get_kind_info(PGSTAT_KIND_BGWRITER)->name, InvalidOid, InvalidOid, NULL, &pgStatLocal.snapshot.bgwriter, closure);
		callback(PGSTAT_KIND_CHECKPOINTER, pgstat_get_kind_info(PGSTAT_KIND_CHECKPOINTER)->name, InvalidOid, InvalidOid, NULL, &pgStatLocal.snapshot.checkpointer, closure);
		for (int i = 0; i < SLRU_NUM_ELEMENTS; i++)
			callback(PGSTAT_KIND_SLRU, pgstat_get_kind_info(PGSTAT_KIND_SLRU)->name, InvalidOid, InvalidOid, slru_names[i], &pgStatLocal.snapshot.slru[i], closure);
		callback(PGSTAT_KIND_WAL, pgstat_get_kind_info(PGSTAT_KIND_WAL)->name, InvalidOid, InvalidOid, NULL, &pgStatLocal.snapshot.wal, closure);

		/*
		 * Walk through the stats entries
		 */
		dshash_seq_init(&hstat, pgStatLocal.shared_hash, false);
		while ((ps = dshash_seq_next(&hstat)) != NULL)
		{
			PgStatShared_Common *shstats;
			void	   *body;
			const PgStat_KindInfo *kind_info;

			CHECK_FOR_INTERRUPTS();

			/*
			 * We may have some "dropped" entries not yet removed; skip
			 * them.  This is an expected state while other backends are
			 * running, so it must not be asserted against.
			 */
			if (ps->dropped)
				continue;

			shstats = (PgStatShared_Common *) dsa_get_address(pgStatLocal.dsa, ps->body);
			body = pgstat_get_entry_data(ps->key.kind, shstats);

			kind_info = pgstat_get_kind_info(ps->key.kind);

			/* if not dropped the valid-entry refcount should exist */
			Assert(pg_atomic_read_u32(&ps->refcount) > 0);

			/*
			 * Hold the entry's lock in shared mode while the callback reads
			 * the body, so that a concurrent writer cannot change the
			 * counters halfway through.
			 */
			LWLockAcquire(&shstats->lock, LW_SHARED);

			if (!kind_info->to_serialized_name)
			{
				callback(ps->key.kind, kind_info->name, ps->key.dboid, ps->key.objoid, NULL, body, closure);
			}
			else
			{
				/* stats entry identified by name on disk (e.g. replication slots) */
				NameData	name;

				kind_info->to_serialized_name(shstats, &name);
				callback(ps->key.kind, kind_info->name, InvalidOid, InvalidOid, NameStr(name), body, closure);
			}

			LWLockRelease(&shstats->lock);
		}
		dshash_seq_term(&hstat);
	}
	PG_FINALLY();
	{
		/* Give the caller its own fetch-consistency setting back. */
		pgstat_fetch_consistency = saved_fetch_consistency;
	}
	PG_END_TRY();

	clock_gettime(CLOCK_MONOTONIC, &tend);
	elog(NOTICE,
		 "pgstat_dump_stats took about %0.3f ms\n",
		 1000 * (((double)tend.tv_sec + 1.0e-9*tend.tv_nsec) -
				 ((double)tstart.tv_sec + 1.0e-9*tstart.tv_nsec))
		 );

}


/* 
 * Lots of work left to do here
 *
 * Prometheus expects double precision floats and all our counters are
 * integers so the best strategy is just to have them wrap at 2^53 to avoid
 * losing precision. Prometheus treats that as a reset so we trade absolute
 * value for precision but since they're counters that's fine.
 *
 * Our timestamps are 64-bit microseconds and Prometheus standard is to use
 * double precision in units of seconds. That loses precision for dates far in
 * the future and past but these stats just have things like the time of the
 * last failure.
 *
 */


/* Low DBL_MANT_DIG bits set: the largest range a double holds exactly. */
#define COUNTER_MASK ((1LL<<DBL_MANT_DIG)-1)

/*
 * Append one integer counter sample to the response buffer.
 *
 *	name    - metric name
 *	labels  - preformatted label list to put between braces, or NULL
 *	counter - raw counter value; only the bits in COUNTER_MASK are emitted,
 *	          so the printed value wraps instead of exceeding what a double
 *	          can represent exactly
 *	closure - the StringInfo to append to
 *
 * The value is printed with INT64_FORMAT because it is an int64; "%lu" is
 * only correct where long happens to be 64 bits wide.
 *
 * NOTE(review): lines are written as name{labels}=value.  Confirm against
 * the Prometheus text exposition format, which this output appears to target.
 */
static void emit_counter(const char *name, char *labels, PgStat_Counter counter, void *closure)
{
	StringInfo response = closure;
	int64 val = counter & COUNTER_MASK;

	/* Assert check that we are printing values that don't lose precision in
	 * doubles */
	Assert((int64) (double) val == val);

	if (labels)
		appendStringInfo(response, "%s{%s}=" INT64_FORMAT "\n", name, labels, val);
	else
		appendStringInfo(response, "%s=" INT64_FORMAT "\n", name, val);
}	

/*
 * Append one timestamp sample to the response buffer.
 *
 *	name      - metric name
 *	labels    - preformatted label list to put between braces, or NULL
 *	timestamp - value to convert; 0 is emitted as NaN
 *	closure   - the StringInfo to append to
 *
 * A non-zero timestamp is divided by 1000000 and shifted by the number of
 * seconds between UNIX_EPOCH_JDATE and POSTGRES_EPOCH_JDATE before being
 * printed as a floating point number.
 */
static void emit_timestamptz(const char *name, char *labels, TimestampTz timestamp, void *closure)
{
	StringInfo out = closure;
	double seconds;

	if (timestamp == 0)
		seconds = NAN;
	else
		seconds = (double)timestamp / 1000000  + SECS_PER_DAY*(POSTGRES_EPOCH_JDATE-UNIX_EPOCH_JDATE);

	if (labels == NULL)
	{
		appendStringInfo(out, "%s=%lf\n", name, seconds);
		return;
	}

	appendStringInfo(out, "%s{%s}=%lf\n", name, labels, seconds);
}	

/*
 * dump_stats_cb implementation: turn one statistics object into text lines
 * appended to the StringInfo passed as "closure".
 *
 *	kind, kind_name - which kind of stats object "body" points to
 *	dboid, objoid   - key of the object, formatted into the labels
 *	name            - SLRU or replication slot name, otherwise NULL
 *	body            - kind-specific stats struct
 *	closure         - StringInfo receiving the output
 *
 * Oids are unsigned and therefore formatted with %u.  The label string built
 * for an object is released before returning, so dumping many objects does
 * not accumulate one allocation per object.
 */
static void
dump_stats_callback(PgStat_Kind kind, const char *kind_name, Oid dboid, Oid objoid, const char *name, void *body, void *closure) {
	char *labels = NULL;

	/*
	 * The fixed-numbered kinds are always passed without any key, and the
	 * DATABASE case below deliberately accepts dboid 0, so a missing key is
	 * only worth a warning for the remaining kinds.
	 */
	bool keyless_expected = (kind == PGSTAT_KIND_DATABASE ||
							 kind == PGSTAT_KIND_ARCHIVER ||
							 kind == PGSTAT_KIND_BGWRITER ||
							 kind == PGSTAT_KIND_CHECKPOINTER ||
							 kind == PGSTAT_KIND_WAL);

	if (!keyless_expected && dboid == InvalidOid && objoid == InvalidOid && name == NULL) {
		elog(WARNING, "stats for \"%s\" with no key?!", kind_name);
	}

	switch(kind) {
		/* stats for variable-numbered objects */
	case PGSTAT_KIND_DATABASE:
	{
		PgStat_StatDBEntry *stats = body;

		/* db=0 represents the background workers apparently? */
		labels = dboid==0 ? NULL : psprintf("db=\"%u\"", dboid);

		emit_counter("xact_commit", labels,  stats->n_xact_commit, closure);
		emit_counter("xact_rollback", labels,  stats->n_xact_rollback, closure);
		emit_counter("blks_fetched", labels,  stats->n_blocks_fetched, closure);
		emit_counter("blks_hit", labels,  stats->n_blocks_hit, closure);
		emit_counter("tup_returned", labels,  stats->n_tuples_returned, closure);
		emit_counter("tup_fetched", labels,  stats->n_tuples_fetched, closure);
		emit_counter("tup_inserted", labels,  stats->n_tuples_inserted, closure);
		emit_counter("tup_updated", labels,  stats->n_tuples_updated, closure);
		emit_counter("tup_deleted", labels,  stats->n_tuples_deleted, closure);
		emit_timestamptz("last_autovac_time", labels, stats->last_autovac_time, closure);
		/* Why do we do this?? */
		emit_counter("conflicts",
					 labels,
					 (stats->n_conflict_tablespace +
					  stats->n_conflict_lock +
					  stats->n_conflict_snapshot +
					  stats->n_conflict_bufferpin +
					  stats->n_conflict_startup_deadlock),
					 closure);
		/*
		emit_counter("conflicts_tablespace", labels,  stats->n_conflict_tablespace, closure);
		emit_counter("conflicts_lock", labels,  stats->n_conflict_lock, closure);
		emit_counter("conflicts_snapshot", labels,  stats->n_conflict_snapshot, closure);
		emit_counter("conflicts_bufferpin", labels,  stats->n_conflict_bufferpin, closure);
		emit_counter("conflicts_startup_deadlock", labels,  stats->n_conflict_startup_deadlock, closure);
		*/
		emit_counter("temp_files", labels,  stats->n_temp_files, closure);
		emit_counter("temp_bytes", labels,  stats->n_temp_bytes, closure);
		emit_counter("deadlocks", labels,  stats->n_deadlocks, closure);
		emit_counter("checksum_failures", labels,  stats->n_checksum_failures, closure);
		emit_timestamptz("last_checksum_failure", labels, stats->last_checksum_failure, closure);
		emit_counter("blk_read_time", labels,  stats->n_block_read_time, closure);
		emit_counter("blk_write_time", labels,  stats->n_block_write_time, closure);
		/*
		 * n_sessions is exported as "sessions", not "numbackends": the field
		 * is named as a session count like the sessions_* counters below.
		 * NOTE(review): presumably cumulative rather than the number of
		 * currently connected backends -- confirm.
		 */
		emit_counter("sessions", labels,  stats->n_sessions, closure);
		emit_counter("session_time", labels,  stats->total_session_time, closure);
		emit_counter("active_time", labels,  stats->total_active_time, closure);
		emit_counter("idle_in_transaction_time", labels,  stats->total_idle_in_xact_time, closure);
		emit_counter("sessions_abandoned", labels,  stats->n_sessions_abandoned, closure);
		emit_counter("sessions_fatal", labels,  stats->n_sessions_fatal, closure);
		emit_counter("sessions_killed", labels,  stats->n_sessions_killed, closure);
		/*emit_timestamptz("stat_reset_timestamp", labels, stats->stat_reset_timestamp, closure);*/
		break;
	}
	case PGSTAT_KIND_RELATION:
	{
		PgStat_StatTabEntry *stats = body;

		/* db=0 represents the shared tables apparently? */
		labels = dboid==0
			? psprintf("oid=\"%u\"", objoid)
			: psprintf("db=\"%u\",oid=\"%u\"", dboid,objoid);

		emit_counter("numscans", labels, stats->numscans, closure);
		emit_counter("tuples_returned", labels, stats->tuples_returned, closure);
		emit_counter("tuples_fetched", labels, stats->tuples_fetched, closure);
		emit_counter("tuples_inserted", labels, stats->tuples_inserted, closure);
		emit_counter("tuples_updated", labels, stats->tuples_updated, closure);
		emit_counter("tuples_deleted", labels, stats->tuples_deleted, closure);
		emit_counter("tuples_hot_updated", labels, stats->tuples_hot_updated, closure);
		emit_counter("n_live_tuples", labels, stats->n_live_tuples, closure);
		emit_counter("n_dead_tuples", labels, stats->n_dead_tuples, closure);
		emit_counter("changes_since_analyze", labels, stats->changes_since_analyze, closure);
		emit_counter("inserts_since_vacuum", labels, stats->inserts_since_vacuum, closure);
		emit_counter("blocks_fetched", labels, stats->blocks_fetched, closure);
		emit_counter("blocks_hit", labels, stats->blocks_hit, closure);
		emit_timestamptz("vacuum_timestamp", labels, stats->vacuum_timestamp, closure);
		emit_counter("vacuum_count", labels, stats->vacuum_count, closure);
		emit_timestamptz("autovac_vacuum_timestamp", labels, stats->autovac_vacuum_timestamp, closure);
		emit_counter("autovac_vacuum_count", labels, stats->autovac_vacuum_count, closure);
		emit_timestamptz("analyze_timestamp", labels, stats->analyze_timestamp, closure);
		emit_counter("analyze_count", labels, stats->analyze_count, closure);
		emit_timestamptz("autovac_analyze_timestamp", labels, stats->autovac_analyze_timestamp, closure);
		emit_counter("autovac_analyze_count", labels, stats->autovac_analyze_count, closure);
		break;
	}
	case PGSTAT_KIND_FUNCTION:
	{
		PgStat_StatFuncEntry *stats = body;

		labels = psprintf("db=\"%u\",oid=\"%u\"", dboid,objoid);

		emit_counter("calls", labels, stats->f_numcalls, closure);
		emit_counter("total_time", labels, stats->f_total_time, closure);
		emit_counter("self_time", labels, stats->f_self_time, closure);
		break;
	}
	case PGSTAT_KIND_REPLSLOT:
	{
		PgStat_StatReplSlotEntry *stats = body;

		labels = psprintf("slot_name=\"%s\"", name);

		emit_counter("spill_txns", labels, stats->spill_txns, closure);
		emit_counter("spill_count", labels, stats->spill_count, closure);
		emit_counter("spill_bytes", labels, stats->spill_bytes, closure);
		emit_counter("stream_txns", labels, stats->stream_txns, closure);
		emit_counter("stream_count", labels, stats->stream_count, closure);
		emit_counter("stream_bytes", labels, stats->stream_bytes, closure);
		emit_counter("total_txns", labels, stats->total_txns, closure);
		emit_counter("total_bytes", labels, stats->total_bytes, closure);
		/*emit_timestamptz("stat_reset_timestamp", labels, stats->stat_reset_timestamp, closure);*/
		break;
	}
	case PGSTAT_KIND_SUBSCRIPTION:
	{
		PgStat_StatSubEntry *stats = body;

		labels = psprintf("db=\"%u\",oid=\"%u\"", dboid,objoid);

		emit_counter("apply_error_count", labels, stats->apply_error_count, closure);
		emit_counter("sync_error_count", labels, stats->sync_error_count, closure);
		/*emit_timestamptz("stat_reset_timestamp", labels, stats->stat_reset_timestamp, closure);*/
		break;
	}
		
	/* stats for fixed-numbered objects -- these are actually invoked
	 * explicitly from dump_stats on the snapshot objects rather than through
	 * the dshash iteration */
	case PGSTAT_KIND_ARCHIVER:
	{
		PgStat_ArchiverStats *stats = body;

		emit_counter("archived_count", labels, stats->archived_count, closure);
		/* last_archived_wal*/
		emit_timestamptz("last_archived_timestamp", labels, stats->last_archived_timestamp, closure);
		emit_counter("failed_count", labels, stats->failed_count, closure);
		/*  last_failed_wal */
		emit_timestamptz("last_failed_timestamp", labels, stats->last_failed_timestamp, closure);
		/*emit_timestamptz("stat_reset_timestamp", labels, stats->stat_reset_timestamp, closure);*/
		break;
	}
	case PGSTAT_KIND_BGWRITER:
	{
		PgStat_BgWriterStats *stats = body;

		emit_counter("buffers_clean", labels, stats->buf_written_clean, closure);
		emit_counter("maxwritten_clean", labels, stats->maxwritten_clean, closure);
		emit_counter("buffers_alloc", labels, stats->buf_alloc, closure);
		/*emit_timestamptz("stat_reset_timestamp", labels, stats->stat_reset_timestamp, closure);*/
		break;
	}
	case PGSTAT_KIND_CHECKPOINTER:
	{
		PgStat_CheckpointerStats *stats = body;

		emit_counter("checkpoints_timed", labels, stats->timed_checkpoints, closure);
		emit_counter("checkpoints_req", labels, stats->requested_checkpoints, closure);
		emit_counter("checkpoint_write_time", labels, stats->checkpoint_write_time, closure);
		emit_counter("checkpoint_sync_time", labels, stats->checkpoint_sync_time, closure);
		emit_counter("buffers_checkpoint", labels, stats->buf_written_checkpoints, closure);
		emit_counter("buffers_backend", labels, stats->buf_written_backend, closure);
		emit_counter("buffers_backend_fsync", labels, stats->buf_fsync_backend, closure);
		break;
	}
	case PGSTAT_KIND_SLRU:
	{
		PgStat_SLRUStats *stats = body;

		labels = psprintf("slru=\"%s\"", name);

		emit_counter("blks_zeroed", labels, stats->blocks_zeroed, closure);
		emit_counter("blks_hit", labels, stats->blocks_hit, closure);
		emit_counter("blks_read", labels, stats->blocks_read, closure);
		emit_counter("blks_written", labels, stats->blocks_written, closure);
		emit_counter("blks_exists", labels, stats->blocks_exists, closure);
		emit_counter("flushes", labels, stats->flush, closure);
		emit_counter("truncates", labels, stats->truncate, closure);
		/*emit_timestamptz("stat_reset_timestamp", labels, stats->stat_reset_timestamp, closure);*/
		break;
	}
	case PGSTAT_KIND_WAL:
	{
		PgStat_WalStats *stats = body;

		emit_counter("wal_records", labels, stats->wal_records, closure);
		emit_counter("wal_fpi", labels, stats->wal_fpi, closure);
		emit_counter("wal_bytes", labels, stats->wal_bytes, closure);
		emit_counter("wal_buffers_full", labels, stats->wal_buffers_full, closure);
		emit_counter("wal_write", labels, stats->wal_write, closure);
		emit_counter("wal_sync", labels, stats->wal_sync, closure);
		emit_counter("wal_write_time", labels, stats->wal_write_time, closure);
		emit_counter("wal_sync_time", labels, stats->wal_sync_time, closure);
		/*emit_timestamptz("stat_reset_timestamp", labels, stats->stat_reset_timestamp, closure);*/
		break;
	}
		
	case PGSTAT_KIND_INVALID:
		elog(WARNING, "Invalid stats object type found");
		break;
	}

	/* The label text has been copied into the output; release it. */
	if (labels != NULL)
		pfree(labels);
}

PG_FUNCTION_INFO_V1(pg_stat_dump_stats);

Datum
pg_stat_dump_stats(PG_FUNCTION_ARGS)
{
	StringInfo response = makeStringInfo();
	pgstat_dump_stats(&dump_stats_callback, response);

	/* Test overflow behaviour, the emitted values should be distinct and wrap around to 0 */
	emit_counter("test_value", "value=0", 0, response);
	emit_counter("test_value", "value=1<<53-2", (1LL<<53)-2, response);
	emit_counter("test_value", "value=1<<53-1", (1LL<<53)-1, response);
	emit_counter("test_value", "value=1<<53", 1LL<<53, response);

	elog(WARNING, "stats:\n%s", response->data);
	elog(WARNING, "DBL_MANT_DIG=%d COUNTER_MASK=%llx", DBL_MANT_DIG, COUNTER_MASK);

	PG_RETURN_INT32(response->len);
}

char *
pg_stat_dump_stats_string(void)
{
	StringInfo response = makeStringInfo();
	pgstat_dump_stats(&dump_stats_callback, response);
	return response->data;
}

Reply via email to