kowshik commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r719669247



##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -77,8 +77,9 @@ class LogSegment private[log] (val log: FileRecords,
     timeIndex.resize(size)
   }
 
-  def sanityCheck(timeIndexFileNewlyCreated: Boolean): Unit = {
-    if (lazyOffsetIndex.file.exists) {
+  def sanityCheck(timeIndexFileNewlyCreated: Boolean, isActiveSegment: Boolean): Unit = {
+    // We allow for absence of offset index file only for an empty active segment.
+    if ((isActiveSegment && size == 0) || lazyOffsetIndex.file.exists) {

Review comment:
       @junrao That is a really good point. Yes, it looks useful to force a flush 
of the empty active segment during clean shutdown. I couldn't find a good reason 
why we don't do it today, other than that the flush call 
[here](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/UnifiedLog.scala#L1508)
 probably did not account for the case where the active segment could be empty. 
I'm thinking a simple fix could be the following change to `UnifiedLog`:
   
   ```
     /**
      * Flush all local log segments, including the active segment.
      */
     def flush(): Unit = flush(logEndOffset + 1)
   ```
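
To illustrate why the `+ 1` matters, here is a toy model, not the actual Kafka code. It assumes that `flush(offset)` is a no-op when `offset` is not beyond the recovery point, and otherwise flushes segments whose base offset is strictly below `offset`. `Segment`, `ToyLog`, and `segmentsToFlush` are hypothetical names made up for this sketch, and advancing the recovery point is left out.

```scala
// Toy model only: Segment and ToyLog are hypothetical names, not Kafka classes.
final case class Segment(baseOffset: Long, recordCount: Int) {
  // Offset that the next appended record would receive.
  def nextOffset: Long = baseOffset + recordCount
}

final class ToyLog(segments: Vector[Segment], recoveryPoint: Long) {
  require(segments.nonEmpty, "a log always has an active segment")

  // The last segment is the active one; an empty active segment has
  // baseOffset == logEndOffset.
  def logEndOffset: Long = segments.last.nextOffset

  // Base offsets of the segments a flush up to `offset` would touch,
  // assuming an exclusive upper bound on the base offset.
  def segmentsToFlush(offset: Long): Seq[Long] =
    if (offset <= recoveryPoint) Seq.empty
    else {
      val bases = segments.map(_.baseOffset)
      // Start from the segment that contains the recovery point.
      val from = bases.filter(_ <= recoveryPoint).lastOption.getOrElse(bases.head)
      bases.filter(b => b >= from && b < offset)
    }
}

object FlushSketch {
  def main(args: Array[String]): Unit = {
    // One full segment [0, 100) followed by an empty active segment at 100.
    val log = new ToyLog(Vector(Segment(0L, 100), Segment(100L, 0)), recoveryPoint = 50L)
    println(log.segmentsToFlush(log.logEndOffset))     // Vector(0)
    println(log.segmentsToFlush(log.logEndOffset + 1)) // Vector(0, 100)
  }
}
```

Under these assumptions, a flush up to `logEndOffset` skips the empty active segment because its base offset equals `logEndOffset`, while `logEndOffset + 1` includes it.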
   
   Could I open a separate PR with this improvement?





