Hi All,

I have faced a few times a situation where the reported duration of a
long-running query actually includes the time the backend spends waiting for
the frontend to fetch all the rows (see [1] for details). See the sample code
fe-time.c and its comments, in attachment, to reproduce this behavior.
There's no simple way today to pinpoint the problem in production without
advanced interactive auditing and/or system tools. After studying the problem,
I believe it boils down to tracking the wait event ClientWrite, so I ended up
on this thread.

You might catch some mismatching times thanks to auto_explain as well. Using
the fe-time.c demo code with the following command:

  PGDATABASE=postgres PGHOST=::1 time ./fe-time 100

The frontend time is 10s, the query time reported is 3228.631ms, but the last
row has been produced after 20.672ms:

  LOG:  duration: 3228.631 ms  plan:
    Query Text: SELECT * FROM pgbench_accounts
    Seq Scan on pgbench_accounts (time=0.005..20.672 rows=100000 loops=1)

(Note that in contrast with localhost, through the unix socket, the backend
reported query time is always really close to 10s).

I re-based the existing patch (see in attachment), to look at the ClientWrite
for this exact query:

  # SELECT wait_event, calls, times
    FROM pg_stat_get_waitaccum(NULL)
    WHERE wait_event = 'ClientWrite';

   wait_event  | calls |  times
  -------------+-------+---------
   ClientWrite |     4 | 3132266

The "times" column is expressed in µs in the patch, so 3132.266ms of the total
3228.631ms is spent sending the result to the frontend. Even after subtracting
the 20.672ms of the scan, I'm not sure where the remaining ~75ms are spent.

The pg_wait_sampling extension might help, but it requires a production
restart to install it, then enable it. Whether the solution is sampling or
cumulative, an in-core and hot-switchable solution would be much more
convenient. But anyway, looking at pg_wait_sampling, we have a clear suspect
as well for the later query run:

  # SELECT event, count
    FROM pg_wait_sampling_profile
    WHERE queryid=4045741516911800313;

      event    | count
  -------------+-------
   ClientWrite |   309

The default profile period of pg_wait_sampling being 10ms, we can roughly
estimate the ClientWrite time at around 3090ms. Note that this is close enough
because we know 3132266µs has been accumulated among only 4 large wait events.

Finishing below.
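The arithmetic behind these figures can be sketched as follows. This is only
an illustration of the numbers quoted above: the function names are
hypothetical and do not come from the patch or from pg_wait_sampling, and the
sampling estimate assumes that each sample stands for one full profile period.

```c
#include <assert.h>
#include <stdint.h>

/* Convert an accumulated wait time from microseconds to milliseconds. */
double
wait_usec_to_ms(uint64_t usec)
{
    return (double) usec / 1000.0;
}

/*
 * Estimate a wait time from a sample count, assuming (hypothetically) that
 * each sample accounts for one whole profile period.
 */
double
sampled_wait_ms(uint64_t samples, double period_ms)
{
    assert(period_ms > 0.0);
    return (double) samples * period_ms;
}

/* Time left once the scan time and the client-write waits are subtracted. */
double
unaccounted_ms(double total_ms, double scan_ms, double client_write_ms)
{
    return total_ms - scan_ms - client_write_ms;
}
```

With the values from this mail, wait_usec_to_ms(3132266) is 3132.266,
sampled_wait_ms(309, 10.0) is 3090, and unaccounted_ms(3228.631, 20.672,
3132.266) is roughly 75.7, which is the gap mentioned above.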
On Mon, 3 Aug 2020 00:00:40 +0200
Daniel Gustafsson <dan...@yesql.se> wrote:

> > On 31 Jul 2020, at 07:23, imai.yoshik...@fujitsu.com wrote:
> >
> >> This patch fails to apply to HEAD, please submit a rebased version. I've
> >> marked this as as Waiting on Author.

Please find in attachment a rebase of both patches. I did some small editing
on the way. I didn't bench them.

I'm not sure this patch is the best approach though. Take it as a motivation
to keep up with this discussion. As I wrote, whether the solution is sampling
or cumulative, an in-core and hot-switchable solution would be much more
convenient. The fact is that this patch was already available and ready to
support a discussion.

Collecting and summing all wait events from all backends in the same place
prevents tracking precisely the wait events of a specific backend, especially
on a busy system where numbers can quickly be buried by all the other
activity around.

I wonder if wait events should only be accumulated on the backend side, making
it possible to enable/disable them on the fly and to collect some reports,
e.g. in logs or in some output. Most of the code from these patches could be
recycled in a simpler patch implementing this.

Thoughts?

> > Sorry for my absence. Unfortunately I couldn't have time to work on this
> > patch in this cf. I believe I will be back in next cf, work on this patch
> > and also review other patches.
>
> No worries, it happens. Since the thread has stalled and there is no updated
> patch I've marked this entry Returned with Feedback. Please feel free to
> re-open a new CF entry if you return to this patch.

I volunteer to be a reviewer on this patch.

Imai-san, do you agree to add it as a new CF entry?

Regards,

[1]: The last time I had such a situation was a few weeks ago. A customer was
reporting a query being randomly slow, running below 100ms most of the time
and sometimes hitting 28s.
Long story short, the number of rows was the same (10-15k), but the result set
size was 9x bigger (1MB vs 9MB). As the same query was running fine from psql,
I suspected the frontend was somehow saturated. Tcpdump helped me compute that
the throughput fell to 256kB/s after the first 2MB of data transfer, with a
very narrow TCP window. I explained to the customer that their app probably
didn't pull the rows fast enough and that some buffers were probably saturated
on the frontend side, waiting for the app and slowing down the whole transfer.
The developers fixed the problem by moving two field transformations
(unaccent) out of their loop fetching the rows.
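As a rough cross-check of the numbers in this footnote, here is a minimal
sketch of the transfer-time estimate. It assumes (my simplification, not a
measurement) that the first 2MB are sent in negligible time, that everything
after that flows at a constant 256kB/s, and that 1MB = 1024kB. The function
name is hypothetical.

```c
#include <assert.h>

/*
 * Seconds needed to send a result set when only the part exceeding
 * "fast_kb" is throttled to "slow_kb_per_s". The fast part is assumed
 * to cost no time at all.
 */
double
throttled_transfer_seconds(double result_kb, double fast_kb,
                           double slow_kb_per_s)
{
    assert(slow_kb_per_s > 0.0);

    if (result_kb <= fast_kb)
        return 0.0;             /* whole result fits in the fast part */

    return (result_kb - fast_kb) / slow_kb_per_s;
}
```

Under these assumptions, a 9MB result gives (9 - 2) * 1024 / 256 = 28 seconds,
which matches the slow runs, while a 1MB result never reaches the throttled
part, which is consistent with the fast runs.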
>From 88c2779679c5c9625ca5348eec0543daab5ccab4 Mon Sep 17 00:00:00 2001 From: Jehan-Guillaume de Rorthais <j...@dalibo.com> Date: Tue, 1 Jun 2021 13:25:57 +0200 Subject: [PATCH 1/2] Add pg_stat_waitaccum view. pg_stat_waitaccum shows counts and duration of each wait events. Each backend/backgrounds counts and measures the time of wait event in every pgstat_report_wait_start and pgstat_report_wait_end. They store those info into their local variables and send to Statistics Collector. We can get those info via Statistics Collector. For reducing overhead, I implemented statistic hash instead of dynamic hash. I also implemented track_wait_timing which determines wait event duration is collected or not. On windows, this function might be not worked correctly, because now it initializes local variables in pg_stat_init which is not passed to fork processes on windows. --- src/backend/postmaster/pgstat.c | 305 +++++++++++++++++- src/backend/storage/lmgr/lwlock.c | 19 ++ src/backend/utils/activity/wait_event.c | 97 +++++- src/backend/utils/adt/pgstatfuncs.c | 80 +++++ src/backend/utils/misc/guc.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/catalog/pg_proc.dat | 9 + src/include/pgstat.h | 68 +++- src/include/storage/lwlock.h | 1 + src/include/storage/proc.h | 1 + src/include/utils/wait_event.h | 57 +--- src/test/regress/expected/rules.out | 5 + 12 files changed, 599 insertions(+), 54 deletions(-) diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index b0d07c0e0b..b4a84a2f62 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -114,6 +114,7 @@ */ bool pgstat_track_counts = false; int pgstat_track_functions = TRACK_FUNC_OFF; +bool pgstat_track_wait_timing = false; /* ---------- * Built from GUC parameter @@ -177,6 +178,10 @@ static time_t last_pgstat_start_time; static bool pgStatRunningInCollector = false; +WAHash *wa_hash; + +instr_time waitStart; + /* * Structures in which backends 
store per-table info that's waiting to be * sent to the collector. @@ -276,6 +281,7 @@ static HTAB *pgStatDBHash = NULL; */ static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; +static PgStat_WaitAccumStats waitAccumStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; static HTAB *replSlotStatHash = NULL; @@ -305,6 +311,9 @@ static pid_t pgstat_forkexec(void); NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_noreturn(); +static void pgstat_init_waitaccum_hash(WAHash **hash); +static PgStat_WaitAccumEntry *pgstat_add_wa_entry(WAHash *hash, uint32 key); + static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create); static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create); @@ -348,6 +357,7 @@ static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); static void pgstat_recv_anl_ancestors(PgStat_MsgAnlAncestors *msg, int len); static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len); static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len); +static void pgstat_recv_waitaccum(PgStat_MsgWaitAccum *msg, int len); static void pgstat_recv_wal(PgStat_MsgWal *msg, int len); static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len); static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len); @@ -359,6 +369,25 @@ static void pgstat_recv_connstat(PgStat_MsgConn *msg, int len); static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); +PgStat_WaitAccumEntry * +pgstat_get_wa_entry(WAHash *hash, uint32 key) +{ + WAEntry *current; + int bucket = key % WA_BUCKET_SIZE; + + current = hash->buckets[bucket]; + + while (current != NULL) + { + if (current->key == key) + return current->entry; + + current = current->next; + } + + return NULL; +} + /* ------------------------------------------------------------ * Public 
functions called from postmaster follow * ------------------------------------------------------------ @@ -635,6 +664,8 @@ retry2: /* Now that we have a long-lived socket, tell fd.c about it. */ ReserveExternalFD(); + pgstat_init_waitaccum_hash(&wa_hash); + return; startup_failed: @@ -657,6 +688,75 @@ startup_failed: SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE); } +static PgStat_WaitAccumEntry * +pgstat_add_wa_entry(WAHash *hash, uint32 key) +{ + WAEntry *prev; + WAEntry *new; + int bucket = key % WA_BUCKET_SIZE; + + prev = hash->buckets[bucket]; + + while (prev != NULL && prev->next != NULL) + prev = prev->next; + + new = &hash->entries[hash->entry_num++]; + new->key = key; + new->entry = MemoryContextAllocZero(TopMemoryContext, (sizeof(PgStat_WaitAccumEntry))); + + if (prev != NULL) + prev->next = new; + else + hash->buckets[bucket] = new; + + return new->entry; +} + +static void +pgstat_init_waitaccum_entry(WAHash *hash, uint32 wait_event_info) +{ + PgStat_WaitAccumEntry *entry; + + entry = pgstat_add_wa_entry(hash, wait_event_info); + entry->wait_event_info = wait_event_info; +} + +static void +pgstat_init_waitaccum_hash(WAHash **hash) +{ + uint32 i; + int last_tranche_id; + + *hash = MemoryContextAllocZero(TopMemoryContext, sizeof(WAHash)); + + last_tranche_id = LWLockGetLastTrancheId(); + for (i = PG_WAIT_LWLOCK + 1; i <= (PG_WAIT_LWLOCK | last_tranche_id); i++) + pgstat_init_waitaccum_entry(*hash, i); + + for (i = (PG_WAIT_LOCK | LOCKTAG_RELATION); i <= (PG_WAIT_LOCK | LOCKTAG_LAST_TYPE); i++) + pgstat_init_waitaccum_entry(*hash, i); + + for (i = PG_WAIT_BUFFER_PIN; i <= PG_WAIT_BUFFER_PIN; i++) + pgstat_init_waitaccum_entry(*hash, i); + + for (i = PG_WAIT_ACTIVITY; i <= PG_WAIT_ACTIVITY_LAST_TYPE; i++) + pgstat_init_waitaccum_entry(*hash, i); + + for (i = PG_WAIT_CLIENT; i <= PG_WAIT_CLIENT_LAST_TYPE; i++) + pgstat_init_waitaccum_entry(*hash, i); + + for (i = PG_WAIT_IPC; i <= PG_WAIT_IPC_LAST_TYPE; i++) + 
pgstat_init_waitaccum_entry(*hash, i); + + for (i = PG_WAIT_TIMEOUT; i <= PG_WAIT_TIMEOUT_LAST_TYPE; i++) + pgstat_init_waitaccum_entry(*hash, i); + + for (i = PG_WAIT_IO; i <= PG_WAIT_IO_LAST_TYPE; i++) + pgstat_init_waitaccum_entry(*hash, i); + + // FIXME: support extensions stuff +} + /* * subroutine for pgstat_reset_all */ @@ -960,8 +1060,11 @@ pgstat_report_stat(bool disconnect) /* Send WAL statistics */ pgstat_send_wal(true); - /* Finally send SLRU statistics */ + /* Send SLRU statistics */ pgstat_send_slru(); + + /* Finally send wait accumulative statistics */ + pgstat_send_waitaccum(); } /* @@ -1453,6 +1556,8 @@ pgstat_reset_shared_counters(const char *target) msg.m_resettarget = RESET_BGWRITER; else if (strcmp(target, "wal") == 0) msg.m_resettarget = RESET_WAL; + else if (strcmp(target, "waitaccum") == 0) + msg.m_resettarget = RESET_WAITACCUM; else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -2895,6 +3000,22 @@ pgstat_fetch_replslot(NameData slotname) return pgstat_get_replslot_entry(slotname, false); } +/* + * --------- + * pgstat_fetch_stat_waitaccum() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * a pointer to the wait accum statistics struct. + * --------- + */ +PgStat_WaitAccumStats * +pgstat_fetch_stat_waitaccum(void) +{ + backend_read_statsfile(); + + return &waitAccumStats; +} + /* * Shut down a single backend's statistics reporting at process exit. * @@ -3172,6 +3293,52 @@ pgstat_send_slru(void) } } +/* ---------- + * pgstat_send_waitaccum() - + * + * ---------- + */ +void +pgstat_send_waitaccum() +{ + PgStat_MsgWaitAccum msg; + PgStat_WaitAccumEntry *entry; + int i; + + if (wa_hash == NULL) + return; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_WAITACCUM); + msg.m_nentries = 0; + + for (i = 0; i < wa_hash->entry_num; i++) + { + entry = wa_hash->entries[i].entry; + + /* Send only wait events that have occurred. 
*/ + if (entry->calls == 0) + continue; + + /* + * Prepare and send the message + */ + memcpy(&msg.m_entry[msg.m_nentries], entry, sizeof(PgStat_WaitAccumEntry)); + if (++msg.m_nentries >= PGSTAT_NUM_WAITACCUMENTRIES) + { + pgstat_send(&msg, offsetof(PgStat_MsgWaitAccum, m_entry[0]) + + msg.m_nentries * sizeof(PgStat_WaitAccumEntry)); + msg.m_nentries = 0; + } + + /* Clear wait events information. */ + entry->calls = 0; + INSTR_TIME_SET_ZERO(entry->times); + } + + if (msg.m_nentries > 0) + pgstat_send(&msg, offsetof(PgStat_MsgWaitAccum, m_entry[0]) + + msg.m_nentries * sizeof(PgStat_WaitAccumEntry)); +} /* ---------- * PgstatCollectorMain() - @@ -3390,6 +3557,10 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_slru(&msg.msg_slru, len); break; + case PGSTAT_MTYPE_WAITACCUM: + pgstat_recv_waitaccum(&msg.msg_waitaccum, len); + break; + case PGSTAT_MTYPE_FUNCSTAT: pgstat_recv_funcstat(&msg.msg_funcstat, len); break; @@ -3605,7 +3776,6 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create) return result; } - /* ---------- * pgstat_write_statsfiles() - * Write the global statistics file, as well as requested DB files. @@ -3629,7 +3799,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) int32 format_id; const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; - int rc; + int rc, i; elog(DEBUG2, "writing stats file \"%s\"", statfile); @@ -3725,6 +3895,23 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) } } + /* + * Walk through the waitaccum hash. + */ + for (i = 0; i < waitAccumStats.hash->entry_num; i++) + { + PgStat_WaitAccumEntry *entry = waitAccumStats.hash->entries[i].entry; + + /* Write only wait events that have occurred. 
*/ + if (entry->calls == 0) + continue; + + /* Write out the wait event entry */ + fputc('W', fpout); + rc = fwrite(entry, sizeof(PgStat_WaitAccumEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error @@ -3957,6 +4144,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) memset(&archiverStats, 0, sizeof(archiverStats)); memset(&walStats, 0, sizeof(walStats)); memset(&slruStats, 0, sizeof(slruStats)); + waitAccumStats.hash = MemoryContextAllocZero(pgStatLocalContext, + sizeof(WAHash)); /* * Set the current timestamp (will be kept only in case we can't load an @@ -4184,6 +4373,44 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; } + /* + * 'W' A PgStat_WaitAccumEntry struct describing a Wait + * event accumulator follows. + */ + case 'W': + { + PgStat_WaitAccumEntry *entry; + PgStat_WaitAccumEntry buf; + WAHash *hash = waitAccumStats.hash; + + if (fread(&buf, 1, sizeof(PgStat_WaitAccumEntry), fpin) + != sizeof(PgStat_WaitAccumEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + entry = pgstat_get_wa_entry(hash, buf.wait_event_info); + + if (entry) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + /* + * Add to the wait event hash + */ + entry = pgstat_add_wa_entry(hash, buf.wait_event_info); + memcpy(entry, &buf, sizeof(PgStat_WaitAccumEntry)); + + break; + } + case 'E': goto done; @@ -4396,6 +4623,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_WalStats myWalStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; PgStat_StatReplSlotEntry myReplSlotStats; + PgStat_WaitAccumEntry myWaitAccumStats; FILE *fpin; int32 format_id; const char *statfile = permanent ? 
PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; @@ -4526,6 +4754,26 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, } break; + /* + * 'W' A PgStat_WaitAccumEntry struct describing a Wait + * event accumulator follows. + */ + case 'W': + { + if (fread(&myWaitAccumStats, 1, sizeof(PgStat_WaitAccumEntry), fpin) + != sizeof(PgStat_WaitAccumEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + FreeFile(fpin); + return false; + } + + break; + } + + case 'E': goto done; @@ -5071,6 +5319,20 @@ pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len) memset(&walStats, 0, sizeof(walStats)); walStats.stat_reset_timestamp = GetCurrentTimestamp(); } + else if (msg->m_resettarget == RESET_WAITACCUM) + { + PgStat_WaitAccumEntry *entry; + WAHash *hash = waitAccumStats.hash; + int i; + + for (i = 0; i < hash->entry_num; i++) + { + entry = hash->entries[i].entry; + + entry->calls = 0; + INSTR_TIME_SET_ZERO(entry->times); + } + } /* * Presumably the sender of this message validated the target, don't @@ -5394,6 +5656,43 @@ pgstat_recv_slru(PgStat_MsgSLRU *msg, int len) slruStats[msg->m_index].truncate += msg->m_truncate; } +/* ---------- + * pgstat_recv_waitaccum() - + * + * Process a WAITACCUM message. + * ---------- + */ +static void +pgstat_recv_waitaccum(PgStat_MsgWaitAccum *msg, int len) +{ + PgStat_WaitAccumEntry *m_entry = &(msg->m_entry[0]); + PgStat_WaitAccumEntry *entry; + WAHash *hash = waitAccumStats.hash; + int i; + + /* + * Process all function entries in the message. + */ + for (i = 0; i < msg->m_nentries; i++, m_entry++) + { + entry = pgstat_get_wa_entry(hash, m_entry->wait_event_info); + + if (!entry) + { + entry = pgstat_add_wa_entry(hash, m_entry->wait_event_info); + memcpy(entry, m_entry, sizeof(PgStat_WaitAccumEntry)); + } + else + { + /* + * Otherwise add the values to the existing entry. 
+ */ + entry->calls += m_entry->calls; + INSTR_TIME_ADD(entry->times, m_entry->times); + } + } +} + /* ---------- * pgstat_recv_recoveryconflict() - * diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 55b9d7970e..e9a120805b 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -634,6 +634,25 @@ LWLockNewTrancheId(void) return result; } +/* + * Get a last tranche ID. + */ +int +LWLockGetLastTrancheId(void) +{ + int result; + int *LWLockCounter; + + Assert(!lock_named_request_allowed); + + LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int)); + SpinLockAcquire(ShmemLock); + result = *LWLockCounter; + SpinLockRelease(ShmemLock); + + return result; +} + /* * Register a dynamic tranche name in the lookup table of the current process. * diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index 6baf67740c..392a783fae 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -25,6 +25,7 @@ #include "storage/lmgr.h" /* for GetLockNameFromTagType */ #include "storage/lwlock.h" /* for GetLWLockIdentifier */ #include "utils/wait_event.h" +#include "pgstat.h" static const char *pgstat_get_wait_activity(WaitEventActivity w); @@ -32,7 +33,8 @@ static const char *pgstat_get_wait_client(WaitEventClient w); static const char *pgstat_get_wait_ipc(WaitEventIPC w); static const char *pgstat_get_wait_timeout(WaitEventTimeout w); static const char *pgstat_get_wait_io(WaitEventIO w); - +static inline void pgstat_report_waitaccum_start(); +static inline void pgstat_report_waitaccum_end(); static uint32 local_my_wait_event_info; uint32 *my_wait_event_info = &local_my_wait_event_info; @@ -732,3 +734,96 @@ pgstat_get_wait_io(WaitEventIO w) return event_name; } + + +static inline void +pgstat_report_waitaccum_start() +{ + if (wa_hash == NULL) + return; + + if (pgstat_track_wait_timing) + { + 
INSTR_TIME_SET_CURRENT(waitStart); + } +} + +static inline void +pgstat_report_waitaccum_end() +{ + PgStat_WaitAccumEntry *entry; + instr_time diff; + + if (wa_hash == NULL) + return; + + if (pgstat_track_wait_timing) + { + INSTR_TIME_SET_CURRENT(diff); + INSTR_TIME_SUBTRACT(diff, waitStart); + } + + entry = pgstat_get_wa_entry(wa_hash, *my_wait_event_info); + + if (!entry) + { + return; + } + + entry->calls++; + if (pgstat_track_wait_timing) + { + INSTR_TIME_ADD(entry->times, diff); + } +} + + +/* ---------- + * pgstat_report_wait_start() - + * + * Called from places where server process needs to wait. This is called + * to report wait event information. The wait information is stored + * as 4-bytes where first byte represents the wait event class (type of + * wait, for different types of wait, refer WaitClass) and the next + * 3-bytes represent the actual wait event. Currently 2-bytes are used + * for wait event which is sufficient for current usage, 1-byte is + * reserved for future usage. + * + * Historically we used to make this reporting conditional on + * pgstat_track_activities, but the check for that seems to add more cost + * than it saves. + * + * my_wait_event_info initially points to local memory, making it safe to + * call this before MyProc has been initialized. + * ---------- + */ +inline void +pgstat_report_wait_start(uint32 wait_event_info) +{ + /* + * Since this is a four-byte field which is always read and written as + * four-bytes, updates are atomic. + */ + *(volatile uint32 *) my_wait_event_info = wait_event_info; + + //FIXME: recent patch to speed up this call. + pgstat_report_waitaccum_start(); +} + +/* ---------- + * pgstat_report_wait_end() - + * + * Called to report end of a wait. + * ---------- + */ +inline void +pgstat_report_wait_end(void) +{ + //FIXME: recent patch to speed up this call. 
+ pgstat_report_waitaccum_end(); + + /* see pgstat_report_wait_start() */ + *(volatile uint32 *) my_wait_event_info = 0; + + +} diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 14056f5347..56afd20f22 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2380,3 +2380,83 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } + +Datum +pg_stat_get_waitaccum(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_WAITACCUM_COLS 4 + PgStat_WaitAccumStats *waitaccum_stats; + PgStat_WaitAccumEntry *entry; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + int i; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + /* Get statistics about the waitaccum process */ + waitaccum_stats = pgstat_fetch_stat_waitaccum(); + + for (i = 0; i < waitaccum_stats->hash->entry_num; i++) + { + Datum 
values[PG_STAT_GET_WAITACCUM_COLS]; + bool nulls[PG_STAT_GET_WAITACCUM_COLS]; + const char *wait_event_type = NULL; + const char *wait_event = NULL; + + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + entry = waitaccum_stats->hash->entries[i].entry; + + /* Fill values and NULLs */ + { + uint32 raw_wait_event; + + raw_wait_event = UINT32_ACCESS_ONCE(entry->wait_event_info); + wait_event_type = pgstat_get_wait_event_type(raw_wait_event); + wait_event = pgstat_get_wait_event(raw_wait_event); + } + + values[0] = CStringGetTextDatum(wait_event_type); + + values[1] = CStringGetTextDatum(wait_event); + + values[2] = Int64GetDatum(entry->calls); + + values[3] = UInt64GetDatum(INSTR_TIME_GET_MICROSEC(entry->times)); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 68b62d523d..3260ab77d7 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1546,6 +1546,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"track_wait_timing", PGC_SUSET, STATS_COLLECTOR, + gettext_noop("Collects timing statistics for wait events."), + NULL + }, + &pgstat_track_wait_timing, + false, + NULL, NULL, NULL + }, + { {"update_process_title", PGC_SUSET, PROCESS_TITLE, gettext_noop("Updates the process title to show the active SQL command."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ddbb6dc2be..f6d0a5af8d 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -599,6 +599,7 @@ #track_counts = on #track_io_timing = off #track_wal_io_timing = off +#track_wait_timing = off #track_functions = none # none, pl, all #stats_temp_directory = 'pg_stat_tmp' diff 
--git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index acbcae4607..6c22bddc8e 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5284,6 +5284,15 @@ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,query_id}', prosrc => 'pg_stat_get_activity' }, +{ oid => '8316', + descr => 'statistics: information about accumulative data of wait event', + proname => 'pg_stat_get_waitaccum', prorows => '200', proisstrict => 'f', + proretset => 't', provolatile => 's', proparallel => 'r', + prorettype => 'record', proargtypes => 'int4', + proallargtypes => '{int4,text,text,int8,int8}', + proargmodes => '{i,o,o,o,o}', + proargnames => '{pid,wait_event_type,wait_event,calls,times}', + prosrc => 'pg_stat_get_waitaccum' }, { oid => '3318', descr => 'statistics: information about progress of backends running maintenance command', proname => 'pg_stat_get_progress_info', prorows => '100', proretset => 't', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 9612c0a6c2..5564907df8 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -74,6 +74,7 @@ typedef enum StatMsgType PGSTAT_MTYPE_BGWRITER, PGSTAT_MTYPE_WAL, PGSTAT_MTYPE_SLRU, + PGSTAT_MTYPE_WAITACCUM, PGSTAT_MTYPE_FUNCSTAT, PGSTAT_MTYPE_FUNCPURGE, PGSTAT_MTYPE_RECOVERYCONFLICT, @@ -137,14 +138,15 @@ typedef enum PgStat_Shared_Reset_Target { RESET_ARCHIVER, RESET_BGWRITER, - RESET_WAL + RESET_WAL, + RESET_WAITACCUM } PgStat_Shared_Reset_Target; /* Possible object types for resetting single counters */ typedef enum PgStat_Single_Reset_Type { RESET_TABLE, - RESET_FUNCTION + RESET_FUNCTION, } 
PgStat_Single_Reset_Type; /* ------------------------------------------------------------ @@ -539,6 +541,48 @@ typedef struct PgStat_MsgReplSlot PgStat_Counter m_total_bytes; } PgStat_MsgReplSlot; +/* ---------- + * PgStat_WaitAccumEntry Entry in backend/background's per-wait_event_info hash table + * ---------- + */ +typedef struct PgStat_WaitAccumEntry +{ + uint32 wait_event_info; + PgStat_Counter calls; + instr_time times; +} PgStat_WaitAccumEntry; + +typedef struct WAEntry +{ + int key; + PgStat_WaitAccumEntry *entry; + struct WAEntry *next; +} WAEntry; + +#define WA_BUCKET_SIZE 461 + +typedef struct WAHash +{ + WAEntry entries[WA_BUCKET_SIZE]; + WAEntry *buckets[WA_BUCKET_SIZE]; + int entry_num; +} WAHash; + +/* ---------- + * PgStat_MsgWaitAccum Sent by backend/background's process to update statistics. + * ---------- + */ +#define PGSTAT_NUM_WAITACCUMENTRIES \ + ((PGSTAT_MSG_PAYLOAD - sizeof(int)) \ + / sizeof(PgStat_WaitAccumEntry)) + +typedef struct PgStat_MsgWaitAccum +{ + PgStat_MsgHdr m_hdr; + + int m_nentries; + PgStat_WaitAccumEntry m_entry[PGSTAT_NUM_WAITACCUMENTRIES]; +} PgStat_MsgWaitAccum; /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict @@ -676,7 +720,6 @@ typedef struct PgStat_MsgConn SessionEndType m_disconnect; } PgStat_MsgConn; - /* ---------- * PgStat_Msg Union over all possible messages. 
  * ----------
@@ -702,6 +745,7 @@ typedef union PgStat_Msg
 	PgStat_MsgBgWriter msg_bgwriter;
 	PgStat_MsgWal msg_wal;
 	PgStat_MsgSLRU msg_slru;
+	PgStat_MsgWaitAccum msg_waitaccum;
 	PgStat_MsgFuncstat msg_funcstat;
 	PgStat_MsgFuncpurge msg_funcpurge;
 	PgStat_MsgRecoveryConflict msg_recoveryconflict;
@@ -721,7 +765,7 @@ typedef union PgStat_Msg
  * ------------------------------------------------------------
  */
 
-#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA2
+#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA3
 
 /* ----------
  * PgStat_StatDBEntry The collector's data per database
@@ -909,6 +953,15 @@ typedef struct PgStat_StatReplSlotEntry
 } PgStat_StatReplSlotEntry;
 
 
+/*
+ * WaitAccum statistics kept in the stats collector
+ */
+typedef struct PgStat_WaitAccumStats
+{
+	WAHash *hash;
+} PgStat_WaitAccumStats;
+
+
 /*
  * Working state needed to accumulate per-function-call timing statistics.
  */
@@ -925,6 +978,8 @@ typedef struct PgStat_FunctionCallUsage
 	instr_time f_start;
 } PgStat_FunctionCallUsage;
 
+extern instr_time waitStart;
+extern WAHash *wa_hash;
 
 /* ----------
  * GUC parameters
@@ -932,6 +987,7 @@ typedef struct PgStat_FunctionCallUsage
  */
 extern PGDLLIMPORT bool pgstat_track_counts;
 extern PGDLLIMPORT int pgstat_track_functions;
+extern PGDLLIMPORT bool pgstat_track_wait_timing;
 extern char *pgstat_stat_directory;
 extern char *pgstat_stat_tmpname;
 extern char *pgstat_stat_filename;
@@ -1092,6 +1148,7 @@ extern void pgstat_twophase_postabort(TransactionId xid, uint16 info,
 extern void pgstat_send_archiver(const char *xlog, bool failed);
 extern void pgstat_send_bgwriter(void);
 extern void pgstat_send_wal(bool force);
+extern void pgstat_send_waitaccum(void);
 
 /* ----------
  * Support functions for the SQL-callable functions to
@@ -1106,6 +1163,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
 extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
+extern PgStat_WaitAccumStats *pgstat_fetch_stat_waitaccum(void);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
 extern void pgstat_count_slru_page_hit(int slru_idx);
@@ -1117,4 +1175,6 @@ extern void pgstat_count_slru_truncate(int slru_idx);
 extern const char *pgstat_slru_name(int slru_idx);
 extern int pgstat_slru_index(const char *name);
 
+extern PgStat_WaitAccumEntry *pgstat_get_wa_entry(WAHash *hash, uint32 key);
+
 #endif /* PGSTAT_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index a8f052e484..e6638b2c45 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -154,6 +154,7 @@ extern LWLockPadded *GetNamedLWLockTranche(const char *tranche_name);
  * registration in the main shared memory segment wouldn't work for that case.
  */
 extern int LWLockNewTrancheId(void);
+extern int LWLockGetLastTrancheId(void);
 extern void LWLockRegisterTranche(int tranche_id, const char *tranche_name);
 extern void LWLockInitialize(LWLock *lock, int tranche_id);
 
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index be67d8a861..db09b3b64a 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -21,6 +21,7 @@
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
 #include "storage/proclist_types.h"
+#include "portability/instr_time.h"
 
 /*
  * Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 6c6ec2e711..caa0db7f79 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -50,6 +50,8 @@ typedef enum
 	WAIT_EVENT_WAL_WRITER_MAIN
 } WaitEventActivity;
 
+#define PG_WAIT_ACTIVITY_LAST_TYPE WAIT_EVENT_WAL_WRITER_MAIN
+
 /* ----------
  * Wait Events - Client
  *
@@ -70,6 +72,8 @@ typedef enum
 	WAIT_EVENT_WAL_SENDER_WRITE_DATA,
 } WaitEventClient;
 
+#define PG_WAIT_CLIENT_LAST_TYPE WAIT_EVENT_WAL_SENDER_WRITE_DATA
+
 /* ----------
  * Wait Events - IPC
  *
@@ -128,6 +132,8 @@ typedef enum
 	WAIT_EVENT_XACT_GROUP_UPDATE
 } WaitEventIPC;
 
+#define PG_WAIT_IPC_LAST_TYPE WAIT_EVENT_XACT_GROUP_UPDATE
+
 /* ----------
  * Wait Events - Timeout
  *
@@ -143,6 +149,8 @@ typedef enum
 	WAIT_EVENT_VACUUM_DELAY
 } WaitEventTimeout;
 
+#define PG_WAIT_TIMEOUT_LAST_TYPE WAIT_EVENT_VACUUM_DELAY
+
 /* ----------
  * Wait Events - IO
  *
@@ -227,58 +235,15 @@ typedef enum
 	WAIT_EVENT_LOGICAL_SUBXACT_WRITE
 } WaitEventIO;
 
+#define PG_WAIT_IO_LAST_TYPE WAIT_EVENT_LOGICAL_SUBXACT_WRITE
 
 extern const char *pgstat_get_wait_event(uint32 wait_event_info);
 extern const char *pgstat_get_wait_event_type(uint32 wait_event_info);
-static inline void pgstat_report_wait_start(uint32 wait_event_info);
-static inline void pgstat_report_wait_end(void);
+void pgstat_report_wait_start(uint32 wait_event_info);
+void pgstat_report_wait_end(void);
 extern void pgstat_set_wait_event_storage(uint32 *wait_event_info);
 extern void pgstat_reset_wait_event_storage(void);
 
 extern PGDLLIMPORT uint32 *my_wait_event_info;
 
-
-/* ----------
- * pgstat_report_wait_start() -
- *
- *	Called from places where server process needs to wait. This is called
- *	to report wait event information. The wait information is stored
- *	as 4-bytes where first byte represents the wait event class (type of
- *	wait, for different types of wait, refer WaitClass) and the next
- *	3-bytes represent the actual wait event. Currently 2-bytes are used
- *	for wait event which is sufficient for current usage, 1-byte is
- *	reserved for future usage.
- *
- *	Historically we used to make this reporting conditional on
- *	pgstat_track_activities, but the check for that seems to add more cost
- *	than it saves.
- *
- *	my_wait_event_info initially points to local memory, making it safe to
- *	call this before MyProc has been initialized.
- * ----------
- */
-static inline void
-pgstat_report_wait_start(uint32 wait_event_info)
-{
-	/*
-	 * Since this is a four-byte field which is always read and written as
-	 * four-bytes, updates are atomic.
-	 */
-	*(volatile uint32 *) my_wait_event_info = wait_event_info;
-}
-
-/* ----------
- * pgstat_report_wait_end() -
- *
- *	Called to report end of a wait.
- * ----------
- */
-static inline void
-pgstat_report_wait_end(void)
-{
-	/* see pgstat_report_wait_start() */
-	*(volatile uint32 *) my_wait_event_info = 0;
-}
-
-
 #endif /* WAIT_EVENT_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e5ab11275d..4fbcefd46f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2183,6 +2183,11 @@ pg_stat_wal| SELECT w.wal_records,
     w.wal_sync_time,
     w.stats_reset
    FROM pg_stat_get_wal() w(wal_records, wal_fpi, wal_bytes, wal_buffers_full, wal_write, wal_sync, wal_write_time, wal_sync_time, stats_reset);
+pg_stat_waitaccum| SELECT s.wait_event_type,
+    s.wait_event,
+    s.calls,
+    s.times
+   FROM pg_stat_get_waitaccum(NULL::integer) s(wait_event_type, wait_event, calls, times);
 pg_stat_wal_receiver| SELECT s.pid,
     s.status,
     s.receive_start_lsn,
--
2.20.1
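To illustrate what the calls and times counters of pg_stat_get_waitaccum() represent, here is a minimal standalone sketch of the start/end accumulation idea. It is not code from the patch: WaitAccumSketch, wait_sketch_start(), wait_sketch_end() and elapsed_us() are hypothetical names, a single event is tracked instead of a hash of entries, and clock_gettime(CLOCK_MONOTONIC) stands in for the INSTR_TIME macros.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <time.h>

/* Hypothetical, simplified stand-in for one wait event entry. */
typedef struct WaitAccumSketch
{
	uint64_t	calls;			/* number of completed waits */
	uint64_t	time_us;		/* accumulated wait time, in microseconds */
	struct timespec start;		/* start of the wait in progress */
} WaitAccumSketch;

/* Elapsed microseconds between two timestamps; 0 if end precedes start. */
static uint64_t
elapsed_us(const struct timespec *start, const struct timespec *end)
{
	int64_t		ns;

	ns = ((int64_t) end->tv_sec - (int64_t) start->tv_sec) * 1000000000LL
		+ ((int64_t) end->tv_nsec - (int64_t) start->tv_nsec);

	return (ns > 0) ? (uint64_t) (ns / 1000) : 0;
}

/* Record the start of a wait (assumed analogue of the "start" hook). */
static void
wait_sketch_start(WaitAccumSketch *acc)
{
	assert(acc != NULL);
	clock_gettime(CLOCK_MONOTONIC, &acc->start);
}

/* Close the wait: bump the call counter and add the elapsed time. */
static void
wait_sketch_end(WaitAccumSketch *acc)
{
	struct timespec now;

	assert(acc != NULL);
	clock_gettime(CLOCK_MONOTONIC, &now);
	acc->calls++;
	acc->time_us += elapsed_us(&acc->start, &now);
}
```

With the run described earlier in this mail, such an entry for ClientWrite would end up with calls = 4 and time_us = 3132266.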
From ddb1adc5cd9acc9bc9de16d0cf057124b09fe1e3 Mon Sep 17 00:00:00 2001
From: Jehan-Guillaume de Rorthais <j...@dalibo.com>
Date: Fri, 4 Jun 2021 18:14:51 +0200
Subject: [PATCH 2/2] [POC] Change measuring method of wait event time from INSTR_TIME to rdtsc.

This patch changes the method used to measure wait event time from
INSTR_TIME (which uses gettimeofday or clock_gettime) to rdtsc. This
might reduce the measurement overhead.

Converting clock cycles to actual time and error handling are not
implemented yet.
---
 src/backend/postmaster/pgstat.c         |  8 ++++----
 src/backend/utils/activity/wait_event.c | 10 +++++-----
 src/backend/utils/adt/pgstatfuncs.c     |  2 +-
 src/include/pgstat.h                    |  4 ++--
 src/include/portability/instr_time.h    | 21 +++++++++++++++++++++
 5 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index b4a84a2f62..e928239a29 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -180,7 +180,7 @@ static bool pgStatRunningInCollector = false;
 
 WAHash *wa_hash;
 
-instr_time waitStart;
+uint64 waitStart;
 
 /*
  * Structures in which backends store per-table info that's waiting to be
@@ -3332,7 +3332,7 @@ pgstat_send_waitaccum()
 
 		/* Clear wait events information. */
 		entry->calls = 0;
-		INSTR_TIME_SET_ZERO(entry->times);
+		entry->times = 0;
 	}
 
 	if (msg.m_nentries > 0)
@@ -5330,7 +5330,7 @@ pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len)
 
 			entry = hash->entries[i].entry;
 			entry->calls = 0;
-			INSTR_TIME_SET_ZERO(entry->times);
+			entry->times = 0;
 		}
 	}
 
@@ -5688,7 +5688,7 @@ pgstat_recv_waitaccum(PgStat_MsgWaitAccum *msg, int len)
 				 * Otherwise add the values to the existing entry.
 				 */
 				entry->calls += m_entry->calls;
-				INSTR_TIME_ADD(entry->times, m_entry->times);
+				entry->times += m_entry->times;
 			}
 		}
 	}
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index 392a783fae..6d47eb0028 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -744,7 +744,7 @@ pgstat_report_waitaccum_start()
 
 	if (pgstat_track_wait_timing)
 	{
-		INSTR_TIME_SET_CURRENT(waitStart);
+		waitStart = rdtsc();
 	}
 }
 
@@ -752,15 +752,15 @@ static inline void
 pgstat_report_waitaccum_end()
 {
 	PgStat_WaitAccumEntry *entry;
-	instr_time diff;
+	uint64 diff = 0;
 
 	if (wa_hash == NULL)
 		return;
 
 	if (pgstat_track_wait_timing)
 	{
-		INSTR_TIME_SET_CURRENT(diff);
-		INSTR_TIME_SUBTRACT(diff, waitStart);
+		diff = rdtsc();
+		diff -= waitStart;
 	}
 
 	entry = pgstat_get_wa_entry(wa_hash, *my_wait_event_info);
@@ -773,7 +773,7 @@ pgstat_report_waitaccum_end()
 	entry->calls++;
 	if (pgstat_track_wait_timing)
 	{
-		INSTR_TIME_ADD(entry->times, diff);
+		entry->times += diff;
 	}
 }
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 56afd20f22..6faedd4938 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2450,7 +2450,7 @@ pg_stat_get_waitaccum(PG_FUNCTION_ARGS)
 
 		values[2] = Int64GetDatum(entry->calls);
 
-		values[3] = UInt64GetDatum(INSTR_TIME_GET_MICROSEC(entry->times));
+		values[3] = UInt64GetDatum(entry->times);
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5564907df8..9a9ac8e16d 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -549,7 +549,7 @@ typedef struct PgStat_WaitAccumEntry
 {
 	uint32 wait_event_info;
 	PgStat_Counter calls;
-	instr_time times;
+	uint64 times;
 } PgStat_WaitAccumEntry;
 
 typedef struct WAEntry
@@ -978,7 +978,7 @@ typedef struct PgStat_FunctionCallUsage
 	instr_time f_start;
 } PgStat_FunctionCallUsage;
 
-extern instr_time waitStart;
+extern uint64 waitStart;
 extern WAHash *wa_hash;
 
 /* ----------
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index 39a4f0600e..507a1ca44d 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -57,6 +57,10 @@
 
 #ifndef WIN32
 
+#if defined(__x86_64__) || defined(__i386__)
+#include <x86intrin.h>
+#endif
+
 #ifdef HAVE_CLOCK_GETTIME
 
 /* Use clock_gettime() */
@@ -209,6 +213,8 @@ typedef struct timeval instr_time;
 
 #else /* WIN32 */
 
+#include <intrin.h>
+
 /* Use QueryPerformanceCounter() */
 
 typedef LARGE_INTEGER instr_time;
@@ -254,3 +260,18 @@ GetTimerFrequency(void)
 	(INSTR_TIME_IS_ZERO(t) ? INSTR_TIME_SET_CURRENT(t), true : false)
 
 #endif /* INSTR_TIME_H */
+
+#ifndef RDTSC_H_
+#define RDTSC_H_
+
+static inline uint64 rdtsc() {
+	uint64 result;
+#if defined(__x86_64__) || defined(__i386__) || defined(WIN32)
+	result = __rdtsc();
+#else
+	result = 0;
+#endif
+	return result;
+}
+
+#endif
--
2.20.1
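Since this second patch stores raw cycle counts in "times", the values it reports are no longer microseconds. One possible way to convert them back, as a rough sketch only: calibrate the tick frequency once against the monotonic clock, then scale at read time. Nothing below exists in the patch or in PostgreSQL; tick_source_fn, monotonic_ns(), estimate_tick_hz() and ticks_to_usec() are hypothetical names, and the sketch assumes a tick counter running at a constant rate, which is not guaranteed for the TSC on every CPU.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <time.h>

/* Hypothetical tick source, e.g. a thin wrapper around rdtsc(). */
typedef uint64_t (*tick_source_fn) (void);

/* Read the monotonic clock, in nanoseconds. */
static uint64_t
monotonic_ns(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (uint64_t) ts.tv_sec * 1000000000ULL + (uint64_t) ts.tv_nsec;
}

/*
 * Estimate the tick frequency (ticks per second) by counting ticks across
 * a short sleep timed with the monotonic clock. Returns 0 on failure.
 */
static uint64_t
estimate_tick_hz(tick_source_fn read_ticks, long sleep_ms)
{
	struct timespec delay;
	uint64_t	ns0, ns1, t0, t1;

	assert(read_ticks != NULL && sleep_ms > 0);

	delay.tv_sec = sleep_ms / 1000;
	delay.tv_nsec = (sleep_ms % 1000) * 1000000L;

	ns0 = monotonic_ns();
	t0 = read_ticks();
	nanosleep(&delay, NULL);
	t1 = read_ticks();
	ns1 = monotonic_ns();

	if (ns1 <= ns0 || t1 <= t0)
		return 0;

	return (uint64_t) ((double) (t1 - t0) * 1e9 / (double) (ns1 - ns0));
}

/*
 * Convert a tick count to microseconds. Whole seconds and the remainder
 * are scaled separately so the multiplication cannot overflow 64 bits for
 * realistic frequencies.
 */
static uint64_t
ticks_to_usec(uint64_t ticks, uint64_t tick_hz)
{
	if (tick_hz == 0)
		return 0;

	return (ticks / tick_hz) * 1000000ULL +
		(ticks % tick_hz) * 1000000ULL / tick_hz;
}
```

For instance, with a counter running at 3 GHz, 3,000,000,000 accumulated ticks convert to 1,000,000 µs. The calibration would be done once per process (or once in the collector), not on every wait, to keep the benefit of the cheaper counter.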
/*
 * fe-time.c
 *
 * Connects using the libpq environment variables (e.g. PGDATABASE, PGHOST)
 * and expects the pgbench tables to exist there.
 *
 * To create/populate them, 100,000 rows (scale factor 1) are advised.
 * Try e.g.:
 *
 *   pgbench -i -s 1
 *
 * Compile with:
 *
 *   gcc -I /pg/includes/path -o fe-time fe-time.c -lpq
 *
 * Run with:
 *
 *   time ./fe-time
 *
 * The optional parameter is the time to wait in ms between every batch of
 * 1000 rows:
 *
 *   time ./fe-time 100
 */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <errno.h>
#include "libpq-fe.h"

#define DEFAULT_SLEEP 100000000		/* 100 ms, in nanoseconds */

static void
exit_nicely(PGconn *conn, int rc)
{
	/* close & cleanup */
	fprintf(stderr, "\n");
	PQfinish(conn);
	exit(rc);
}

int
main(int argc, char **argv)
{
	PGconn	   *conn;
	PGresult   *res;
	int			status,
				i = 0;
	struct timespec delay = {0, DEFAULT_SLEEP};

	if (argc > 1)
	{
		long		ms;

		errno = 0;
		ms = strtol(argv[1], NULL, 10);

		/* tv_nsec must stay below one second, so split the delay */
		if (errno == 0 && ms >= 0)
		{
			delay.tv_sec = ms / 1000;
			delay.tv_nsec = (ms % 1000) * 1000000L;
		}
	}

	fprintf(stderr, "delay set to %ldms per thousand rows\n",
			(long) delay.tv_sec * 1000 + delay.tv_nsec / 1000000);

	conn = PQconnectdb("");
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "Connection failed: %s", PQerrorMessage(conn));
		exit_nicely(conn, 1);
	}

	/* async query a large table */
	status = PQsendQuery(conn, "SELECT * FROM pgbench_accounts");
	if (status != 1)
	{
		fprintf(stderr, "Async query failed: %s", PQerrorMessage(conn));
		exit_nicely(conn, 1);
	}

	/* set single row mode */
	status = PQsetSingleRowMode(conn);
	if (status != 1)
	{
		fprintf(stderr, "PQsetSingleRowMode failed: %s", PQerrorMessage(conn));
		exit_nicely(conn, 1);
	}

	/* get result one row at a time and sleep every 1000 rows */
	while ((res = PQgetResult(conn)) != NULL)
	{
		if (PQresultStatus(res) == PGRES_FATAL_ERROR)
		{
			fprintf(stderr, "Query failed: %s", PQerrorMessage(conn));
			PQclear(res);
			exit_nicely(conn, 1);
		}

		i++;

		if (PQresultStatus(res) == PGRES_SINGLE_TUPLE && i % 1000 == 0)
		{
			/* fake some heavy work */
			nanosleep(&delay, NULL);
			fprintf(stderr, "processed %d rows \r", i);
		}

		PQclear(res);
	}

	exit_nicely(conn, 0);

	return 0;
}