On Tuesday, December 13, 2022 11:25 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> On Sun, Dec 11, 2022 at 8:45 PM houzj.f...@fujitsu.com
> <houzj.f...@fujitsu.com> wrote:
> >
> > On Friday, December 9, 2022 3:14 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Thu, Dec 8, 2022 at 12:37 PM houzj.f...@fujitsu.com
> > > <houzj.f...@fujitsu.com> wrote:
> > >
> > > Review comments
> >
> > Thanks for the comments!
> >
> > > ==============
> > > 1. Currently, we don't release the stream lock in LA (leader apply
> > > worker) for "rollback to savepoint", and the reason is mentioned in
> > > the comments of apply_handle_stream_abort() in the patch. But, today,
> > > while testing, I found that this can lead to a deadlock which otherwise
> > > won't happen on the publisher. The key point is that rollback to
> > > savepoint releases the locks acquired by the particular subtransaction,
> > > so the parallel apply worker should also do the same. Consider the
> > > following example where the transaction in session-1 is being performed
> > > by the parallel apply worker and the transaction in session-2 is being
> > > performed by the leader apply worker. I have simulated it by using the
> > > GUC force_stream_mode.
> > >
> > > Publisher
> > > ==========
> > > Session-1
> > > postgres=# begin;
> > > BEGIN
> > > postgres=*# savepoint s1;
> > > SAVEPOINT
> > > postgres=*# truncate t1;
> > > TRUNCATE TABLE
> > >
> > > Session-2
> > > postgres=# begin;
> > > BEGIN
> > > postgres=*# insert into t1 values(4);
> > >
> > > Session-1
> > > postgres=*# rollback to savepoint s1;
> > > ROLLBACK
> > >
> > > Session-2
> > > Commit;
> > >
> > > With or without the commit of Session-2, this scenario will lead to a
> > > deadlock on the subscriber because PA (parallel apply worker) is
> > > waiting for LA to send the next command, and LA is blocked by the
> > > Exclusive lock held by PA. There is no deadlock on the publisher because
> > > rollback to savepoint releases the lock acquired by the truncate.
> > >
> > > To solve this, how about if we do three things before sending the abort
> > > of the sub-transaction: (a) unlock the stream lock, (b) increment
> > > pending_stream_count, (c) take the stream lock again?
> > >
> > > Now, if the PA is not already waiting on the stop, it will not wait
> > > at stream_stop but will wait after applying the abort of the
> > > sub-transaction, and if it is already waiting at stream_stop, the wait
> > > will be released. If this works then probably we should try to do (b)
> > > before (a) to match the steps with stream_start.
> >
> > The solution works for me, I have changed the code as suggested.
> >
> > > 2. There seems to be another general problem in the way the patch
> > > waits for stream_stop in PA (parallel apply worker). Currently, PA
> > > checks, if there are no more pending streams, then it tries to wait
> > > for the next stream by waiting on a stream lock. However, it is
> > > possible that after PA checks there is no pending stream and before it
> > > actually starts waiting on a lock, the LA sends another stream for
> > > which even stream_stop is sent; in this case, PA will start waiting
> > > for the next stream whereas there is actually a pending stream
> > > available. In this case, it won't lead to any problem apart from a
> > > delay in applying the changes, but for the case mentioned in the
> > > previous point (Point 1), it can lead to a deadlock even after we
> > > implement the solution proposed to solve it.
> >
> > Thanks for reporting, I have introduced another flag in shared memory
> > and use it to prevent the leader from incrementing the
> > pending_stream_count if the parallel apply worker is trying to lock the
> > stream lock.
> >
> > > 3. The other point to consider is that for
> > > stream_commit/prepare/abort, in LA, we release the stream lock after
> > > sending the message, whereas for stream_start we release it before
> > > sending the message.
> > > I think for the earlier cases
> > > (stream_commit/prepare/abort), the patch has done it like this because
> > > pa_send_data() may need to acquire the lock again when it times out
> > > and starts serializing, so there would be no sense in first releasing
> > > it, then re-acquiring it, and then again releasing it. Can't we also
> > > release the lock for stream_start after pa_send_data() only if it is
> > > not switched to serialize mode?
> >
> > Changed.
> >
> > Attach the new version patch set which addressed the above comments.
>
> Here are comments on the v59 0001, 0002 patches:
Thanks for the comments!

> +void
> +pa_increment_stream_block(ParallelApplyWorkerShared *wshared)
> +{
> +    while (1)
> +    {
> +        SpinLockAcquire(&wshared->mutex);
> +
> +        /*
> +         * Don't try to increment the count if the parallel apply worker is
> +         * taking the stream lock. Otherwise, there would be a race condition
> +         * that the parallel apply worker checks there is no pending streaming
> +         * block and before it actually starts waiting on a lock, the leader
> +         * sends another streaming block and takes the stream lock again. In
> +         * this case, the parallel apply worker will start waiting for the next
> +         * streaming block whereas there is actually a pending streaming block
> +         * available.
> +         */
> +        if (!wshared->pa_wait_for_stream)
> +        {
> +            wshared->pending_stream_count++;
> +            SpinLockRelease(&wshared->mutex);
> +            break;
> +        }
> +
> +        SpinLockRelease(&wshared->mutex);
> +    }
> +}
>
> I think we should add an assertion to check that we don't hold the stream
> lock.
>
> I think that waiting for pa_wait_for_stream to be false in a busy loop is
> not a good idea. It's not interruptible and there is no guarantee that we
> can break from this loop in a short time. For instance, if PA executes
> pa_decr_and_wait_stream_block() a bit earlier than LA executes
> pa_increment_stream_block(), LA has to wait for PA to acquire and release
> the stream lock in a busy loop. It should not be long in normal cases, but
> the duration LA needs to wait for PA depends on PA, which could be long.
> Also, what if PA raises an error in pa_lock_stream() due to some reason?
> I think LA won't be able to detect the failure.
>
> I think we should at least make it interruptible and maybe need to add some
> sleep. Or perhaps we can use a condition variable for this case.

