This is something I hacked together on the way back from pgconf.eu. It's highly experimental.

The idea is to do the equivalent of pg_wal_replay_wait() on the protocol level, so that it is ideally fully transparent to the application code. The application just issues queries, and they might be serviced by a primary or a standby, but there is always a correct ordering of reads after writes.

Additionally, I'm exploring whether this could serve as an example of a protocol extension that is a bit more complex than, say, longer cancel keys, and thus something around which we could have a discussion about protocol versioning.

The patch adds a protocol extension called _pq_.wait_for_lsn, as well as a libpq connection option wait_for_lsn to activate it. (For example, use psql -d 'wait_for_lsn=1'.)

With this protocol extension, two things are changed:

- The ReadyForQuery message sends back the current LSN.

- The Query message sends an LSN to wait for. (This doesn't handle the extended query protocol yet.)

To make any real use of this, you'd need some middleware, such as a hacked pgbouncer, that transparently redirects queries among primaries and standbys. Such middleware doesn't exist yet, but if it did, I imagine it could be pretty useful.

There might be other ways to slice this. Instead of relying on hypothetical middleware, the application could use two connections, one for writing and one for reading, and pass the LSN between them. In that case, I imagine at least one half of the protocol extension, shipping the current LSN with ReadyForQuery, would be useful, since it saves application code from having to call pg_current_wal_insert_lsn() explicitly.

Thoughts?
From 44b6354429847e3b3aeac21ee5712879b97d7877 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Sat, 26 Oct 2024 01:11:57 +0200
Subject: [PATCH v0] wait_for_lsn protocol option

---
 src/backend/tcop/backend_startup.c  | 15 +++++++++++++--
 src/backend/tcop/dest.c             | 12 ++++++++++++
 src/backend/tcop/postgres.c         | 23 +++++++++++++++++++++++
 src/include/libpq/libpq-be.h        |  1 +
 src/interfaces/libpq/fe-connect.c   | 26 ++++++++++++++++++++++++++
 src/interfaces/libpq/fe-exec.c      |  1 +
 src/interfaces/libpq/fe-protocol3.c | 20 ++++++++++++++++++++
 src/interfaces/libpq/fe-trace.c     |  2 ++
 src/interfaces/libpq/libpq-int.h    |  3 +++
 9 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/src/backend/tcop/backend_startup.c 
b/src/backend/tcop/backend_startup.c
index 2a96c81f925..bd3b91d01eb 100644
--- a/src/backend/tcop/backend_startup.c
+++ b/src/backend/tcop/backend_startup.c
@@ -768,12 +768,23 @@ ProcessStartupPacket(Port *port, bool ssl_done, bool 
gss_done)
                                                                        valptr),
                                                         errhint("Valid values 
are: \"false\", 0, \"true\", 1, \"database\".")));
                        }
