Hi all,

I've attached a PoC patch demonstrating how to use the Linux ioctls SIOCINQ and SIOCOUTQ and the getsockopt option TCP_INFO to expose a lot of useful network socket info directly in system views.
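To make the mechanism concrete outside PostgreSQL, here is a minimal standalone sketch of the calls involved, assuming Linux and a connected TCP socket. It is not code from the patch: `sock_snapshot`, `sock_snapshot_take()` and `loopback_pair()` are hypothetical names made up for illustration, and error handling is reduced to returning -1.

```c
/*
 * Sketch: sample kernel socket state for a connected TCP socket (Linux only).
 * Not PostgreSQL code; all names below are hypothetical.
 */
#define _GNU_SOURCE
#include <assert.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>		/* struct tcp_info, TCP_INFO */
#include <poll.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <linux/sockios.h>		/* SIOCINQ, SIOCOUTQ */
#include <unistd.h>

typedef struct sock_snapshot
{
	int			sndbuf;			/* SO_SNDBUF: kernel send buffer size */
	int			rcvbuf;			/* SO_RCVBUF: kernel receive buffer size */
	int			inq;			/* SIOCINQ: received by kernel, not yet read */
	int			outq;			/* SIOCOUTQ: written, not yet acked by peer */
	struct tcp_info ti;			/* TCP_INFO: RTT, cwnd, retransmits, ... */
	socklen_t	ti_len;			/* bytes of ti the kernel actually filled */
} sock_snapshot;

/* Returns 0 on success, -1 if any call fails (errno says why). */
static int
sock_snapshot_take(int fd, sock_snapshot *s)
{
	socklen_t	len;

	memset(s, 0, sizeof(*s));

	len = sizeof(s->sndbuf);
	if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &s->sndbuf, &len) < 0)
		return -1;
	len = sizeof(s->rcvbuf);
	if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &s->rcvbuf, &len) < 0)
		return -1;

	if (ioctl(fd, SIOCINQ, &s->inq) < 0 || ioctl(fd, SIOCOUTQ, &s->outq) < 0)
		return -1;

	s->ti_len = sizeof(s->ti);
	if (getsockopt(fd, IPPROTO_TCP, TCP_INFO, &s->ti, &s->ti_len) < 0)
		return -1;
	return 0;
}

/* Demo helper: a connected client/server pair over 127.0.0.1. */
static int
loopback_pair(int *client, int *server)
{
	struct sockaddr_in addr;
	socklen_t	addrlen = sizeof(addr);
	int			lfd = socket(AF_INET, SOCK_STREAM, 0);

	if (lfd < 0)
		return -1;
	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	addr.sin_port = 0;			/* let the kernel pick a free port */
	if (bind(lfd, (struct sockaddr *) &addr, sizeof(addr)) < 0 ||
		listen(lfd, 1) < 0 ||
		getsockname(lfd, (struct sockaddr *) &addr, &addrlen) < 0 ||
		(*client = socket(AF_INET, SOCK_STREAM, 0)) < 0)
	{
		close(lfd);
		return -1;
	}
	if (connect(*client, (struct sockaddr *) &addr, sizeof(addr)) < 0)
	{
		close(*client);
		close(lfd);
		return -1;
	}
	*server = accept(lfd, NULL, NULL);
	close(lfd);
	return (*server < 0) ? -1 : 0;
}
```

Each snapshot costs five syscalls (three getsockopt(), two ioctl()), which is the per-sample price the patch pays every time it refreshes the stats.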
Sample output from pg_stat_replication, \x format, trimmed, from a test run where I deliberately stopped the walreceiver (SIGSTOP) to cause the send buffer to fill up:

    pid                        | 391966
    ...
    client_addr                | 127.0.0.1
    ...
    sync_state                 | async
    reply_time                 | 2020-09-30 16:13:59.179852+08
    sock_tx_bufsz              | 4194304
    sock_tx_bufcontentsz       | 4124021
    sock_rx_bufsz              | 3537513
    sock_rx_bufcontentsz       | 0
    sock_tx_windowsz           | 6912
    ...
    sock_rtt                   | 13655
    sock_rtt_variance          | 19783
    sock_recv_rtt              | 204761
    sock_packets_lost          | 0
    sock_packets_retransmitted | 0

The kernel can tell us even more than this if we use linux/tcp.h instead of netinet/tcp.h: the full-size struct tcp_info can report things like the delivery rate, how long we spent waiting for receive window space or send buffer space, totals sent/received, etc. So we can see, right there in Pg views, whether a walsender, walreceiver or logical worker is limited by the remote side, the local side, or the connection itself, along with some strong indications as to why.

I think we're pretty clearly going to want this in some form, especially given the increasing adoption of sealed systems where you can't expect the user to track down which socket is associated with which pg worker and then run some `ss` commands themselves.

However, the demo implementation isn't especially pretty. It bloats struct WalSnd, and it grabs the socket info frequently whether or not it is required. I can't see that being acceptable for real-world use.

I'm aiming to clean this up into something submittable, and something we can apply to the walreceiver and logical receiver workers too. That will require some design changes, since I can't see anyone being happy about bloating shmem with these big stats structs or collecting them constantly just in case. I'd welcome design input.
Especially because this seems like a case of a more general problem: how can we ask backends to self-report various diagnostic state or instrument their internals efficiently, and only when requested, without having to rely on the logfile?

The obvious choice would seem to be to use shm_mq to make requests and send replies, but there are a few issues with that:

* A shm_mq requires a lot of space. sizeof(struct shm_mq) = 56 on my x64 system. That's not easy to justify adding to struct WalSnd, struct WalRcv, struct LogicalRepWorker, etc.

* A shm_mq isn't well suited to a listen/respond model where the queue is attached to, used to exchange a couple of messages, detached, reset, and made ready for re-use by the next request. From experience I know it's not particularly easy to get that approach right.

I'm thinking some kind of request/response interface will be needed. Something like setting a shmem variable to a dsa_pointer with storage for the requested stats and the latch of the requesting process to notify when it's stored. That way, when stats aren't being collected, no shmem is wasted and no syscalls are made to collect data that's thrown away or overwritten.

The downside is that stats couldn't be shown in pg_stat_replication etc.; there would need to be a separate function. And sometimes a backend could take a while to re-enter its main loop or other locations where it checks for stats requests, so it might take a while to get results. But that way we could easily run a bgworker that periodically requests stats and writes them to an unlogged relation for inspection, or have monitoring tools poll it, that sort of thing.

-- 
Craig Ringer                   http://www.2ndQuadrant.com/
2ndQuadrant - PostgreSQL Solutions for the Enterprise
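The request/response idea described above can be sketched with C11 atomics. This is a single-process model under stated assumptions, not PostgreSQL code: a plain pointer stands in for the dsa_pointer, an atomic `ready` flag stands in for setting the requester's latch, and `stats_slot`, `stats_reply` and the functions below are hypothetical names. The only permanently allocated per-worker state is one pointer-sized slot; the reply storage belongs to the requester.

```c
/*
 * Sketch of an on-demand stats request slot, modelled in one process with
 * C11 atomics. Hypothetical names throughout; a plain pointer stands in for
 * a dsa_pointer and the 'ready' flag stands in for setting the requester's
 * latch.
 */
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Reply storage, owned by the requester (would live in DSA). */
typedef struct stats_reply
{
	atomic_bool ready;			/* set by the worker after filling fields */
	long		tx_queued;
	long		rtt_us;
} stats_reply;

/* The only per-worker shared state: one pointer, NULL when idle. */
typedef struct stats_slot
{
	_Atomic(stats_reply *) request;
} stats_slot;

/* Requester: post a request. Fails if another request is in flight. */
static bool
stats_request_submit(stats_slot *slot, stats_reply *out)
{
	stats_reply *expected = NULL;

	atomic_init(&out->ready, false);	/* 'out' isn't shared yet */
	return atomic_compare_exchange_strong(&slot->request, &expected, out);
}

/*
 * Worker: called from its main loop. When nobody has asked for anything this
 * is a single atomic load; only a pending request would trigger getsockopt().
 */
static void
worker_poll_stats_request(stats_slot *slot, long tx_queued, long rtt_us)
{
	stats_reply *req;

	if (atomic_load(&slot->request) == NULL)
		return;					/* common case: nothing requested */
	req = atomic_exchange(&slot->request, NULL);	/* claim it */
	if (req == NULL)
		return;					/* requester withdrew in the meantime */
	req->tx_queued = tx_queued;
	req->rtt_us = rtt_us;
	atomic_store(&req->ready, true);	/* real code: SetLatch(requester) */
}

/*
 * Requester gives up (e.g. on timeout). Returns true if the request was
 * withdrawn before the worker claimed it; false means the worker owns it now
 * and the requester must wait for 'ready' before reusing 'out'.
 */
static bool
stats_request_cancel(stats_slot *slot, stats_reply *out)
{
	stats_reply *expected = out;

	return atomic_compare_exchange_strong(&slot->request, &expected, NULL);
}
```

The exchange in worker_poll_stats_request() is what makes cancellation safe: once the worker has claimed a request, stats_request_cancel() fails, so the requester knows it must wait rather than free the reply storage.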
From ddfba877e3c386598e3790b674136717fc0b0321 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig.rin...@2ndquadrant.com>
Date: Tue, 29 Sep 2020 11:43:02 +0800
Subject: [PATCH 1/2] Add libpq_be function to report bytes pending send in internal buffer

We have pq_is_send_pending() to report whether there's anything waiting in
the libpq send buffer, but not how much.

For stats reporting, add pq_nbytes_pending() so we can see how much is
waiting to send/receive. The send buffer is the main one of interest, as in
general the receive buffer is small and transient, but it can still matter
for some workloads like logical receivers.
---
 src/backend/libpq/pqcomm.c | 35 ++++++++++++++++++++++++++++++++++-
 src/backend/libpq/pqmq.c   |  3 ++-
 src/include/libpq/libpq.h  | 27 +++++++++++++++++++++++++++
 3 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index ac986c0505..f7daaebe6c 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -160,6 +160,7 @@ static int socket_putmessage(char msgtype, const char *s, size_t len);
 static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
 static void socket_startcopyout(void);
 static void socket_endcopyout(bool errorAbort);
+static int socket_nbytes_pending(size_t * rxsz, size_t * txsz);
 static int internal_putbytes(const char *s, size_t len);
 static int internal_flush(void);
 
@@ -176,7 +177,8 @@ static const PQcommMethods PqCommSocketMethods = {
 	socket_putmessage,
 	socket_putmessage_noblock,
 	socket_startcopyout,
-	socket_endcopyout
+	socket_endcopyout,
+	socket_nbytes_pending
 };
 
 const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
@@ -1502,6 +1504,9 @@ socket_flush_if_writable(void)
 
 /* --------------------------------
  *		socket_is_send_pending - is there any pending data in the output buffer?
+ *
+ * This doesn't report data pending in operating system socket buffers, only in
+ * the in-process libpq send buffer.
  * --------------------------------
  */
 static bool
@@ -1510,6 +1515,34 @@ socket_is_send_pending(void)
 	return (PqSendStart < PqSendPointer);
 }
 
+/* --------------------------------
+ *		socket_nbytes_pending - report internal libpq buffer content sizes
+ *
+ * Reports size of data waiting to be sent to the OS in the internal libpq send
+ * buffer, if any. Does not include operating system socket buffering. Also
+ * reports libpq receive buffer content size, i.e. data read from OS socket but
+ * not yet consumed by application.
+ *
+ * Use socket_is_send_pending() if you just need to determine if there's data
+ * waiting to send.
+ *
+ * rxsz or txsz may be null if that data isn't required.
+ *
+ * Never fails on TCP sockets. The int return code is so that the wrapper
+ * function pq_nbytes_pending can return STATUS_ERROR for other socket types
+ * that don't support this.
+ */
+static int
+socket_nbytes_pending(size_t * rxsz, size_t * txsz)
+{
+	if (txsz)
+		*txsz = (PqSendStart < PqSendPointer) ? (PqSendPointer - PqSendStart) : 0;
+	if (rxsz)
+		*rxsz = PqRecvLength - PqRecvPointer;
+
+	return STATUS_OK;
+}
+
 /* --------------------------------
  *		Message-level I/O routines begin here.
  *
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index f51d935daf..e4d6b7c779 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -44,7 +44,8 @@ static const PQcommMethods PqCommMqMethods = {
 	mq_putmessage,
 	mq_putmessage_noblock,
 	mq_startcopyout,
-	mq_endcopyout
+	mq_endcopyout,
+	NULL /* TODO nbytes_pending */
 };
 
 /*
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index b1152475ac..350febbfac 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -31,6 +31,7 @@ typedef struct
 	void		(*putmessage_noblock) (char msgtype, const char *s, size_t len);
 	void		(*startcopyout) (void);
 	void		(*endcopyout) (bool errorAbort);
+	int			(*nbytes_pending) (size_t *rxsz, size_t *txsz);
 } PQcommMethods;
 
 extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
@@ -45,6 +46,32 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
 	(PqCommMethods->putmessage_noblock(msgtype, s, len))
 #define pq_startcopyout() (PqCommMethods->startcopyout())
 #define pq_endcopyout(errorAbort) (PqCommMethods->endcopyout(errorAbort))
+#define pq_nbytes_pending(rxsz, txsz) ( PqCommMethods->nbytes_pending ? PqCommMethods->nbytes_pending((rxsz), (txsz)) : STATUS_ERROR )
+
+/*
+ * Socket statistics requests.
+ *
+ * TODO: Probably needs to become a VLA where we request the fields we want by
+ * supplying them tagged, and the callee fills in the values. That's very DSA
+ * friendly too.
+ */
+#define PQ_SOCKSTATS_TX_BUFSZ (1<<0)
+#define PQ_SOCKSTATS_RX_BUFSZ (1<<1)
+#define PQ_SOCKSTATS_TX_BUFCONTENTSZ (1<<2)
+#define PQ_SOCKSTATS_RX_BUFCONTENTSZ (1<<3)
+#define PQ_SOCKSTATS_TX_WINDOWSZ (1<<4)
+#define PQ_SOCKSTATS_RX_WINDOWSZ (1<<5)
+
+typedef struct PQsocketStats
+{
+	int32 set_fields;
+	size_t sock_tx_bufsz; /* SO_SNDBUF */
+	size_t sock_tx_bufcontentsz; /* SIOCOUTQ */
+	size_t sock_rx_bufsz; /* SO_RCVBUF */
+	size_t sock_rx_bufcontentsz; /* SIOCINQ */
+	size_t sock_tx_windowsz; /* scaled struct tcp_info.tcpi_snd_cwnd */
+	size_t sock_rx_windowsz; /* struct tcp_info.tcpi_snd_wnd */
+} PQsocketStats;
 
 /*
  * External functions.
-- 
2.26.2
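The TODO in the header above suggests replacing the fixed struct with a tagged, variable-length request where the caller lists only the fields it wants and the callee fills in values. A minimal sketch of that shape, assuming a flexible array member in one contiguous allocation (which is what would make it easy to place in a single DSA chunk); `sockstat_tag`, `sockstat_request`, `sockstat_fill()` and `demo_reader()` are hypothetical names, `malloc()` stands in for a DSA allocation, and the reader returns canned values taken from the sample output rather than calling getsockopt().

```c
/*
 * Sketch of a tagged, variable-length stats request: the caller lists only
 * the statistics it wants, the callee fills them in. Hypothetical names;
 * malloc() stands in for a DSA allocation.
 */
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef enum sockstat_tag
{
	SOCKSTAT_TX_BUFSZ,
	SOCKSTAT_RX_BUFSZ,
	SOCKSTAT_TX_BUFCONTENTSZ,
	SOCKSTAT_RX_BUFCONTENTSZ
} sockstat_tag;

typedef struct sockstat_item
{
	sockstat_tag tag;			/* set by the requester */
	int			valid;			/* set by the callee: 1 if value is usable */
	int64_t		value;
} sockstat_item;

/* One contiguous chunk sized to the request, so it fits a single allocation. */
typedef struct sockstat_request
{
	int			nitems;
	sockstat_item items[];		/* flexible array member */
} sockstat_request;

static sockstat_request *
sockstat_request_alloc(const sockstat_tag *tags, int n)
{
	sockstat_request *req = malloc(sizeof(*req) + (size_t) n * sizeof(sockstat_item));

	if (req == NULL)
		return NULL;
	req->nitems = n;
	for (int i = 0; i < n; i++)
	{
		req->items[i].tag = tags[i];
		req->items[i].valid = 0;
		req->items[i].value = 0;
	}
	return req;
}

/* Reads one statistic; a real one would issue only the syscall it needs. */
typedef int (*sockstat_reader) (sockstat_tag tag, int64_t *out);

/* Callee: fill just the requested items, marking each valid or not. */
static void
sockstat_fill(sockstat_request *req, sockstat_reader read_one)
{
	for (int i = 0; i < req->nitems; i++)
	{
		sockstat_item *it = &req->items[i];

		it->valid = (read_one(it->tag, &it->value) == 0);
	}
}

/* Demo reader with canned values; pretends SO_RCVBUF can't be read. */
static int
demo_reader(sockstat_tag tag, int64_t *out)
{
	switch (tag)
	{
		case SOCKSTAT_TX_BUFSZ:
			*out = 4194304;
			return 0;
		case SOCKSTAT_TX_BUFCONTENTSZ:
			*out = 4124021;
			return 0;
		case SOCKSTAT_RX_BUFCONTENTSZ:
			*out = 0;
			return 0;
		default:
			return -1;
	}
}
```

Compared with a fixed struct plus a set_fields bitmask, the per-item `valid` flag carries the same information, but the callee never has to touch fields nobody asked for.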
From 4b59ff717a24a0a816f0c249796814303deade84 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig.rin...@2ndquadrant.com>
Date: Tue, 29 Sep 2020 14:41:44 +0800
Subject: [PATCH 2/2] PoC to expose TCP_INFO for walsender socket in pg_stat_replication

---
 src/backend/catalog/system_views.sql        |   4 +-
 src/backend/libpq/pqcomm.c                  | 215 +++++++++++++++++++-
 src/backend/libpq/pqmq.c                    |   3 +-
 src/backend/replication/walsender.c         |  23 ++-
 src/backend/utils/probes.d                  |   4 +
 src/include/catalog/pg_proc.dat             |  12 +-
 src/include/libpq/libpq.h                   |  25 ++-
 src/include/replication/walsender_private.h |   9 +
 src/test/recovery/t/001_stream_rep.pl       |  42 ++++
 9 files changed, 322 insertions(+), 15 deletions(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ed4f3f142d..28524b8d43 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -791,7 +791,9 @@ CREATE VIEW pg_stat_replication AS
             W.replay_lag,
             W.sync_priority,
             W.sync_state,
-            W.reply_time
+            W.reply_time,
+            W.sock_tx_bufsz, W.sock_tx_bufcontentsz, W.sock_rx_bufsz, W.sock_rx_bufcontentsz, W.sock_tx_windowsz, W.sock_rx_windowsz,
+            W.sock_rtt, W.sock_rtt_variance, W.sock_recv_rtt, W.sock_packets_lost, W.sock_packets_retransmitted
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index f7daaebe6c..bef3c6c92f 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -57,6 +57,8 @@
  *		pq_flush		- flush pending output
  *		pq_flush_if_writable - flush pending output if writable without blocking
  *		pq_getbyte_if_available - get a byte if available without blocking
+ *		pq_getbufinfo	- report libpq buffer state and underlying socket buffering
+ *						  state if available
  *
  * message-level I/O (and old-style-COPY-OUT cruft):
  *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
@@ -78,6 +80,11 @@
 #include <sys/time.h>
 #include <netdb.h>
 #include <netinet/in.h>
+#if defined(__linux__)
+/* For the SIOCINQ and SIOCOUTQ ioctls */
+#include <linux/sockios.h>
+#include <sys/ioctl.h>
+#endif
 #ifdef HAVE_NETINET_TCP_H
 #include <netinet/tcp.h>
 #endif
@@ -93,6 +100,7 @@
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/probes.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -161,6 +169,7 @@ static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
 static void socket_startcopyout(void);
 static void socket_endcopyout(bool errorAbort);
 static int socket_nbytes_pending(size_t * rxsz, size_t * txsz);
+static int socket_buffer_stats(PQsocketStats* sockstats);
 static int internal_putbytes(const char *s, size_t len);
 static int internal_flush(void);
 
@@ -178,7 +187,8 @@ static const PQcommMethods PqCommSocketMethods = {
 	socket_putmessage_noblock,
 	socket_startcopyout,
 	socket_endcopyout,
-	socket_nbytes_pending
+	socket_nbytes_pending,
+	socket_buffer_stats
 };
 
 const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
@@ -1543,6 +1553,209 @@ socket_nbytes_pending(size_t * rxsz, size_t * txsz)
 	return STATUS_OK;
 }
 
+/*
+ * Get data pending sending and/or receiving on the underlying operating system
+ * transport socket.
+ *
+ * Populates the passed PQsocketStats *.
+ *
+ * The exact data returned depends on the operating system's support for
+ * querying the info.
+ *
+ * Returns 0 on success, STATUS_ERROR on failure of operations expected to
+ * succeed. Not all fields are guaranteed set on 0 return as support varies by
+ * OS and socket type. Check the set_fields flags to see what's valid in the
+ * result.
+ *
+ * On Linux this uses the platform-specific ioctls SIOCINQ and SIOCOUTQ and
+ * the getsockopt option TCP_INFO. The kernel code at build/net/ipv4/tcp.c is
+ * the main reference for struct tcp_info; check the function tcp_get_info()
+ * and the fields of struct tcp_sock.
+ *
+ * TODO change argument into VLA where we populate only wanted segs.
+ */
+static int
+socket_buffer_stats(PQsocketStats* ss)
+{
+	int			res;
+	unsigned int ressz;
+
+	if (MyProcPort == NULL)
+		return STATUS_ERROR;
+
+	ressz = sizeof(res);
+	if (getsockopt(MyProcPort->sock, SOL_SOCKET, SO_RCVBUF,(void *)&res, &ressz) < 0)
+	{
+		elog(WARNING, "getsockopt(%s) failed: %m", "SO_RCVBUF");
+		return STATUS_ERROR;
+	}
+	else
+	{
+		Assert(ressz == sizeof(res));
+		ss->set_fields |= PQ_SOCKSTATS_RX_BUFSZ;
+		ss->sock_rx_bufsz = res;
+	}
+
+	ressz = sizeof(res);
+	if (getsockopt(MyProcPort->sock, SOL_SOCKET, SO_SNDBUF,(void *)&res, &ressz) < 0)
+	{
+		elog(WARNING, "getsockopt(%s) failed: %m", "SO_SNDBUF");
+		return STATUS_ERROR;
+	}
+	else
+	{
+		Assert(ressz == sizeof(res));
+		ss->set_fields |= PQ_SOCKSTATS_TX_BUFSZ;
+		ss->sock_tx_bufsz = res;
+	}
+
+#ifdef __linux__
+	/*
+	 * Trying to read these will be treated as nonfatal for now.
+	 */
+
+	/* data received by kernel but not app */
+	if (ioctl(MyProcPort->sock, SIOCINQ, &res) != 0)
+		elog(WARNING, "ioctl(%s) failed: %m", "SIOCINQ");
+	else
+	{
+		ss->set_fields |= PQ_SOCKSTATS_RX_BUFCONTENTSZ;
+		ss->sock_rx_bufcontentsz = res;
+	}
+
+	/* waiting-to-send or sent-but-unacked data */
+	if (ioctl(MyProcPort->sock, SIOCOUTQ, &res) != 0)
+		elog(WARNING, "ioctl(%s) failed: %m", "SIOCOUTQ");
+	else
+	{
+		ss->set_fields |= PQ_SOCKSTATS_TX_BUFCONTENTSZ;
+		ss->sock_tx_bufcontentsz = res;
+	}
+
+	/* detailed TCP socket info from TCP_INFO */
+	{
+		/*
+		 * We use the 'struct tcp_info' from netinet/tcp.h not the one from
+		 * linux/tcp.h. It's more stable though it tends to lag behind the
+		 * version in the kernel headers so some fields like tcpi_snd_wnd
+		 * may not be available.
+		 */
+		struct tcp_info ti;
+		ressz = sizeof(ti);
+
+		/*
+		 * Guard against older kernel not setting all fields.
+		 *
+		 * TODO: Should check field validity with offsetof(...) against ressz
+		 * too.
+		 */
+		memset(&ti, '\0', sizeof(ti));
+
+		if (getsockopt(MyProcPort->sock, SOL_TCP, TCP_INFO, (void*)&ti, &ressz) < 0)
+			elog(WARNING, "getsockopt(%s) failed: %m", "TCP_INFO");
+		else
+		{
+			/*
+			 * Let systemtap/perf/etc see captured info in full.
+			 *
+			 * Sample script:
+			 *
+			 * PATH=/path/to/your/postgres/bin:$PATH /usr/local/systemtap/bin/stap -DMAXSTRINGLEN=4096 -v -e 'probe process("postgres").mark("libpq__be__tcp__info") { printf("[%05d] %s\n", pid(), @cast($arg1, "struct tcp_info")$$ ); }'
+			 *
+			 * Samples of output:
+			 *
+			 * [368266] {.tcpi_state='\001', .tcpi_ca_state='\000', .tcpi_retransmits='\000', .tcpi_probes='\000', .tcpi_backoff='\000', .tcpi_options='\a', .tcpi_snd_wscale=7, .tcpi_rcv_wscale=7, .tcpi_rto=201000, .tcpi_ato=40000, .tcpi_snd_mss=65483, .tcpi_rcv_mss=536, .tcpi_unacked=0, .tcpi_sacked=0, .tcpi_lost=0, .tcpi_retrans=0, .tcpi_fackets=0, .tcpi_last_data_sent=0, .tcpi_last_ack_sent=0, .tcpi_last_data_recv=0, .tcpi_last_ack_recv=0, .tcpi_pmtu=65535, .tcpi_rcv_ssthresh=65483, .tcpi_rtt=65, .tcpi_rttvar=19, .tcpi_snd_ssthresh=33, .tcpi_snd_cwnd=18, .tcpi_advmss=65483, .tcpi_reordering=27, .tcpi_rcv_rtt=267941, .tcpi_rcv_space=87243, .tcpi_total_retrans=82}
+			 *
+			 * [368266] {.tcpi_state='\001', .tcpi_ca_state='\000', .tcpi_retransmits='\000', .tcpi_probes='\000', .tcpi_backoff='\000', .tcpi_options='\a', .tcpi_snd_wscale=7, .tcpi_rcv_wscale=7, .tcpi_rto=208000, .tcpi_ato=42000, .tcpi_snd_mss=65483, .tcpi_rcv_mss=536, .tcpi_unacked=1, .tcpi_sacked=0, .tcpi_lost=0, .tcpi_retrans=0, .tcpi_fackets=0, .tcpi_last_data_sent=0, .tcpi_last_ack_sent=0, .tcpi_last_data_recv=24, .tcpi_last_ack_recv=25, .tcpi_pmtu=65535, .tcpi_rcv_ssthresh=65483, .tcpi_rtt=7583, .tcpi_rttvar=3093, .tcpi_snd_ssthresh=33, .tcpi_snd_cwnd=18, .tcpi_advmss=65483, .tcpi_reordering=27, .tcpi_rcv_rtt=267941, .tcpi_rcv_space=87243, .tcpi_total_retrans=82}
+			 *
+			 *
+			 */
+			TRACE_POSTGRESQL_LIBPQ_BE_TCP_INFO(&ti, sizeof(ti));
+
+			/*
+			 * Useful tcp_info mappings.
+			 *
+			 * ioctl: SIOCINQ
+			 *		tcp.c: tp->rcv_nxt - tp->copied_seq
+			 *		Assuming no urgent-data on socket, data yet to be
+			 *		read from Rx buffer
+			 *
+			 * ioctl: SIOCOUTQ
+			 *		TCP_INFO: (??) - ti.tcpi_unacked
+			 *		tcp.c: tp->write_seq - tp->snd_una
+			 *		Bytes written but not acked. Includes
+			 *		sent-but-not-acked.
+			 *
+			 * ioctl: SIOCOUTQNSD
+			 *		TCP_INFO: ti.tcpi_notsent_bytes
+			 *		tcp.c: tp->write_seq - tp->snd_nxt
+			 *		Bytes Tx buffered but not sent. Does not include
+			 *		sent-but-not-acked.
+			 *
+			 * Other fields of interest in tcp_info:
+			 *
+			 * [QUEUE STATES]
+			 *		tcpi_unacked (total )
+			 *		tcpi_rcv_space (receive-queue space?)
+			 *		tcpi_notsent_bytes (unsent bytes in tx queue)
+			 *
+			 * [TIMES AND LATENCIES]
+			 *		tcpi_last_data_sent
+			 *		tcpi_last_data_recv
+			 *		tcpi_last_ack_recv
+			 *		tcpi_rtt (round-trip time)
+			 *		tcpi_rttvar (round-trip time variance?)
+			 *		tcpi_rcv_rtt (rx estimated rtt)
+			 *		tcpi_delivery_rate
+			 *
+			 * [TOTALS AND COUNTERS]
+			 *
+			 *		tcpi_lost
+			 *		tcpi_retrans
+			 *		tcpi_total_retrans
+			 *		tcpi_bytes_acked
+			 *		tcpi_delivered
+			 *		tcpi_bytes_sent
+			 *		tcpi_bytes_retrans
+			 *		tcpi_reord_seen
+			 *
+			 * [WAIT TRACKING]
+			 *		tcpi_busy_time
+			 *		tcpi_rwnd_limited
+			 *		tcpi_sndbuf_limited
+			 *		tcpi_delivery_rate_app_limited [bool]
+			 */
+
+			/* Our TCP congestion window size, scaled. */
+			ss->sock_tx_windowsz = ti.tcpi_snd_cwnd;
+			if ((ti.tcpi_options & TCPI_OPT_WSCALE) == TCPI_OPT_WSCALE)
+				ss->sock_tx_windowsz = ss->sock_tx_windowsz << ti.tcpi_snd_wscale;
+
+			/* RTT info */
+			ss->sock_rtt = ti.tcpi_rtt;
+			ss->sock_rtt_variance = ti.tcpi_rttvar;
+			ss->sock_recv_rtt = ti.tcpi_rcv_rtt;
+			ss->set_fields |= PQ_SOCKSTATS_TCP_INFO;
+
+			/* Packet losses */
+			ss->sock_packets_lost = ti.tcpi_lost;
+			ss->sock_packets_retransmitted = ti.tcpi_retrans;
+
+#if defined(HAVE_TCP_INFO_SND_WND)
+			/*
+			 * If kernel reports it, store advertised receive
+			 * window size from receiver. Value is pre-scaled.
+			 */
+			if (ressz >= offsetof(struct tcp_info, tcpi_snd_wnd) + sizeof(ti.tcpi_snd_wnd))
+				ss->sock_rx_windowsz = ti.tcpi_snd_wnd;
+#endif
+		}
+	}
+#endif
+
+	return STATUS_OK;
+}
+
 /* --------------------------------
  *		Message-level I/O routines begin here.
  *
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index e4d6b7c779..f86e034531 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -45,7 +45,8 @@ static const PQcommMethods PqCommMqMethods = {
 	mq_putmessage_noblock,
 	mq_startcopyout,
 	mq_endcopyout,
-	NULL /* TODO nbytes_pending */
+	NULL /* TODO nbytes_pending */,
+	NULL /* buffer_stats doesn't make sense for mq */
 };
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7c9d1b67df..c607e0f882 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1334,6 +1334,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
+
+		/* Update our socket stats. TODO hack hack hack */
+		pq_get_socket_stats(&MyWalSnd->socket_info);
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
@@ -1477,6 +1480,9 @@ WalSndWaitForWal(XLogRecPtr loc)
 		/* Send keepalive if the time has come */
 		WalSndKeepaliveIfNecessary();
 
+		/* Update our socket stats. TODO hack hack hack */
+		pq_get_socket_stats(&MyWalSnd->socket_info);
+
 		/*
 		 * Sleep until something happens or we time out.  Also wait for the
 		 * socket becoming writable, if there's still pending output.
@@ -2354,6 +2360,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		/* Send keepalive if the time has come */
 		WalSndKeepaliveIfNecessary();
 
+		/* Update our socket stats. TODO hack hack hack */
+		pq_get_socket_stats(&MyWalSnd->socket_info);
+
 		/*
 		 * Block if we have unsent data.  XXX For logical replication, let
 		 * WalSndWaitForWal() handle any other blocking; idle receivers need
@@ -3265,7 +3274,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	12
+#define PG_STAT_GET_WAL_SENDERS_COLS	23
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -3439,6 +3448,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 				nulls[11] = true;
 			else
 				values[11] = TimestampTzGetDatum(replyTime);
+
+			values[12] = Int64GetDatum(walsnd->socket_info.sock_tx_bufsz);
+			values[13] = Int64GetDatum(walsnd->socket_info.sock_tx_bufcontentsz);
+			values[14] = Int64GetDatum(walsnd->socket_info.sock_rx_bufsz);
+			values[15] = Int64GetDatum(walsnd->socket_info.sock_rx_bufcontentsz);
+			values[16] = Int64GetDatum(walsnd->socket_info.sock_tx_windowsz);
+			values[17] = Int64GetDatum(walsnd->socket_info.sock_rx_windowsz);
+			values[18] = Int64GetDatum(walsnd->socket_info.sock_rtt);
+			values[19] = Int64GetDatum(walsnd->socket_info.sock_rtt_variance);
+			values[20] = Int64GetDatum(walsnd->socket_info.sock_recv_rtt);
+			values[21] = Int64GetDatum(walsnd->socket_info.sock_packets_lost);
+			values[22] = Int64GetDatum(walsnd->socket_info.sock_packets_retransmitted);
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index a0b0458108..b5e17907c2 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -91,4 +91,8 @@ provider postgresql {
 	probe wal__switch();
 	probe wal__buffer__write__dirty__start();
 	probe wal__buffer__write__dirty__done();
+
+	/* Observe raw getsockopt(..., TCP_INFO, ...) result in libpq_be */
+	probe libpq__be__tcp__info(void * struct_tcp_info_p, int sizeof_struct_tcp_info);
+
 };
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f48f5fb4d9..68f9674874 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -37,6 +37,12 @@
 # been the custom to insert new entries adjacent to related older entries.
 # Try to do one or the other though, don't just insert entries at random.
 
+#
+# Use the tool src/include/catalog/unused_oids to find free OIDs
+# to use. Use src/include/catalog/duplicate_oids to check you didn't
+# reuse any by mistake.
+#
+
 # OIDS 1 - 99
 
 { oid => '1242', descr => 'I/O',
@@ -5246,9 +5252,9 @@
   proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
+  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,sock_tx_bufsz,sock_tx_bufcontentsz,sock_rx_bufsz,sock_rx_bufcontentsz,sock_tx_windowsz,sock_rx_windowsz,sock_rtt,sock_rtt_variance,sock_recv_rtt,sock_packets_lost,sock_packets_retransmitted}',
   prosrc => 'pg_stat_get_wal_senders' },
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 350febbfac..cf283620bd 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -21,6 +21,8 @@
 
 #include "storage/latch.h"
 
+struct PQsocketStats;
+
 typedef struct
 {
 	void		(*comm_reset) (void);
@@ -32,6 +34,7 @@ typedef struct
 	void		(*startcopyout) (void);
 	void		(*endcopyout) (bool errorAbort);
 	int			(*nbytes_pending) (size_t *rxsz, size_t *txsz);
+	int			(*get_socket_stats) (struct PQsocketStats *ss);
 } PQcommMethods;
 
 extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
@@ -47,6 +50,7 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
 #define pq_startcopyout() (PqCommMethods->startcopyout())
 #define pq_endcopyout(errorAbort) (PqCommMethods->endcopyout(errorAbort))
 #define pq_nbytes_pending(rxsz, txsz) ( PqCommMethods->nbytes_pending ? PqCommMethods->nbytes_pending((rxsz), (txsz)) : STATUS_ERROR )
+#define pq_get_socket_stats(ss) ( PqCommMethods->get_socket_stats ? PqCommMethods->get_socket_stats((ss)) : STATUS_ERROR )
 
 /*
  * Socket statistics requests.
@@ -59,18 +63,23 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
 #define PQ_SOCKSTATS_RX_BUFSZ (1<<1)
 #define PQ_SOCKSTATS_TX_BUFCONTENTSZ (1<<2)
 #define PQ_SOCKSTATS_RX_BUFCONTENTSZ (1<<3)
-#define PQ_SOCKSTATS_TX_WINDOWSZ (1<<4)
-#define PQ_SOCKSTATS_RX_WINDOWSZ (1<<5)
+/* Main fields from struct tcp_info */
+#define PQ_SOCKSTATS_TCP_INFO (1<<4)
 
 typedef struct PQsocketStats
 {
 	int32 set_fields;
-	size_t sock_tx_bufsz; /* SO_SNDBUF */
-	size_t sock_tx_bufcontentsz; /* SIOCOUTQ */
-	size_t sock_rx_bufsz; /* SO_RCVBUF */
-	size_t sock_rx_bufcontentsz; /* SIOCINQ */
-	size_t sock_tx_windowsz; /* scaled struct tcp_info.tcpi_snd_cwnd */
-	size_t sock_rx_windowsz; /* struct tcp_info.tcpi_snd_wnd */
+	int32 sock_tx_bufsz; /* SO_SNDBUF */
+	int32 sock_tx_bufcontentsz; /* SIOCOUTQ */
+	int32 sock_rx_bufsz; /* SO_RCVBUF */
+	int32 sock_rx_bufcontentsz; /* SIOCINQ */
+	int32 sock_tx_windowsz; /* scaled struct tcp_info.tcpi_snd_cwnd */
+	int32 sock_rx_windowsz; /* struct tcp_info.tcpi_snd_wnd */
+	int32 sock_rtt;
+	int32 sock_rtt_variance;
+	int32 sock_recv_rtt;
+	int32 sock_packets_lost;
+	int32 sock_packets_retransmitted;
 } PQsocketStats;
 
 /*
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 509856c057..34ac7d57c5 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -19,6 +19,9 @@
 #include "storage/shmem.h"
 #include "storage/spin.h"
 
+/* TODO HACK for PQsocketStats */
+#include "libpq/libpq.h"
+
 typedef enum WalSndState
 {
 	WALSNDSTATE_STARTUP = 0,
@@ -78,6 +81,12 @@ typedef struct WalSnd
 	 * Timestamp of the last message received from standby.
 	 */
 	TimestampTz replyTime;
+
+	/*
+	 * Giant blob of horror socket stats. TODO replace with request/reply
+	 * mechanism.
+	 */
+	PQsocketStats socket_info;
 } WalSnd;
 
 extern WalSnd *MyWalSnd;
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 9e31a53de7..fd7d2d4601 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -5,6 +5,11 @@ use PostgresNode;
 use TestLib;
 use Test::More tests => 36;
 
+# Tell PostgresNode to use TCP
+$PostgresNode::use_tcp = 1;
+$PostgresNode::test_pghost = $PostgresNode::test_localhost;
+$ENV{PGHOST} = $PostgresNode::test_pghost;
+
 # Initialize primary node
 my $node_primary = get_new_node('primary');
 # A specific role is created to perform some tests related to replication,
@@ -65,6 +70,8 @@ is($node_standby_1->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
 is($node_standby_2->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
 	3, 'read-only queries on standby 2');
 
+diag $node_primary->safe_psql('postgres', 'SELECT * FROM pg_stat_replication;');
+
 # Tests for connection parameter target_session_attrs
 note "testing connection parameter \"target_session_attrs\"";
 
@@ -409,3 +416,38 @@ ok( ($phys_restart_lsn_pre cmp $phys_restart_lsn_post) == 0,
 my $primary_data = $node_primary->data_dir;
 ok(!-f "$primary_data/pg_wal/$segment_removed",
 	"WAL segment $segment_removed recycled after physical slot advancing");
+
+# Slam some load onto the primary and see what happens
+my ($scale, $runtime, $clients, $jobs) = (100, 60, 4, 4);
+diag "initing pgbench";
+IPC::Run::run(['pgbench', '-i', '-s', $scale, $node_primary->connstr('postgres')]);
+diag "INIT: " . $node_primary->safe_psql('postgres', 'SELECT * FROM pg_stat_replication;');
+diag "done initing";
+my $bench = IPC::Run::start(['pgbench', '-s', $scale, '-T', $runtime, '-c', $clients, '-j', $jobs, '-P', 1, $node_primary->connstr('postgres')], '>', \my $stdout, '2>', \my $stderr);
+for my $t (0 .. $runtime)
+{
+	diag " $t: " . $node_primary->safe_psql('postgres', 'SELECT * FROM pg_stat_replication;');
+	sleep(1);
+
+	if ($t == 10)
+	{
+		diag "--- pausing walreceiver ---";
+		my $rxpid = $node_standby_2->safe_psql('postgres', 'SELECT pid FROM pg_stat_wal_receiver');
+		kill 'STOP', $rxpid;
+		my $lag = $node_primary->safe_psql('postgres', 'SELECT pg_wal_lsn_diff(pg_current_wal_insert_lsn(), sent_lsn) FROM pg_stat_replication');
+		diag "\npaused walreceiver $rxpid at lag $lag\n";
+	}
+
+	if ($t == 40)
+	{
+		my $lag = $node_primary->safe_psql('postgres', 'SELECT pg_wal_lsn_diff(pg_current_wal_insert_lsn(), sent_lsn) FROM pg_stat_replication');
+		diag "--- resuming walreceiver with lag $lag---";
+		my $rxpid = $node_standby_2->safe_psql('postgres', 'SELECT pid FROM pg_stat_wal_receiver');
+		kill 'CONT', $rxpid;
+		diag "\nresumed walreceiver $rxpid\n";
+	}
+}
+$bench->signal('INT');
+$bench->finish;
+diag "pgbench output: \"$stdout\" and \"$stderr\"";
+diag "ecode" . $bench->full_result(0);
-- 
2.26.2
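The "TODO: Should check field validity with offsetof(...) against ressz" note in socket_buffer_stats(), and its existing tcpi_snd_wnd check, both rely on getsockopt() reporting how many bytes of struct tcp_info the kernel actually filled in. A minimal sketch of a reusable form of that check, assuming glibc's `<netinet/tcp.h>` layout of struct tcp_info; `TCP_INFO_HAS_FIELD`, `rtt_sample` and `rtt_from_tcp_info()` are hypothetical names, not part of the patch.

```c
/*
 * Sketch: only trust a struct tcp_info field if the length getsockopt()
 * returned covers it. Hypothetical names; glibc's <netinet/tcp.h> assumed.
 */
#define _GNU_SOURCE
#include <assert.h>
#include <netinet/tcp.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>

/* True if a TCP_INFO result of 'len' bytes includes all of 'field'. */
#define TCP_INFO_HAS_FIELD(len, field) \
	((size_t) (len) >= offsetof(struct tcp_info, field) + \
	 sizeof(((struct tcp_info *) 0)->field))

typedef struct rtt_sample
{
	int			valid;			/* 0 if the kernel didn't report RTT fields */
	uint32_t	rtt_us;			/* smoothed RTT, microseconds */
	uint32_t	rttvar_us;		/* RTT variance, microseconds */
} rtt_sample;

/* 'len' is the value-result length from getsockopt(..., TCP_INFO, ...). */
static void
rtt_from_tcp_info(const struct tcp_info *ti, socklen_t len, rtt_sample *out)
{
	memset(out, 0, sizeof(*out));
	if (!TCP_INFO_HAS_FIELD(len, tcpi_rtt) ||
		!TCP_INFO_HAS_FIELD(len, tcpi_rttvar))
		return;					/* older kernel: fields past 'len' unset */
	out->valid = 1;
	out->rtt_us = ti->tcpi_rtt;
	out->rttvar_us = ti->tcpi_rttvar;
}
```

The same macro would cover the newer fields listed in the patch's comment (tcpi_notsent_bytes, tcpi_busy_time and so on) when building against linux/tcp.h on a kernel that may be older than the headers.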