On Wednesday, December 7, 2022 7:51 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> On Mon, Dec 5, 2022 at 1:29 PM houzj.f...@fujitsu.com
> <houzj.f...@fujitsu.com> wrote:
> >
> > On Sunday, December 4, 2022 7:17 PM houzj.f...@fujitsu.com
> > <houzj.f...@fujitsu.com> wrote:
> > >
> > > On Thursday, December 1, 2022 8:40 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > > > Some other comments:
> > > ...
> > > Attach the new version patch set which addressed most of the comments
> > > received so far except some comments being discussed[1].
> > > [1] https://www.postgresql.org/message-id/OS0PR01MB57167BF64FC0891734C8E81A94149%40OS0PR01MB5716.jpnprd01.prod.outlook.com
> >
> > Attach a new version patch set which fixed a testcase failure on CFbot.
>
> Here are some comments on v56 0001, 0002 patches. Please ignore
> comments if you already incorporated them in v57.
Thanks for the comments!

> +static void
> +ProcessParallelApplyInterrupts(void)
> +{
> +	CHECK_FOR_INTERRUPTS();
> +
> +	if (ShutdownRequestPending)
> +	{
> +		ereport(LOG,
> +				(errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
> +						MySubscription->name)));
> +
> +		apply_worker_clean_exit(false);
> +	}
> +
> +	if (ConfigReloadPending)
> +	{
> +		ConfigReloadPending = false;
> +		ProcessConfigFile(PGC_SIGHUP);
> +	}
> +}
>
> I personally think that we don't need to have a function to do only
> these few things.

I thought that introducing a new function makes the handling of
worker-specific interrupts consistent with the existing workers, like
ProcessWalRcvInterrupts() in walreceiver.c and HandlePgArchInterrupts()
in pgarch.c.

...

> Should we change the names to something like
> LOGICALREP_STREAM_PARALLEL?

Agreed, will change.

> ---
> + * The lock graph for the above example will look as follows:
> + * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
> + * acquire the lock on the remote transaction) -> LA
>
> and
>
> + * The lock graph for the above example will look as follows:
> + * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
> + * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
> + * lock) -> LA
>
> "(waiting to acquire the lock on the remote transaction)" in the first
> example and "(waiting to acquire the stream lock)" in the second
> example is the same meaning, right? If so, I think we should use
> either term for consistency.

Will change.

> ---
> +	bool		write_abort_info = (data->streaming == SUBSTREAM_PARALLEL);
>
> I think that instead of setting write_abort_info every time when
> pgoutput_stream_abort() is called, we can set it once, probably in
> PGOutputData, at startup.

Since we already have the "streaming" flag in PGOutputData, I am not
sure it would be better to introduce another flag for the same option.
> ---
>	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
>	options.proto.logical.proto_version =
> +		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
>		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
>		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
>		LOGICALREP_PROTO_VERSION_NUM;
>
> Instead of always using the new protocol version, I think we can use
> LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
> 'parallel'. That way, we don't need to change the protocol version check
> logic in pgoutput.c and don't need to expose defGetStreamingMode().
> What do you think?

I think some users may also use the new version number when trying to
get changes (via pg_logical_slot_peek_binary_changes() or other
functions), so leaving the check for the new version number seems fine
to me. Besides, even if we don't use the new version number, we still
need defGetStreamingMode() to check whether parallel mode is in use,
because we need to send abort_lsn when it is. I might be missing
something; could you please explain the idea a bit more?

> ---
> When max_parallel_apply_workers_per_subscription is changed to a value
> lower than the number of parallel workers running at that time, do we
> need to stop extra workers?

I think we can do this by adding a check in the main loop of the leader
worker and checking each time after reloading the configuration. OTOH,
we already stop a worker after it finishes a transaction, so I am not
sure we need another check here. But I am fine with adding it if you
think it would be better.

> ---
> If a value of max_parallel_apply_workers_per_subscription is not
> sufficient, we get the LOG "out of parallel apply workers" every time
> when the apply worker doesn't launch a worker. But do we really need
> this log? It seems not consistent with
> max_sync_workers_per_subscription behavior.
> I think we can check if
> the number of running parallel workers is less than
> max_parallel_apply_workers_per_subscription before calling
> logicalrep_worker_launch(). What do you think?
>
> ---
> +	if (server_version >= 160000 &&
> +		MySubscription->stream == SUBSTREAM_PARALLEL)
> +	{
> +		options.proto.logical.streaming_str = pstrdup("parallel");
> +		MyLogicalRepWorker->parallel_apply = true;
> +	}
> +	else if (server_version >= 140000 &&
> +			 MySubscription->stream != SUBSTREAM_OFF)
> +	{
> +		options.proto.logical.streaming_str = pstrdup("on");
> +		MyLogicalRepWorker->parallel_apply = false;
> +	}
>
> I think we don't need to use pstrdup().

Will remove.

> ---
> -	BeginTransactionBlock();
> -	CommitTransactionCommand(); /* Completes the preceding Begin command. */
> +	if (!IsTransactionBlock())
> +	{
> +		BeginTransactionBlock();
> +		CommitTransactionCommand(); /* Completes the preceding Begin command. */
> +	}
>
> Do we need this change? In my environment, 'make check-world' passes
> without this change.

We start a transaction block when defining a savepoint, and we would
get a warning[1] if we entered this function again later. I think there
would be WARNINGs in the log of the 022_twophase_cascade test if we
removed this check.

[1] WARNING: there is already a transaction in progress

Best regards,
Hou zj