On Fri, Dec 9, 2022 at 3:05 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > On Fri, Dec 9, 2022 at 7:45 AM Peter Smith <smithpb2...@gmail.com> wrote: > > > > On Thu, Dec 8, 2022 at 7:43 PM Masahiko Sawada <sawada.m...@gmail.com> > > wrote: > > > > > > On Thu, Dec 8, 2022 at 4:42 PM Amit Kapila <amit.kapil...@gmail.com> > > > wrote: > > > > > > > > On Thu, Dec 8, 2022 at 12:42 PM Masahiko Sawada <sawada.m...@gmail.com> > > > > wrote: > > > > > > > > > > On Wed, Dec 7, 2022 at 10:03 PM houzj.f...@fujitsu.com > > > > > <houzj.f...@fujitsu.com> wrote: > > > > > > > > > > > > > > > > > > > +static void > > > > > > > +ProcessParallelApplyInterrupts(void) > > > > > > > +{ > > > > > > > + CHECK_FOR_INTERRUPTS(); > > > > > > > + > > > > > > > + if (ShutdownRequestPending) > > > > > > > + { > > > > > > > + ereport(LOG, > > > > > > > + (errmsg("logical replication > > > > > > > parallel > > > > > > > apply worker for subscrip > > > > > > > tion \"%s\" has finished", > > > > > > > + > > > > > > > MySubscription->name))); > > > > > > > + > > > > > > > + apply_worker_clean_exit(false); > > > > > > > + } > > > > > > > + > > > > > > > + if (ConfigReloadPending) > > > > > > > + { > > > > > > > + ConfigReloadPending = false; > > > > > > > + ProcessConfigFile(PGC_SIGHUP); > > > > > > > + } > > > > > > > +} > > > > > > > > > > > > > > I personally think that we don't need to have a function to do > > > > > > > only > > > > > > > these few things. > > > > > > > > > > > > I thought that introduce a new function make the handling of worker > > > > > > specific > > > > > > Interrupts logic similar to other existing ones. Like: > > > > > > ProcessWalRcvInterrupts () in walreceiver.c and > > > > > > HandlePgArchInterrupts() in > > > > > > pgarch.c ... > > > > > > > > > > I think the difference from them is that there is only one place to > > > > > call ProcessParallelApplyInterrupts(). > > > > > > > > > > > > > But I feel it is better to isolate this code in a separate function. > > > > What if we decide to extend it further by having some logic to stop > > > > workers after reloading of config? > > > > > > I think we can separate the function at that time. But let's keep the > > > current code as you and Hou agree with the current code. I'm not going > > > to insist on that. > > > > > > > > > > > > > > > > > > > > --- > > > > > > > server_version = walrcv_server_version(LogRepWorkerWalRcvConn); > > > > > > > options.proto.logical.proto_version = > > > > > > > + server_version >= 160000 ? > > > > > > > LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : > > > > > > > server_version >= 150000 ? > > > > > > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : > > > > > > > server_version >= 140000 ? > > > > > > > LOGICALREP_PROTO_STREAM_VERSION_NUM : > > > > > > > LOGICALREP_PROTO_VERSION_NUM; > > > > > > > > > > > > > > Instead of always using the new protocol version, I think we can > > > > > > > use > > > > > > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not > > > > > > > 'parallel'. That way, we don't need to change protocl version > > > > > > > check > > > > > > > logic in pgoutput.c and don't need to expose > > > > > > > defGetStreamingMode(). > > > > > > > What do you think? > > > > > > > > > > > > I think that some user can also use the new version number when > > > > > > trying to get > > > > > > changes (via pg_logical_slot_peek_binary_changes or other > > > > > > functions), so I feel > > > > > > leave the check for new version number seems fine. > > > > > > > > > > > > Besides, I feel even if we don't use new version number, we still > > > > > > need to use > > > > > > defGetStreamingMode to check if parallel mode in used as we need to > > > > > > send > > > > > > abort_lsn when parallel is in used. I might be missing something, > > > > > > sorry for > > > > > > that. Can you please explain the idea a bit ? > > > > > > > > > > My idea is that we use LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM if > > > > > (server_version >= 160000 && MySubscription->stream == > > > > > SUBSTREAM_PARALLEL). If the stream is SUBSTREAM_ON, we use > > > > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM even if server_version is > > > > > 160000. That way, in pgoutput.c, we can send abort_lsn if the protocol > > > > > version is LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. We don't need > > > > > to send "streaming = parallel" to the publisher since the publisher > > > > > can decide whether or not to send abort_lsn based on the protocol > > > > > version (still needs to send "streaming = on" though). I might be > > > > > missing something. > > > > > > > > > > > > > What if we decide to send some more additional information as part of > > > > another patch like we are discussing in the thread [1]? Now, we won't > > > > be able to decide the version number based on just the streaming > > > > option. Also, in such a case, even for > > > > LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, it may not be a good > > > > idea to send additional abort information unless the user has used the > > > > streaming=parallel option. > > > > > > If we're going to send the additional information, it makes sense to > > > send streaming=parallel. But the next question came to me is why do we > > > need to increase the protocol version for parallel apply feature? If > > > sending the additional information is also controlled by an option > > > like "streaming", we can decide what we send based on these options, > > > no? > > > > > > > AFAIK the protocol version defines what protocol message bytes are > > transmitted on the wire. So I thought the protocol version should > > *always* be updated whenever the message format changes. In other > > words, I don't think we ought to be transmitting different protocol > > message formats unless it is a different protocol version. > > > > Whether the pub/sub implementation actually needs to check that > > protocol version or whether we happen to have some alternative knob we > > can check doesn't change what the protocol version is supposed to > > mean. And the PGDOCS [1] and [2] currently have clear field notes > > about when those fields are present (e.g. "This field is available > > since protocol version XXX"), but if hypothetically you don't change > > the protocol version for some new fields then now the message format > > becomes tied to the built-in implementation of pub/sub -- now what > > field note will you say instead to explain that? > > > > I think the protocol version acts as a backstop to not send some > information which clients don't understand. Now, the other way is to > believe the client when it sends a particular option (say streaming = > on (aka allow sending in-progress transactions)) that it will > understand additional information for that feature but the protocol > version acts as a backstop in that case.
Yeah, it seems that this is how the logical replication protocol has been working. New logical replication protocol versions have backward compatibility. I was thinking that the protocol version needs to bump if there is no compatibility, i.g. if most clients need to change to support new protocols. > As Peter mentioned, it will > be easier to explain the additional information we are sending across > different versions without relying on additional options for pub/sub. > Having said that, we can send additional required information based on > just the new option but I felt it is better to bump the protocol > version along with it unless we see any downside to it. What do you > think? I agree to bump the protocol version. Regards, -- Masahiko Sawada Amazon Web Services: https://aws.amazon.com