On Tuesday, September 27, 2022 2:32 PM Kuroda, Hayato/黒田 隼人 <kuroda.hay...@fujitsu.com>
>
> Dear Wang,
>
> Followings are comments for your patchset.

Thanks for the comments.

> ====
> 0001
>
>
> 01. launcher.c - logicalrep_worker_stop_internal()
>
> ```
> +
> +       Assert(LWLockHeldByMe(LogicalRepWorkerLock));
> +
> ```

Changed.

> I think it should be Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock,
> LW_SHARED)) because the lock is released one and acquired again as
> LW_SHARED.
> If newer function has been acquired lock as LW_EXCLUSIVE and call
> logicalrep_worker_stop_internal(),
> its lock may become weaker after calling it.
>
> 02. launcher.c - apply_handle_stream_start()
>
> ```
> +       /*
> +        * Notify handle methods we're processing a remote in-progress
> +        * transaction.
> +        */
> +       in_streamed_transaction = true;
>
> -       MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
> -       FileSetInit(MyLogicalRepWorker->stream_fileset);
> +       /*
> +        * Start a transaction on stream start, this transaction will be
> +        * committed on the stream stop unless it is a tablesync worker in
> +        * which case it will be committed after processing all the
> +        * messages. We need the transaction for handling the buffile,
> +        * used for serializing the streaming data and subxact info.
> +        */
> +       begin_replication_step();
> ```
>
> Previously in_streamed_transaction was set after the begin_replication_step(),
> but the ordering is modified. Maybe we don't have to modify it if there is no
> particular reason.
>
> 03. launcher.c - apply_handle_stream_stop()
>
> ```
> +       /* Commit the per-stream transaction */
> +       CommitTransactionCommand();
> +
> +       /* Reset per-stream context */
> +       MemoryContextReset(LogicalStreamingContext);
> +
> +       pgstat_report_activity(STATE_IDLE, NULL);
> +
> +       in_streamed_transaction = false;
> ```
>
> Previously in_streamed_transaction was set after the MemoryContextReset(),
> but the ordering is modified.
> Maybe we don't have to modify it if there is no particular reason.

I adjusted the position of this because of some other improvements made in
this version.

>
> 04. applyparallelworker.c - LogicalParallelApplyLoop()
>
> ```
> +       shmq_res = shm_mq_receive(mqh, &len, &data, false);
> ...
> +       if (ConfigReloadPending)
> +       {
> +               ConfigReloadPending = false;
> +               ProcessConfigFile(PGC_SIGHUP);
> +       }
> ```
>
>
> Here the parallel apply worker waits to receive messages and after dispatching
> it ProcessConfigFile() is called.
> It means that .conf will be not read until the parallel apply worker receives
> new messages and apply them.
>
> It may be problematic when users change log_min_message to debugXXX for
> debugging but the streamed transaction rarely come.
> They expected that detailed description appears on the log from next
> streaming chunk, but it does not.
>
> This does not occur in leader worker when it waits messages from publisher,
> because it uses libpqrcv_receive(), which works asynchronously.
>
> I'm not sure whether it should be documented that the evaluation of GUCs may
> be delayed, how do you think?

I changed the shm_mq_receive call to asynchronous (non-blocking) mode, which
is also consistent with what we did for the Gather node when reading data from
parallel query workers.

>
> ===
> 0004
>
> 05. logical-replication.sgml
>
> ```
> ...
> In that case, it may be necessary to change the streaming mode to on or off
> and cause the same conflicts again so the finish LSN of the failed transaction
> will be written to the server log.
> ...
> ```
>
> Above sentence is added by 0001, but it is not modified by 0004.
> Such transactions will be retried as streaming=on mode, so some descriptions
> related with it should be added.

Added.

Best regards,
Hou zj