On Wed, Mar 16, 2022 at 11:57 AM wangw.f...@fujitsu.com <wangw.f...@fujitsu.com> wrote:
>
> On Wed, Mar 9, 2022 at 2:45 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> >
> Thanks for your comments.
>
> > On Wed, Mar 9, 2022 at 10:26 AM I wrote:
> > > On Tue, Mar 8, 2022 at 3:52 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> > > > I've looked at the patch and have a question:
> > > Thanks for your review and comments.
> > >
> > > > +void
> > > > +SendKeepaliveIfNecessary(LogicalDecodingContext *ctx, bool skipped) {
> > > > +    static int skipped_changes_count = 0;
> > > > +
> > > > +    /*
> > > > +     * skipped_changes_count is reset when processing changes that do not
> > > > +     * need to be skipped.
> > > > +     */
> > > > +    if (!skipped)
> > > > +    {
> > > > +        skipped_changes_count = 0;
> > > > +        return;
> > > > +    }
> > > > +
> > > > +    /*
> > > > +     * After continuously skipping SKIPPED_CHANGES_THRESHOLD changes, try to send a
> > > > +     * keepalive message.
> > > > +     */
> > > > +#define SKIPPED_CHANGES_THRESHOLD 10000
> > > > +
> > > > +    if (++skipped_changes_count >= SKIPPED_CHANGES_THRESHOLD)
> > > > +    {
> > > > +        /* Try to send a keepalive message. */
> > > > +        OutputPluginUpdateProgress(ctx, true);
> > > > +
> > > > +        /* After trying to send a keepalive message, reset the flag. */
> > > > +        skipped_changes_count = 0;
> > > > +    }
> > > > +}
> > > >
> > > > Since we send a keepalive after continuously skipping 10000 changes, the
> > > > originally reported issue can still occur if skipping 10000 changes
> > > > took more than the timeout and the walsender didn't send any change
> > > > during that time, is that right?
> > > Yes, theoretically so.
> > > But after testing, I think this value should be conservative enough not
> > > to reproduce this bug.
> >
> > But it really depends on the workload, the server condition, and the
> > timeout value, right? The logical decoding might involve a lot of disk
> > I/O to spill/load intermediate data, and the system might be under high
> > load. Why don't we check both the count and the time?
> > That is, I think we can send a keep-alive either if we skipped 10000
> > changes or if we didn't send anything for wal_sender_timeout / 2.
> Yes, you are right.
> Do you mean that, each time a change is skipped, we check whether it has
> been more than (wal_sender_timeout / 2) without sending anything?
> IIUC, I tried to send keep-alive messages based on time before[1], but after
> testing, I found that it brings slight overhead. So I am not sure whether
> this kind of overhead should be introduced in a function (pgoutput_change)
> that is invoked frequently.
>
> > Also, the patch changes the current behavior of wal senders; with the
> > patch, we send keep-alive messages even when wal_sender_timeout = 0.
> > But I'm not sure it's a good idea. The subscriber's
> > wal_receiver_timeout might be lower than wal_sender_timeout. Instead,
> > I think it's better to periodically check replies and send a reply to
> > the keep-alive message sent from the subscriber if necessary, for
> > example, every 10000 skipped changes.
> Sorry, I could not follow what you said. Do you mean the following?
> 1. When we didn't send anything for (wal_sender_timeout / 2) or we skipped
> 10000 changes continuously, we will invoke the function WalSndKeepalive in
> the function WalSndUpdateProgress, and send a keepalive message to the
> subscriber requesting an immediate reply.
> 2. If, after sending a keepalive message, another 10000 changes are skipped
> continuously, we need to handle the reply from the subscriber side when
> processing the 10000th change. The handling approach is to reply to the
> confirmation message from the subscriber.
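A minimal C sketch of the count-or-time condition discussed in the quoted exchange: send a keep-alive after SKIPPED_CHANGES_THRESHOLD consecutive skipped changes, or when nothing has been sent for wal_sender_timeout / 2. `SkipState`, `keepalive_needed()` and the millisecond `TimestampMs` type are hypothetical stand-ins, not walsender code, and the caller is assumed to do the actual send. Whether the count-based path should still fire when wal_sender_timeout = 0 is the open question raised in the quoted text; this sketch only disables the time-based check in that case.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in; PostgreSQL's TimestampTz is in microseconds. */
typedef int64_t TimestampMs;

#define SKIPPED_CHANGES_THRESHOLD 10000

/* Hypothetical per-walsender state for this sketch. */
typedef struct SkipState
{
    int         skipped_changes_count;  /* consecutive skipped changes */
    TimestampMs last_send_time;         /* when we last sent anything */
} SkipState;

/*
 * Hypothetical helper: called once per decoded change.  Returns true when
 * the caller should send a keep-alive, i.e. when SKIPPED_CHANGES_THRESHOLD
 * consecutive changes were skipped or nothing was sent for
 * wal_sender_timeout / 2.  A timeout of 0 disables only the time check.
 */
static bool
keepalive_needed(SkipState *st, bool skipped, TimestampMs now,
                 int wal_sender_timeout_ms)
{
    if (!skipped)
    {
        /* A change was actually sent: restart both the count and the clock. */
        st->skipped_changes_count = 0;
        st->last_send_time = now;
        return false;
    }

    st->skipped_changes_count++;

    if (st->skipped_changes_count >= SKIPPED_CHANGES_THRESHOLD ||
        (wal_sender_timeout_ms > 0 &&
         now - st->last_send_time >= wal_sender_timeout_ms / 2))
    {
        /* The caller sends the keep-alive; count it as a send. */
        st->skipped_changes_count = 0;
        st->last_send_time = now;
        return true;
    }
    return false;
}
```

Checking both conditions means a slow run of skipped changes (for example, one that spills to disk) still triggers a keep-alive on time, while a fast run is still bounded by the count.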
After more thought, can we check only wal_sender_timeout without the skip count? That is, in WalSndUpdateProgress(), if we have received any reply from the subscriber in the last (wal_sender_timeout / 2), we don't need to do anything in terms of keep-alive. If not, we do ProcessRepliesIfAny() (and probably WalSndCheckTimeOut()?) and then WalSndKeepaliveIfNecessary(). That way, we can send keep-alive messages every (wal_sender_timeout / 2). And since we don't call them for every change, we would not need to worry much about the overhead.

Actually, WalSndWriteData() does similar things; even in the case where we don't skip consecutive changes (i.e., we are sending consecutive changes to the subscriber), we do ProcessRepliesIfAny() at least every (wal_sender_timeout / 2). I think this would work in most common cases, where the user sets wal_sender_timeout and wal_receiver_timeout to the same value.

Regards,

--
Masahiko Sawada
EDB:  https://www.enterprisedb.com/
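The time-only check proposed above could be sketched roughly as follows. This is a hypothetical simulation, not PostgreSQL code: `update_progress_sketch()`, `last_reply_time`, `wal_sender_timeout_ms` and the two `_stub` functions are illustrative stand-ins that only count calls in place of the real ProcessRepliesIfAny() and WalSndKeepaliveIfNecessary(); the optional WalSndCheckTimeOut() call mentioned in the mail is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in; PostgreSQL's TimestampTz is in microseconds. */
typedef int64_t TimestampMs;

/* Hypothetical walsender state for this sketch. */
static TimestampMs last_reply_time = 0;   /* last reply from the subscriber */
static int wal_sender_timeout_ms = 60000;

/* Call counters that let the stubs be observed. */
static int replies_checked = 0;
static int keepalives_considered = 0;

/* Stubs: in the real walsender, processing replies may advance the reply time. */
static void ProcessRepliesIfAny_stub(void) { replies_checked++; }
static void WalSndKeepaliveIfNecessary_stub(void) { keepalives_considered++; }

/*
 * Sketch of the proposed check in WalSndUpdateProgress(): only when no reply
 * has arrived within wal_sender_timeout / 2 do we read pending replies and
 * consider sending a keep-alive.  No skip count is involved.
 */
static void
update_progress_sketch(TimestampMs now)
{
    if (wal_sender_timeout_ms <= 0)
        return;                 /* timeout disabled: keep current behavior */

    if (now - last_reply_time < wal_sender_timeout_ms / 2)
        return;                 /* heard from the subscriber recently */

    ProcessRepliesIfAny_stub();
    WalSndKeepaliveIfNecessary_stub();
}
```

The sketch takes `now` as a parameter; reading the clock on every change is the kind of cost the earlier time-based attempt reportedly ran into, so how often the current time is obtained would still need measuring.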