On Mon, Dec 5, 2022 at 1:29 PM houzj.f...@fujitsu.com <houzj.f...@fujitsu.com> wrote: > > On Sunday, December 4, 2022 7:17 PM houzj.f...@fujitsu.com > <houzj.f...@fujitsu.com> > > > > Thursday, December 1, 2022 8:40 PM Amit Kapila <amit.kapil...@gmail.com> > > wrote: > > > Some other comments: > > ... > > Attach the new version patch set which addressed most of the comments > > received so far except some comments being discussed[1]. > > [1] > > https://www.postgresql.org/message-id/OS0PR01MB57167BF64FC0891734C8E81A94149%40OS0PR01MB5716.jpnprd01.prod.outlook.com > > Attach a new version patch set which fixed a testcase failure on CFbot.
Here are some comments on v56 0001, 0002 patches. Please ignore comments if you already incorporated them in v57. +static void +ProcessParallelApplyInterrupts(void) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + ereport(LOG, + (errmsg("logical replication parallel apply worker for subscrip tion \"%s\" has finished", + MySubscription->name))); + + apply_worker_clean_exit(false); + } + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } +} I personally think that we don't need to have a function to do only these few things. --- +/* Disallow streaming in-progress transactions. */ +#define SUBSTREAM_OFF 'f' + +/* + * Streaming in-progress transactions are written to a temporary file and + * applied only after the transaction is committed on upstream. + */ +#define SUBSTREAM_ON 't' + +/* + * Streaming in-progress transactions are applied immediately via a parallel + * apply worker. + */ +#define SUBSTREAM_PARALLEL 'p' + While these names look good to me, we already have the following existing values: */ #define LOGICALREP_TWOPHASE_STATE_DISABLED 'd' #define LOGICALREP_TWOPHASE_STATE_PENDING 'p' #define LOGICALREP_TWOPHASE_STATE_ENABLED 'e' /* * The subscription will request the publisher to * have any origin. */ #define LOGICALREP_ORIGIN_NONE "none" /* * The subscription will request the publisher to * of their origin. */ #define LOGICALREP_ORIGIN_ANY "any" Should we change the names to something like LOGICALREP_STREAM_PARALLEL? --- + * The lock graph for the above example will look as follows: + * LA (waiting to acquire the lock on the unique index) -> PA (waiting to + * acquire the lock on the remote transaction) -> LA and + * The lock graph for the above example will look as follows: + * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the + * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream + * lock) -> LA "(waiting to acquire the lock on the remote transaction)" in the first example and "(waiting to acquire the stream lock)" in the second example is the same meaning, right? If so, I think we should use either term for consistency. --- + bool write_abort_info = (data->streaming == SUBSTREAM_PARALLEL); I think that instead of setting write_abort_info every time when pgoutput_stream_abort() is called, we can set it once, probably in PGOutputData, at startup. --- server_version = walrcv_server_version(LogRepWorkerWalRcvConn); options.proto.logical.proto_version = + server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM; Instead of always using the new protocol version, I think we can use LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not 'parallel'. That way, we don't need to change protocl version check logic in pgoutput.c and don't need to expose defGetStreamingMode(). What do you think? --- When max_parallel_apply_workers_per_subscription is changed to a value lower than the number of parallel worker running at that time, do we need to stop extra workers? --- If a value of max_parallel_apply_workers_per_subscription is not sufficient, we get the LOG "out of parallel apply workers" every time when the apply worker doesn't launch a worker. But do we really need this log? It seems not consistent with max_sync_workers_per_subscription behavior. I think we can check if the number of running parallel workers is less than max_parallel_apply_workers_per_subscription before calling logicalrep_worker_launch(). What do you think? --- + if (server_version >= 160000 && + MySubscription->stream == SUBSTREAM_PARALLEL) + { + options.proto.logical.streaming_str = pstrdup("parallel"); + MyLogicalRepWorker->parallel_apply = true; + } + else if (server_version >= 140000 && + MySubscription->stream != SUBSTREAM_OFF) + { + options.proto.logical.streaming_str = pstrdup("on"); + MyLogicalRepWorker->parallel_apply = false; + } I think we don't need to use pstrdup(). --- - BeginTransactionBlock(); - CommitTransactionCommand(); /* Completes the preceding Begin command. */ + if (!IsTransactionBlock()) + { + BeginTransactionBlock(); + CommitTransactionCommand(); /* Completes the preceding Begin command. */ + } Do we need this change? In my environment, 'make check-world' passes without this change. Regards, -- Masahiko Sawada Amazon Web Services: https://aws.amazon.com