On Wed, Dec 14, 2022 at 9:50 AM houzj.f...@fujitsu.com <houzj.f...@fujitsu.com> wrote: > > On Tuesday, December 13, 2022 11:25 PM Masahiko Sawada > <sawada.m...@gmail.com> wrote: > > > > Here are comments on v59 0001, 0002 patches: > > Thanks for the comments! > > > +void > > +pa_increment_stream_block(ParallelApplyWorkerShared *wshared) { > > + while (1) > > + { > > + SpinLockAcquire(&wshared->mutex); > > + > > + /* > > + * Don't try to increment the count if the parallel > > apply worker is > > + * taking the stream lock. Otherwise, there would be > > a race condition > > + * that the parallel apply worker checks there is no > > pending streaming > > + * block and before it actually starts waiting on a > > lock, the leader > > + * sends another streaming block and take the stream > > lock again. In > > + * this case, the parallel apply worker will start > > waiting for the next > > + * streaming block whereas there is actually a > > pending streaming block > > + * available. > > + */ > > + if (!wshared->pa_wait_for_stream) > > + { > > + wshared->pending_stream_count++; > > + SpinLockRelease(&wshared->mutex); > > + break; > > + } > > + > > + SpinLockRelease(&wshared->mutex); > > + } > > +} > > > > I think we should add an assertion to check if we don't hold the stream > > lock. > > > > I think that waiting for pa_wait_for_stream to be false in a busy loop is > > not a > > good idea. It's not interruptible and there is not guarantee that we can > > break > > from this loop in a short time. For instance, if PA executes > > pa_decr_and_wait_stream_block() a bit earlier than LA executes > > pa_increment_stream_block(), LA has to wait for PA to acquire and release > > the > > stream lock in a busy loop. It should not be long in normal cases but the > > duration LA needs to wait for PA depends on PA, which could be long. Also > > what if PA raises an error in > > pa_lock_stream() due to some reasons? I think LA won't be able to detect the > > failure. > > > > I think we should at least make it interruptible and maybe need to add some > > sleep. Or perhaps we can use the condition variable for this case. >
Or we can leave this while (true) logic altogether for the first version and have a comment to explain this race. Anyway, after restarting, it will probably be solved. We can always change this part of the code later if this really turns out to be problematic. > Thanks for the analysis, I will research this part. > > > --- > > In worker.c, we have the following common pattern: > > > > case TRANS_LEADER_PARTIAL_SERIALIZE: > > write change to the file; > > do some work; > > break; > > > > case TRANS_LEADER_SEND_TO_PARALLEL: > > pa_send_data(); > > > > if (winfo->serialize_changes) > > { > > do some worker required after writing changes to the file. > > } > > : > > break; > > > > IIUC there are two different paths for partial serialization: (a) where > > apply_action is TRANS_LEADER_PARTIAL_SERIALIZE, and (b) where > > apply_action is TRANS_LEADER_PARTIAL_SERIALIZE and > > winfo->serialize_changes became true. And we need to match what we do > > in (a) and (b). Rather than having two different paths for the same case, > > how > > about falling through TRANS_LEADER_PARTIAL_SERIALIZE when we could not > > send the changes? That is, pa_send_data() just returns false when the > > timeout > > exceeds and we need to switch to serialize changes, otherwise returns true. > > If it > > returns false, we prepare for switching to serialize changes such as > > initializing > > fileset, and fall through TRANS_LEADER_PARTIAL_SERIALIZE case. The code > > would be like: > > > > case TRANS_LEADER_SEND_TO_PARALLEL: > > ret = pa_send_data(); > > > > if (ret) > > { > > do work for sending changes to PA. > > break; > > } > > > > /* prepare for switching to serialize changes */ > > winfo->serialize_changes = true; > > initialize fileset; > > acquire stream lock if necessary; > > > > /* FALLTHROUGH */ > > case TRANS_LEADER_PARTIAL_SERIALIZE: > > do work for serializing changes; > > break; > > I think that the suggestion is to extract the code that switch to serialize > mode out of the pa_send_data(), and then we need to add that logic in all the > functions which call pa_send_data(), I am not sure if it looks better as it > might introduce some more codes in each handling function. > How about extracting the common code from apply_handle_stream_commit and apply_handle_stream_prepare to a separate function say pa_xact_finish_common()? I see there is a lot of common code (unlock the stream, wait for the finish, store flush location, free worker info) in both the functions for TRANS_LEADER_PARTIAL_SERIALIZE and TRANS_LEADER_SEND_TO_PARALLEL cases. > > > --- > > void > > pa_lock_stream(TransactionId xid, LOCKMODE lockmode) { > > LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, > > PARALLEL_APPLY_LOCK_STREAM, > > lockmode); } > > > > I think since we don't need to let the caller to specify the lock mode but > > need > > only shared and exclusive modes, we can make it simple by having a boolean > > argument say shared instead of lockmode. > > I personally think passing the lockmode would make the code more clear > than passing a Boolean value. > +1. I have made a few changes in the newly added comments and function name in the attached patch. Kindly include this if you find the changes okay. -- With Regards, Amit Kapila.
changes_amit_v60.patch
Description: Binary data