Thanks for the analysis, I will research this part.
> ---
> In worker.c, we have the following common pattern:
>
>     case TRANS_LEADER_PARTIAL_SERIALIZE:
>         write change to the file;
>         do some work;
>         break;
>
>     case TRANS_LEADER_SEND_TO_PARALLEL:
>         pa_send_data();
>
>         if (winfo->serialize_changes)
>         {
>             do some work required after writing changes to the file.
>         }
>         :
>         break;
>
> IIUC there are two different paths for partial serialization: (a) where
> apply_action is TRANS_LEADER_PARTIAL_SERIALIZE, and (b) where
> apply_action is TRANS_LEADER_SEND_TO_PARALLEL and
> winfo->serialize_changes became true. And we need to match what we do
> in (a) and (b). Rather than having two different paths for the same case,
> how about falling through to TRANS_LEADER_PARTIAL_SERIALIZE when we could
> not send the changes? That is, pa_send_data() just returns false when the
> timeout is exceeded and we need to switch to serializing changes, otherwise
> it returns true. If it returns false, we prepare for switching to serialize
> changes, such as initializing the fileset, and fall through to the
> TRANS_LEADER_PARTIAL_SERIALIZE case. The code would be like:
>
>     case TRANS_LEADER_SEND_TO_PARALLEL:
>         ret = pa_send_data();
>
>         if (ret)
>         {
>             do work for sending changes to PA.
>             break;
>         }
>
>         /* prepare for switching to serialize changes */
>         winfo->serialize_changes = true;
>         initialize fileset;
>         acquire stream lock if necessary;
>
>         /* FALLTHROUGH */
>     case TRANS_LEADER_PARTIAL_SERIALIZE:
>         do work for serializing changes;
>         break;

I think the suggestion is to extract the code that switches to serialize mode
out of pa_send_data(), and then we would need to add that logic to all the
functions that call pa_send_data(). I am not sure it looks better, as it
might introduce more code in each handling function.

> ---
>         /*
> -        * Unlock the shared object lock so that parallel apply worker can
> -        * continue to receive and apply changes.
> +        * Parallel apply worker might have applied some changes, so write
> +        * the STREAM_ABORT message so that it can rollback the
> +        * subtransaction if needed.
>          */
> -       pa_unlock_stream(xid, AccessExclusiveLock);
> +       stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
> +                                    &original_msg);
> +
> +       if (toplevel_xact)
> +       {
> +           pa_unlock_stream(xid, AccessExclusiveLock);
> +           pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
> +           (void) pa_free_worker(winfo, xid);
> +       }
>
> At every place except for the above code, we set the fileset state to
> FS_SERIALIZE_DONE first and then unlock the stream lock. Is there any
> reason for that?

No, I think we should make them consistent, will change this.

> ---
> +       case TRANS_LEADER_SEND_TO_PARALLEL:
> +           Assert(winfo);
> +
> +           /*
> +            * Unlock the shared object lock so that the parallel apply
> +            * worker can continue to receive and apply changes.
> +            */
> +           pa_unlock_stream(xid, AccessExclusiveLock);
> +
> +           /*
> +            * For the case of aborting the subtransaction, we increment the
> +            * number of streaming blocks and take the lock again before
> +            * sending the STREAM_ABORT to ensure that the parallel apply
> +            * worker will wait on the lock for the next set of changes after
> +            * processing the STREAM_ABORT message, if it is not already
> +            * waiting for a STREAM_STOP message.
> +            */
> +           if (!toplevel_xact)
> +           {
> +               pa_increment_stream_block(winfo->shared);
> +               pa_lock_stream(xid, AccessExclusiveLock);
> +           }
> +
> +           /* Send STREAM ABORT message to the parallel apply worker. */
> +           pa_send_data(winfo, s->len, s->data);
> +
> +           if (toplevel_xact)
> +               (void) pa_free_worker(winfo, xid);
> +
> +           break;
>
> In apply_handle_stream_abort(), it's better to add a comment on why we
> don't need to wait for PA to finish.

Will add.

> Also, given that we don't wait for PA to finish in this case, does it
> really make sense to call pa_free_worker() immediately after sending
> STREAM_ABORT?
I think it's possible that PA finishes the ROLLBACK quickly, so LA can free
the worker here in time.

> ---
> PA acquires the transaction lock in AccessShare mode whereas LA acquires it
> in AccessExclusive mode. Is it better to do the opposite?
> Like a backend process acquires a lock on its XID in Exclusive mode, we can
> have PA acquire the lock on its XID in Exclusive mode whereas others
> attempt to acquire it in Share mode to wait.

Agreed, will improve.

> ---
> void
> pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
> {
>     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
>                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
> }
>
> I think that since we don't need to let the caller specify the lock mode,
> but need only shared and exclusive modes, we can make it simple by having a
> boolean argument, say "shared", instead of lockmode.

I personally think passing the lockmode makes the code clearer than passing a
Boolean value.

Best regards,
Hou zj