At Wed, 9 Jun 2021 17:32:25 +0500, Abbas Butt <abbas.b...@enterprisedb.com> 
wrote in 
> 
> On Wed, Jun 9, 2021 at 2:30 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > Does these keepalive messages are sent at the same frequency even for
> > subscribers?
> 
> Yes, I have tested it with one publisher and one subscriber.
> The moment I start pgbench session I can see keepalive messages sent and
> replied by the subscriber with same frequency.
> 
> > Basically, I wanted to check if we have logical
> > replication set up between 2 nodes then do we send these keep-alive
> > messages flood?
> 
> Yes we do.
> 
> > If not, then why is it different in the case of
> > pg_recvlogical?
> 
> Nothing, the WAL sender behaviour is same in both cases.
> 
> 
> > Is it possible that the write/flush location is not
> > updated at the pace at which we expect?

Yes. MyWalSnd->flush/write are updated far frequently but still
MyWalSnd->write is behind sentPtr by from thousands of bytes up to
less than 1 block (1block = 8192 bytes). (Flush lags are larger than
write lags, of course.)

I counted how many times keepalives are sent for each request length
to logical_read_xlog_page() for 10 seconds pgbench run and replicating
pgbench_history, using the attached change.

size:  sent /notsent/ calls: write lag/ flush lag
   8:     3 /     6 /     3:    5960 /  348962
  16:     1 /     2 /     1:     520 /  201096
  24:  2425 /  4852 /  2461:    5259 /  293569
  98:     2 /     0 /    54:       5 /    1050
 187:     2 /     0 /    94:       0 /    1060
4432:     1 /     0 /     1: 410473592 / 410473592
7617:     2 /     0 /    27:     317 /   17133
8280:     1 /     2 /     4:     390 /     390

Where,

size    is requested data length to logical_read_xlog_page()

sent    is the number of keepalives sent in the loop in WalSndWaitForWal

notsent is the number of runs of the loop in WalSndWaitForWal without
                sending a keepalive

calls   is the number of calls to WalSndWaitForWal

write lag is the bytes MyWalSnd->write is behind from sentPtr at the
        first run of the loop per call to logical_read_xlog_page.

flush lag is the the same to the above for MyWalSnd->flush.

Maybe the line of size=4432 is the first time fetch of WAL.

So this numbers show that WalSndWaitForWal is called almost only at
starting to fetching a record, and in that case the function runs the
loop three times and sends one keepalive by average.

> Well, it is async replication. The receiver can choose to update LSNs at
> its own will, say after 10 mins interval.
> It should only impact the size of WAL retained by the server.
> 
> Please see commit 41d5f8ad73
> > which seems to be talking about a similar problem.
> >
> 
> That commit does not address this problem.

Yeah, at least for me, WalSndWaitForWal send a keepalive per one call.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/transam/xlogreader.c 
b/src/backend/access/transam/xlogreader.c
index 42738eb940..ee78116e79 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -571,6 +571,7 @@ err:
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
  */
+int hogestate = -1;
 static int
 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 {
@@ -605,6 +606,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr 
pageptr, int reqLen)
        {
                XLogRecPtr      targetSegmentPtr = pageptr - targetPageOff;
 
+               hogestate = pageptr + XLOG_BLCKSZ - state->currRecPtr;
                readLen = state->routine.page_read(state, targetSegmentPtr, 
XLOG_BLCKSZ,
                                                                                
   state->currRecPtr,
                                                                                
   state->readBuf);
@@ -623,6 +625,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr 
pageptr, int reqLen)
         * First, read the requested data length, but at least a short page 
header
         * so that we can validate it.
         */
+       hogestate = pageptr + Max(reqLen, SizeOfXLogShortPHD) - 
state->currRecPtr;
        readLen = state->routine.page_read(state, pageptr, Max(reqLen, 
SizeOfXLogShortPHD),
                                                                           
state->currRecPtr,
                                                                           
state->readBuf);
@@ -642,6 +645,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr 
pageptr, int reqLen)
        /* still not enough */
        if (readLen < XLogPageHeaderSize(hdr))
        {
+               hogestate = pageptr + XLogPageHeaderSize(hdr) - 
state->currRecPtr;
                readLen = state->routine.page_read(state, pageptr, 
XLogPageHeaderSize(hdr),
                                                                                
   state->currRecPtr,
                                                                                
   state->readBuf);
@@ -649,6 +653,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr 
pageptr, int reqLen)
                        goto err;
        }
 
