junrao commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r659056491
##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -90,11 +90,63 @@ object LogLoader extends Logging {
* overflow index offset
*/
def load(params: LoadLogParams): LoadedLogOffsets = {
-    // first do a pass through the files in the log directory and remove any temporary files
+
+    // First pass: through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles(params)
-    // Now do a second pass and load all the log and index files.
+    // The remaining valid swap files must come from compaction or segment split operation. We can
+    // simply rename them to regular segment files. But, before renaming, we should figure out which
+    // segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset.
+    // We store segments that require renaming in this code block, and do the actual renaming later.
+    var minSwapFileOffset = Long.MaxValue
+    var maxSwapFileOffset = Long.MinValue
+    val toRenameSwapFiles = mutable.Set[File]()
+    swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+      val baseOffset = offsetFromFile(f)
+      val segment = LogSegment.open(f.getParentFile,
+        baseOffset = baseOffset,
+        params.config,
+        time = params.time,
+        fileSuffix = Log.SwapFileSuffix)
+      toRenameSwapFiles += f
+      info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.")
+      minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
+      maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset)
Review comment:
This is an existing problem. Calculating the end offset that a segment covers can be tricky: during compaction we remove records when writing the .clean and .swap files, so the offset of the last record in a segment doesn't tell us the true end offset of the original segment.
One possibility is to use the base offset of the next segment, if present.
##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -90,11 +90,63 @@ object LogLoader extends Logging {
* overflow index offset
*/
def load(params: LoadLogParams): LoadedLogOffsets = {
-    // first do a pass through the files in the log directory and remove any temporary files
+
+    // First pass: through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
     val swapFiles = removeTempFilesAndCollectSwapFiles(params)
-    // Now do a second pass and load all the log and index files.
+    // The remaining valid swap files must come from compaction or segment split operation. We can
+    // simply rename them to regular segment files. But, before renaming, we should figure out which
+    // segments are compacted and delete these segment files: this is done by calculating min/maxSwapFileOffset.
+    // We store segments that require renaming in this code block, and do the actual renaming later.
+    var minSwapFileOffset = Long.MaxValue
+    var maxSwapFileOffset = Long.MinValue
+    val toRenameSwapFiles = mutable.Set[File]()
+    swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+      val baseOffset = offsetFromFile(f)
+      val segment = LogSegment.open(f.getParentFile,
+        baseOffset = baseOffset,
+        params.config,
+        time = params.time,
+        fileSuffix = Log.SwapFileSuffix)
+      toRenameSwapFiles += f
+      info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.")
+      minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
+      maxSwapFileOffset = Math.max(segment.offsetIndex.lastOffset, maxSwapFileOffset)
+    }
+
+    // Second pass: delete segments that are between minSwapFileOffset and maxSwapFileOffset. As
+    // discussed above, these segments were compacted but haven't been renamed to .delete before
Review comment:
Swap files can also be created during a segment split, so the segments deleted here are not necessarily compacted ones.
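Whether a .swap file came from compaction or from a split, the same offset-range check applies to the cleanup the diff describes. A rough sketch with hypothetical, simplified types (not Kafka's actual classes):

```scala
// Hypothetical, simplified segment-file model (not Kafka's actual types).
final case class SegmentFile(baseOffset: Long)

object SwapRecovery {
  // Regular segments whose base offset falls inside the offset range covered
  // by the recovered .swap segments are leftover inputs of the interrupted
  // compaction or split, and can be deleted before the .swap files are renamed.
  def segmentsToDelete(segments: Seq[SegmentFile],
                       minSwapFileOffset: Long,
                       maxSwapFileOffset: Long): Seq[SegmentFile] =
    segments.filter(s =>
      s.baseOffset >= minSwapFileOffset && s.baseOffset <= maxSwapFileOffset)
}
```

Segments entirely before or after the swap range are untouched; only the overlapped originals are removed.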
##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -106,7 +158,14 @@ object LogLoader extends Logging {
      loadSegmentFiles(params)
    })
-    completeSwapOperations(swapFiles, params)
+    // Forth pass: rename remaining index swap files. They must be left due to a broker crash when
Review comment:
Hmm, not sure why we still have swap files at this point. We have renamed all existing swap files, and no new swap files are created.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]