kowshik commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r719669247
##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -77,8 +77,9 @@ class LogSegment private[log] (val log: FileRecords,
     timeIndex.resize(size)
   }

-  def sanityCheck(timeIndexFileNewlyCreated: Boolean): Unit = {
-    if (lazyOffsetIndex.file.exists) {
+  def sanityCheck(timeIndexFileNewlyCreated: Boolean, isActiveSegment: Boolean): Unit = {
+    // We allow for absence of offset index file only for an empty active segment.
+    if ((isActiveSegment && size == 0) || lazyOffsetIndex.file.exists) {

Review comment:
@junrao That is a really good point. Yes, it looks useful to force a flush of the empty segment during clean shutdown. I couldn't find a good reason why we don't do it today, other than that the flush call [here](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/UnifiedLog.scala#L1508) probably does not cover the case where the active segment is empty.

I'm thinking a simple fix could be the following change to `UnifiedLog`:

```
  /**
   * Flush all local log segments, including the active segment.
   */
  def flush(): Unit = flush(logEndOffset + 1)
```

Could I open a separate PR with this improvement?
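
To illustrate why `logEndOffset + 1` would make a difference, here is a minimal sketch. It assumes the flush picks segments by base offset over a half-open range `[floor(recoveryPoint), offset)`, which is my reading of the current code and not something verified here. `FlushRangeSketch` and `segmentsToFlush` are hypothetical names for this illustration only, not Kafka APIs.

```scala
import java.util.TreeMap
import scala.jdk.CollectionConverters._

// Hypothetical, simplified model: each segment is identified by its base offset.
object FlushRangeSketch {
  // Returns the base offsets of the segments that would be selected when
  // flushing up to `upTo`, ASSUMING a half-open range [floor(recoveryPoint), upTo).
  def segmentsToFlush(baseOffsets: Seq[Long], recoveryPoint: Long, upTo: Long): List[Long] = {
    val segments = new TreeMap[java.lang.Long, String]()
    baseOffsets.foreach(o => segments.put(Long.box(o), s"segment-$o"))

    if (recoveryPoint >= upTo) {
      // Nothing between the recovery point and the requested offset.
      Nil
    } else {
      // Start from the segment containing the recovery point, if there is one.
      val view = Option(segments.floorKey(Long.box(recoveryPoint))) match {
        case Some(floor) => segments.subMap(floor, Long.box(upTo)) // upper bound is exclusive
        case None        => segments.headMap(Long.box(upTo))       // upper bound is exclusive
      }
      view.keySet.asScala.toList.map(_.longValue)
    }
  }
}
```

With segments at base offsets 0 and 100, where the active segment (100) is empty so that `logEndOffset == 100`, `segmentsToFlush(Seq(0L, 100L), 50L, 100L)` selects only segment 0, while `segmentsToFlush(Seq(0L, 100L), 50L, 101L)` also selects the empty active segment. Under the stated assumption, that is the case the `+ 1` is meant to cover.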