+       hogestate = -1;
        /*
         * Now that we know we have the full header, validate it.
         */
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index 109c723f4e..0de10c4a31 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1363,17 +1363,45 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, 
XLogRecPtr lsn, TransactionId
  * if we detect a shutdown request (either from postmaster or client)
  * we will return early, so caller must always check.
  */
+unsigned long counts[32768][3] = {0};
+unsigned long lagw[32768] = {0};
+unsigned long lagf[32768] = {0};
+
+void
+PrintCounts(void)
+{
+       int i = 0;
+       for (i = 0 ; i < 32768 ; i++)
+       {
+               if (counts[i][0] + counts[i][1] + counts[i][2] > 0)
+               {
+                       unsigned long wl = 0, fl = 0;
+                       if (counts[i][1] > 0)
+                       {
+                               wl = lagw[i] / counts[i][0];
+                               fl = lagf[i] / counts[i][0];
+                       
+                               ereport(LOG, (errmsg ("[%5d]: %5lu / %5lu / 
%5lu: %5lu  %5lu",
+                                                                         i, 
counts[i][1], counts[i][2], counts[i][0], wl, fl), errhidestmt(true)));
+                       }
+               }
+       }
+}
+
 static XLogRecPtr
 WalSndWaitForWal(XLogRecPtr loc)
 {
        int                     wakeEvents;
        static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+       extern int hogestate;
+       bool            lagtaken = false;
 
        /*
         * Fast path to avoid acquiring the spinlock in case we already know we
         * have enough WAL available. This is particularly interesting if we're
         * far behind.
         */
+       counts[hogestate][0]++;
        if (RecentFlushPtr != InvalidXLogRecPtr &&
                loc <= RecentFlushPtr)
                return RecentFlushPtr;
@@ -1439,7 +1467,39 @@ WalSndWaitForWal(XLogRecPtr loc)
                if (MyWalSnd->flush < sentPtr &&
                        MyWalSnd->write < sentPtr &&
                        !waiting_for_ping_response)
+               {
+                       if (hogestate >= 0)
+                       {
+                               counts[hogestate][1]++;
+                               if (!lagtaken)
+                               {
+                                       lagf[hogestate] += sentPtr - 
MyWalSnd->flush;
+                                       lagw[hogestate] += sentPtr - 
MyWalSnd->write;
+                                       lagtaken = true;
+                               }
+                       }
+//                     ereport(LOG, (errmsg ("KA[%lu/%lu/%lu]: %X/%X %X/%X 
%X/%X %d: %ld",
+//                                                               ka, na, ka + 
na,
+//                                                               
LSN_FORMAT_ARGS(MyWalSnd->flush),
+//                                                               
LSN_FORMAT_ARGS(MyWalSnd->write),
+//                                                               
LSN_FORMAT_ARGS(sentPtr),
+//                                                               
waiting_for_ping_response,
+//                                                               sentPtr - 
MyWalSnd->write)));
                        WalSndKeepalive(false);
+               }
+               else
+               {
+                       if (hogestate >= 0)
+                               counts[hogestate][2]++;
+
+//                     ereport(LOG, (errmsg ("kap[%lu/%lu/%lu]: %X/%X %X/%X 
%X/%X %d: %ld",
+//                                                               ka, na, ka + 
na,
+//                                                               
LSN_FORMAT_ARGS(MyWalSnd->flush),
+//                                                               
LSN_FORMAT_ARGS(MyWalSnd->write),
+//                                                               
LSN_FORMAT_ARGS(sentPtr),
+//                                                               
waiting_for_ping_response,
+//                                                               sentPtr - 
MyWalSnd->write)));
+               }
 
                /* check whether we're done */
                if (loc <= RecentFlushPtr)

Reply via email to