Hi,

On 17/11/17 08:35, Kyotaro HORIGUCHI wrote:
>
> Moving around the code allow us to place ps_is_send_pending() in
> the while condition, which seems to be more proper place to do
> that. I haven't added test for this particular case.
>
> I tested this that
>
> - cleanly applies on the current master HEAD and passes make
>   check and subscription test.
>
> - walsender properly chooses the slow-path even if
>   pq_is_send_pending() is always false. (happens on a fast enough
>   network)
>
> - walsender waits properly waits on socket and process-reply time
>   in WaitLatchOrSocket.
>
> - walsender exits by timeout on network stall.
>
> So, I think the patch is functionally perfect.
>
> I'm a reviewer of this patch but I think I'm not allowed to mark
> this "Ready for Commiter" since the last change is made by me.
>
Thanks for working on this, but there are a couple of problems with your
modifications which mean that it does not actually fix the original issue
anymore (a transaction that takes a long time to decode, while sending
changes over the network works fine, will result in a walsender timeout).

The first one is that you put pq_is_send_pending() in the while condition,
so the loop is again never executed if there is no network send pending,
which makes the if above it meaningless. Also, you missed
ProcessRepliesIfAny() when moving code around. That's needed for the
timeout calculations to work correctly.

So one more revision is attached with those things fixed. This version
fixes the original issue as well.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services

From 46bbc20a3b4d6e2eed97ddc1b05d828399241c63 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Tue, 12 Sep 2017 17:31:28 +0900
Subject: [PATCH] Fix walsender timeouts when decoding large transaction

The logical slots have fast code path for sending data in order to not
impose too high per message overhead. The fast path skips checks for
interrupts and timeouts. However, the existing coding failed to consider
the fact that transaction with large number of changes may take very long
to be processed and sent to the client. This causes walsender to ignore
interrupts for potentially long time and more importantly it will cause
walsender being killed due to timeout at the end of such transaction.

This commit changes the fast path to also check for interrupts and only
allows calling the fast path when last keepalive check happened less
than half of walsender timeout ago, otherwise the slower code path will
be taken.
---
 src/backend/replication/walsender.c | 70 +++++++++++++++++++++----------------
 1 file changed, 39 insertions(+), 31 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fa1db748b5..0c600ed2e3 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1151,6 +1151,8 @@ static void
 WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 				bool last_write)
 {
+	TimestampTz	now;
+
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);

@@ -1160,23 +1162,54 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	 * several releases by streaming physical replication.
 	 */
 	resetStringInfo(&tmpbuf);
-	pq_sendint64(&tmpbuf, GetCurrentTimestamp());
+	now = GetCurrentTimestamp();
+	pq_sendint64(&tmpbuf, now);
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));

-	/* fast path */
+	CHECK_FOR_INTERRUPTS();
+
 	/* Try to flush pending output to the client */
 	if (pq_flush_if_writable() != 0)
 		WalSndShutdown();

-	if (!pq_is_send_pending())
-		return;
+	/* Try taking fast path unless we get too close to walsender timeout. */
+	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
+										  wal_sender_timeout / 2))
+	{
+		if (!pq_is_send_pending())
+			return;
+	}

+	/* If we have pending write here, go to slow path */
 	for (;;)
 	{
 		int			wakeEvents;
 		long		sleeptime;
-		TimestampTz now;
+
+		/* Check for input from the client */
+		ProcessRepliesIfAny();
+
+		now = GetCurrentTimestamp();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut(now);
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary(now);
+
+		if (!pq_is_send_pending())
+			break;
+
+		sleeptime = WalSndComputeSleeptime(now);
+
+		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
+			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
+
+		/* Sleep until something happens or we time out */
+		WaitLatchOrSocket(MyLatch, wakeEvents,
+						  MyProcPort->sock, sleeptime,
+						  WAIT_EVENT_WAL_SENDER_WRITE_DATA);

 		/*
 		 * Emergency bailout if postmaster has died. This is to avoid the
@@ -1198,35 +1231,10 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 			SyncRepInitConfig();
 		}

-		/* Check for input from the client */
-		ProcessRepliesIfAny();
-
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
-
-		/* If we finished clearing the buffered data, we're done here. */
-		if (!pq_is_send_pending())
-			break;
-
-		now = GetCurrentTimestamp();
-
-		/* die if timeout was reached */
-		WalSndCheckTimeOut(now);
-
-		/* Send keepalive if the time has come */
-		WalSndKeepaliveIfNecessary(now);
-
-		sleeptime = WalSndComputeSleeptime(now);
-
-		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
-			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
-
-		/* Sleep until something happens or we time out */
-		WaitLatchOrSocket(MyLatch, wakeEvents,
-						  MyProcPort->sock, sleeptime,
-						  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
-	}
+	};

 	/* reactivate latch so WalSndLoop knows to continue */
 	SetLatch(MyLatch);
-- 
2.14.1
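
For anyone who wants to play with the two points raised in the reply outside of walsender.c, here is a minimal standalone sketch in C. It is not part of the patch and only models the control flow under some assumptions: every sketch_* name and the SketchTimestamp type are made up for illustration, timestamps are plain microsecond counts, and "data still pending" is reduced to a counter that drops by one per loop pass.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Illustrative stand-in for a timestamp: microseconds in a 64-bit integer. */
typedef int64_t SketchTimestamp;

/* Add a number of milliseconds to a microsecond timestamp. */
static SketchTimestamp
sketch_plus_ms(SketchTimestamp ts, int64_t ms)
{
	return ts + ms * 1000;
}

/*
 * Hypothetical helper modelling the fast-path test: returning early is
 * allowed only while less than half of the timeout has passed since the
 * last reply AND nothing is left in the send buffer.
 */
static bool
sketch_can_return_early(SketchTimestamp now, SketchTimestamp last_reply,
						int timeout_ms, bool send_pending)
{
	assert(timeout_ms >= 0);

	if (now < sketch_plus_ms(last_reply, timeout_ms / 2))
		return !send_pending;

	/* Too close to the timeout: run the slow path at least once. */
	return false;
}

/*
 * Loop shape with the pending check inside the body: the housekeeping
 * (reply processing, timeout and keepalive checks in the real code) runs
 * at least once even when nothing is pending.  Returns the pass count.
 */
static int
sketch_passes_check_inside(int pending_rounds)
{
	int			passes = 0;

	for (;;)
	{
		passes++;				/* housekeeping happens here */
		if (pending_rounds <= 0)
			break;
		pending_rounds--;		/* pretend a wait and flush drained a bit */
	}
	return passes;
}

/*
 * Loop shape with the pending check in the loop condition: with nothing
 * pending the body, and therefore the housekeeping, never runs.
 */
static int
sketch_passes_check_in_condition(int pending_rounds)
{
	int			passes = 0;

	while (pending_rounds > 0)
	{
		passes++;
		pending_rounds--;
	}
	return passes;
}
```

With a 60 second timeout and an empty send buffer, sketch_can_return_early() permits the early return 10 seconds after the last reply but not 40 seconds after it. In the latter case only the first loop shape actually performs a housekeeping pass, which is why the pending check has to stay inside the loop body.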