+                       else if (strcmp(nameptr, "_pq_.wait_for_lsn") == 0)
+                       {
+                               if (strcmp(valptr, "1") == 0)
+                                       port->wait_for_lsn_enabled = true;
+                               else
+                                       ereport(FATAL,
+                                                       
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                        errmsg("invalid value 
for parameter \"%s\": \"%s\"",
+                                                                       
"wait_for_lsn",
+                                                                       valptr),
+                                                        errhint("Valid values 
are: 1.")));
+                       }
                        else if (strncmp(nameptr, "_pq_.", 5) == 0)
                        {
                                /*
                                 * Any option beginning with _pq_. is reserved 
for use as a
-                                * protocol-level option, but at present no 
such options are
-                                * defined.
+                                * protocol-level option.
                                 */
                                unrecognized_protocol_options =
                                        lappend(unrecognized_protocol_options, 
pstrdup(nameptr));
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 96f80b30463..bb9910b12d5 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -31,6 +31,8 @@
 #include "access/printsimple.h"
 #include "access/printtup.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
 #include "commands/copy.h"
 #include "commands/createas.h"
 #include "commands/explain.h"
@@ -40,6 +42,7 @@
 #include "executor/tstoreReceiver.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
+#include "miscadmin.h"
 
 
 /* ----------------
@@ -265,6 +268,15 @@ ReadyForQuery(CommandDest dest)
 
                                pq_beginmessage(&buf, PqMsg_ReadyForQuery);
                                pq_sendbyte(&buf, TransactionBlockStatusCode());
+                               if (MyProcPort->wait_for_lsn_enabled)
+                               {
+                                       char            xloc[MAXFNAMELEN];
+                                       XLogRecPtr      logptr;
+
+                                       logptr = GetXLogWriteRecPtr();
+                                       snprintf(xloc, sizeof(xloc), "%X/%X", 
LSN_FORMAT_ARGS(logptr));
+                                       pq_sendstring(&buf, xloc);
+                               }
                                pq_endmessage(&buf);
                        }
                        /* Flush output at end of cycle in any case. */
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7f5eada9d45..aee6fec1fc4 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -38,6 +38,7 @@
 #include "commands/async.h"
 #include "commands/event_trigger.h"
 #include "commands/prepare.h"
+#include "commands/waitlsn.h"
 #include "common/pg_prng.h"
 #include "jit/jit.h"
 #include "libpq/libpq.h"
@@ -75,6 +76,7 @@
 #include "utils/injection_point.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
 #include "utils/timeout.h"
@@ -4782,6 +4784,27 @@ PostgresMain(const char *dbname, const char *username)
                                        SetCurrentStatementStartTimestamp();
 
                                        query_string = 
pq_getmsgstring(&input_message);
+                                       if (MyProcPort && 
MyProcPort->wait_for_lsn_enabled)
+                                       {
+                                               const char *wait_for_lsn = 
pq_getmsgstring(&input_message);
+                                               XLogRecPtr      lsn;
+                                               bool            error;
+
+                                               lsn = 
pg_lsn_in_internal(wait_for_lsn, &error);
+                                               if (error)
+                                                       ereport(ERROR,
+                                                                       
(errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                        
errmsg("invalid LSN %s", wait_for_lsn)));
+                                               if (RecoveryInProgress())
+                                                       WaitForLSNReplay(lsn, 
0);
+                                               else
+                                               {
+                                                       if 
(GetXLogWriteRecPtr() != lsn)
+                                                               ereport(ERROR,
+                                                                               
(errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                                               
 errmsg("LSN mismatch")));
+                                               }
+                                       }
                                        pq_getmsgend(&input_message);
 
                                        if (am_walsender)
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 05cb1874c58..0f23a969231 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -150,6 +150,7 @@ typedef struct Port
         */
        char       *database_name;
        char       *user_name;
+       bool            wait_for_lsn_enabled;
        char       *cmdline_options;
        List       *guc_options;
 
diff --git a/src/interfaces/libpq/fe-connect.c 
b/src/interfaces/libpq/fe-connect.c
index 64787bea511..aa56ca42b2c 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -360,6 +360,10 @@ static const internalPQconninfoOption PQconninfoOptions[] 
= {
                "Target-Session-Attrs", "", 15, /* sizeof("prefer-standby") = 
15 */
        offsetof(struct pg_conn, target_session_attrs)},
 
+       {"wait_for_lsn", "PGWAITFORLSN", "0", NULL,
+               "Wait-For-LSN", "", 1,
+       offsetof(struct pg_conn, wait_for_lsn_setting)},
+
        {"load_balance_hosts", "PGLOADBALANCEHOSTS",
                DefaultLoadBalanceHosts, NULL,
                "Load-Balance-Hosts", "", 8,    /* sizeof("disable") = 8 */
@@ -1847,6 +1851,28 @@ pqConnectOptions2(PGconn *conn)
                        goto oom_error;
        }
 
+       /*
+        * validate wait_for_lsn option
+        */
+       if (conn->wait_for_lsn_setting)
+       {
+               if (strcmp(conn->wait_for_lsn_setting, "on") == 0 ||
+                       strcmp(conn->wait_for_lsn_setting, "true") == 0 ||
+                       strcmp(conn->wait_for_lsn_setting, "1") == 0)
+                       conn->wait_for_lsn_enabled = true;
+               else if (strcmp(conn->wait_for_lsn_setting, "off") == 0 ||
+                                strcmp(conn->wait_for_lsn_setting, "false") == 
0 ||
+                                strcmp(conn->wait_for_lsn_setting, "0") == 0)
+                       conn->wait_for_lsn_enabled = false;
+               else
+               {
+                       conn->status = CONNECTION_BAD;
+                       libpq_append_conn_error(conn, "invalid %s value: 
\"%s\"",
+                                                                       
"wait_for_lsn", conn->wait_for_lsn_setting);
+                       return false;
+               }
+       }
+
        /*
         * Only if we get this far is it appropriate to try to connect. (We 
need a
         * state flag, rather than just the boolean result of this function, in
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 0d224a8524e..8d2d183476b 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1454,6 +1454,7 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool 
newQuery)
        /* construct the outgoing Query message */
        if (pqPutMsgStart(PqMsg_Query, conn) < 0 ||
                pqPuts(query, conn) < 0 ||
+               (conn->wait_for_lsn_enabled && pqPuts(conn->last_lsn, conn)) ||
                pqPutMsgEnd(conn) < 0)
        {
                /* error message should be set up already */
diff --git a/src/interfaces/libpq/fe-protocol3.c 
b/src/interfaces/libpq/fe-protocol3.c
index 8c5ac1729f0..689c869ce56 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1625,6 +1625,23 @@ getReadyForQuery(PGconn *conn)
                        break;
        }
 
+       if (conn->wait_for_lsn_enabled)
+       {
+               PQExpBufferData buf;
+
+               initPQExpBuffer(&buf);
+               if (pqGets(&buf, conn))
+               {
+                       termPQExpBuffer(&buf);
+                       return EOF;
+               }
+               else
+               {
+                       strlcpy(conn->last_lsn, buf.data, sizeof 
conn->last_lsn);
+                       termPQExpBuffer(&buf);
+               }
+       }
+
        return 0;
 }
 
@@ -2298,6 +2315,9 @@ build_startup_packet(const PGconn *conn, char *packet,
        if (conn->client_encoding_initial && conn->client_encoding_initial[0])
                ADD_STARTUP_OPTION("client_encoding", 
conn->client_encoding_initial);
 
+       if (conn->wait_for_lsn_enabled)
+               ADD_STARTUP_OPTION("_pq_.wait_for_lsn", "1");
+
        /* Add any environment-driven GUC settings needed */
        for (next_eo = options; next_eo->envName; next_eo++)
        {
diff --git a/src/interfaces/libpq/fe-trace.c b/src/interfaces/libpq/fe-trace.c
index 19c5b8a8900..30217b687af 100644
--- a/src/interfaces/libpq/fe-trace.c
+++ b/src/interfaces/libpq/fe-trace.c
@@ -478,6 +478,7 @@ pqTraceOutput_Query(FILE *f, const char *message, int 
*cursor)
 {
        fprintf(f, "Query\t");
        pqTraceOutputString(f, message, cursor, false);
+       /* FIXME */
 }
 
 static void
@@ -609,6 +610,7 @@ pqTraceOutput_ReadyForQuery(FILE *f, const char *message, 
int *cursor)
 {
        fprintf(f, "ReadyForQuery\t");
        pqTraceOutputByte1(f, message, cursor);
+       /* FIXME */
 }
 
 /*
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 9579f803538..71b9ad24d53 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -426,6 +426,7 @@ struct pg_conn
        char       *ssl_max_protocol_version;   /* maximum TLS protocol version 
*/
        char       *target_session_attrs;       /* desired session properties */
        char       *require_auth;       /* name of the expected auth method */
+       char       *wait_for_lsn_setting;
        char       *load_balance_hosts; /* load balance over hosts */
 
        bool            cancelRequest;  /* true if this connection is used to 
send a
@@ -448,6 +449,7 @@ struct pg_conn
        ConnStatusType status;
        PGAsyncStatusType asyncStatus;
        PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
+       char            last_lsn[128];
        char            last_sqlstate[6];       /* last reported SQLSTATE */
        bool            options_valid;  /* true if OK to attempt connection */
        bool            nonblocking;    /* whether this connection is using 
nonblock
@@ -529,6 +531,7 @@ struct pg_conn
        PGVerbosity verbosity;          /* error/notice message verbosity */
        PGContextVisibility show_context;       /* whether to show CONTEXT 
field */
        PGlobjfuncs *lobjfuncs;         /* private state for large-object 
access fns */
+       bool            wait_for_lsn_enabled;
        pg_prng_state prng_state;       /* prng state for load balancing 
connections */
 
 
-- 
2.47.0

Reply via email to