Hi hackers,
Logical replication apply worker by default switches off asynchronous
commit. Cite from documentation of subscription parameters:
```
|synchronous_commit|(|enum|)<https://www.postgresql.org/docs/devel/sql-createsubscription.html#SQL-CREATESUBSCRIPTION-PARAMS-WITH-SYNCHRONOUS-COMMIT>
The value of this parameter overrides thesynchronous_commit
<https://www.postgresql.org/docs/devel/runtime-config-wal.html#GUC-SYNCHRONOUS-COMMIT>setting
within this subscription's apply worker processes. The default value
is|off|.
It is safe to use|off|for logical replication: If the subscriber
loses transactions because of missing synchronization, the data will
be sent again from the publisher.
```
So subscriber can confirm transaction which are not persisted. But
consider a PostgreSQL HA setup with:
* primary node
* (cold) standby node streaming WAL from the primary
* synchronous replication enabled, so that you get zero data loss if
the primary dies
* the primary/standby cluster is a subscriber to a remote PostgreSQL
server
It can happen that:
* the primary streams some transactions from the remote PostgreSQL,
with logical replication
* the primary crashes. Failover to the standby happens
* the standby tries to stream the transactions from the subscriber.
But some transactions are missed, because the primary had already
reported a higher flush LSN.
I wonder if such scenario is considered as an "expected behavior" or
"bug" by community?
It seems to be quite easily fixed (see attached patch).
So should we take in account sync replication in LR apply worker or not?
Thanks to Heikki Linnakangas <hlinn...@iki.fi> for describing this
scenario and Arseny Sher <a...@neon.tech> for providing the patch.
diff --git a/src/backend/replication/logical/worker.c
b/src/backend/replication/logical/worker.c
index 245e9be6f2..e878d882ae 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -173,6 +173,7 @@
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/walreceiver.h"
+#include "replication/walsender_private.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
@@ -3415,6 +3416,17 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
dlist_mutable_iter iter;
XLogRecPtr local_flush = GetFlushRecPtr(NULL);
+ /*
+ * If synchronous replication is configured, take into account its
position.
+ */
+ if (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
+ {
+ /* assumes u64 read is atomic */
+ XLogRecPtr sync_rep_flush =
WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH];
+
+ local_flush = Min(local_flush, sync_rep_flush);
+ }
+
*write = InvalidXLogRecPtr;
*flush = InvalidXLogRecPtr;