On Wed, Jun 03, 2020 at 09:18:19AM +0900, Kyotaro Horiguchi wrote: > Thanks to all!
Indeed, this was incorrect. And you may not have noticed, but we have a second instance of that in LogicalIncreaseRestartDecodingForSlot() that goes down to 9.4 and b89e151. I used a dirty-still-efficient hack to detect that, and that's the only instance I have spotted. I am not sure if that's worth worrying a back-patch, but we should really address that at least on HEAD. Attached is an extra patch to close the loop. -- Michael
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index dc69e5ce5f..8686d56f9f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -972,6 +972,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart { slot->candidate_restart_valid = current_lsn; slot->candidate_restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); elog(DEBUG1, "got new restart lsn %X/%X at %X/%X", (uint32) (restart_lsn >> 32), (uint32) restart_lsn, @@ -979,18 +980,25 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart } else { + XLogRecPtr candidate_restart_lsn; + XLogRecPtr candidate_restart_valid; + XLogRecPtr confirmed_flush; + + candidate_restart_lsn = slot->candidate_restart_lsn; + candidate_restart_valid = slot->candidate_restart_valid; + confirmed_flush = slot->data.confirmed_flush; + SpinLockRelease(&slot->mutex); + elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X", (uint32) (restart_lsn >> 32), (uint32) restart_lsn, (uint32) (current_lsn >> 32), (uint32) current_lsn, - (uint32) (slot->candidate_restart_lsn >> 32), - (uint32) slot->candidate_restart_lsn, - (uint32) (slot->candidate_restart_valid >> 32), - (uint32) slot->candidate_restart_valid, - (uint32) (slot->data.confirmed_flush >> 32), - (uint32) slot->data.confirmed_flush - ); + (uint32) (candidate_restart_lsn >> 32), + (uint32) candidate_restart_lsn, + (uint32) (candidate_restart_valid >> 32), + (uint32) candidate_restart_valid, + (uint32) (confirmed_flush >> 32), + (uint32) confirmed_flush); } - SpinLockRelease(&slot->mutex); /* candidates are already valid with the current flush position, apply */ if (updated_lsn)
signature.asc
Description: PGP signature