kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643446123



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1572,144 +1414,69 @@ class Log(@volatile private var _dir: File,
         .map(_.messageOffset)
         .getOrElse(maxOffsetInMessages - Integer.MAX_VALUE)
 
-      roll(Some(rollOffset))
+      localLog.roll(Some(rollOffset), Some(rollAction))
     } else {
       segment
     }
   }
 
   /**
-   * Roll the log over to a new active segment starting with the current 
logEndOffset.
+   * Roll the local log over to a new active segment starting with the current 
logEndOffset.
    * This will trim the index to the exact size of the number of entries it 
currently contains.
    *
    * @return The newly rolled segment
    */
   def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
-    maybeHandleIOException(s"Error while rolling log segment for 
$topicPartition in dir ${dir.getParent}") {
-      val start = time.hiResClockMs()
-      lock synchronized {
-        checkIfMemoryMappedBufferClosed()
-        val newOffset = math.max(expectedNextOffset.getOrElse(0L), 
logEndOffset)
-        val logFile = Log.logFile(dir, newOffset)
-
-        if (segments.contains(newOffset)) {
-          // segment with the same base offset already exists and loaded
-          if (activeSegment.baseOffset == newOffset && activeSegment.size == 
0) {
-            // We have seen this happen (see KAFKA-6388) after shouldRoll() 
returns true for an
-            // active segment of size zero because of one of the indexes is 
"full" (due to _maxEntries == 0).
-            warn(s"Trying to roll a new log segment with start offset 
$newOffset " +
-                 s"=max(provided offset = $expectedNextOffset, LEO = 
$logEndOffset) while it already " +
-                 s"exists and is active with size 0. Size of time index: 
${activeSegment.timeIndex.entries}," +
-                 s" size of offset index: 
${activeSegment.offsetIndex.entries}.")
-            removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true, 
LogRoll)
-          } else {
-            throw new KafkaException(s"Trying to roll a new log segment for 
topic partition $topicPartition with start offset $newOffset" +
-                                     s" =max(provided offset = 
$expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
-                                     s"segment is ${segments.get(newOffset)}.")
-          }
-        } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
-          throw new KafkaException(
-            s"Trying to roll a new log segment for topic partition 
$topicPartition with " +
-            s"start offset $newOffset =max(provided offset = 
$expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active 
segment $activeSegment")
-        } else {
-          val offsetIdxFile = offsetIndexFile(dir, newOffset)
-          val timeIdxFile = timeIndexFile(dir, newOffset)
-          val txnIdxFile = transactionIndexFile(dir, newOffset)
-
-          for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) 
if file.exists) {
-            warn(s"Newly rolled segment file ${file.getAbsolutePath} already 
exists; deleting it first")
-            Files.delete(file.toPath)
-          }
-
-          segments.lastSegment.foreach(_.onBecomeInactiveSegment())
-        }
-
-        // take a snapshot of the producer state to facilitate recovery. It is 
useful to have the snapshot
-        // offset align with the new segment offset since this ensures we can 
recover the segment by beginning
-        // with the corresponding snapshot file and scanning the segment data. 
Because the segment base offset
-        // may actually be ahead of the current producer state end offset 
(which corresponds to the log end offset),
-        // we manually override the state offset here prior to taking the 
snapshot.
-        producerStateManager.updateMapEndOffset(newOffset)
-        producerStateManager.takeSnapshot()
-
-        val segment = LogSegment.open(dir,
-          baseOffset = newOffset,
-          config,
-          time = time,
-          initFileSize = config.initFileSize,
-          preallocate = config.preallocate)
-        addSegment(segment)
-
-        // We need to update the segment base offset and append position data 
of the metadata when log rolls.
-        // The next offset should not change.
-        updateLogEndOffset(nextOffsetMetadata.messageOffset)
-
-        // schedule an asynchronous flush of the old segment
-        scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
-
-        info(s"Rolled new log segment at offset $newOffset in 
${time.hiResClockMs() - start} ms.")
-
-        segment
-      }
+    lock synchronized {
+      localLog.roll(expectedNextOffset, Some(rollAction))
     }
   }
 
   /**
    * The number of messages appended to the log since the last flush
    */
-  private def unflushedMessages: Long = this.logEndOffset - this.recoveryPoint
+  private def unflushedMessages: Long = logEndOffset - localLog.recoveryPoint
 
   /**
-   * Flush all log segments
+   * Flush all local log segments
    */
   def flush(): Unit = flush(this.logEndOffset)
 
   /**
-   * Flush log segments for all offsets up to offset-1
+   * Flush local log segments for all offsets up to offset-1
    *
    * @param offset The offset to flush up to (non-inclusive); the new recovery 
point
    */
   def flush(offset: Long): Unit = {
     maybeHandleIOException(s"Error while flushing log for $topicPartition in 
dir ${dir.getParent} with offset $offset") {
-      if (offset > this.recoveryPoint) {
+      if (offset > localLog.recoveryPoint) {
         debug(s"Flushing log up to offset $offset, last flushed: 
$lastFlushTime,  current time: ${time.milliseconds()}, " +
           s"unflushed: $unflushedMessages")
-        val segments = logSegments(this.recoveryPoint, offset)
-        segments.foreach(_.flush())
-        // if there are any new segments, we need to flush the parent 
directory for crash consistency
-        segments.lastOption.filter(_.baseOffset >= 
this.recoveryPoint).foreach(_ => Utils.flushDir(dir.toPath))
-
+        localLog.flush(offset)
         lock synchronized {
-          checkIfMemoryMappedBufferClosed()
-          if (offset > this.recoveryPoint) {
-            this.recoveryPoint = offset
-            lastFlushedTime.set(time.milliseconds)
-          }
+          localLog.markFlushed(offset)
         }
       }
     }
   }
 
   /**
-   * Completely delete this log directory and all contents from the file 
system with no delay
+   * Completely delete the local log directory and all contents from the file 
system with no delay
    */
   private[log] def delete(): Unit = {
     maybeHandleIOException(s"Error while deleting log for $topicPartition in 
dir ${dir.getParent}") {
       lock synchronized {
-        checkIfMemoryMappedBufferClosed()
         producerExpireCheck.cancel(true)
-        removeAndDeleteSegments(logSegments, asyncDelete = false, LogDeletion)
         leaderEpochCache.foreach(_.clear())
-        Utils.delete(dir)
-        // File handlers will be closed if this log is deleted
-        isMemoryMappedBufferClosed = true
+        val deletedSegments = localLog.delete()
+        deleteProducerSnapshotAsync(deletedSegments)

Review comment:
       That's a good point. The best solution I could think of is to split the 
functionality up into a few different APIs in `LocalLog`. For example, this is 
how the right implementation could look in `Log.scala`:
   
   ```
   // In `Log.scala`:
   private[log] def delete(): Unit = {
     maybeHandleIOException(s"Error while deleting log for $topicPartition in 
dir ${dir.getParent}") {
       lock synchronized {
         producerExpireCheck.cancel(true)
         leaderEpochCache.foreach(_.clear())
         val deletedSegments = localLog.deleteAllSegments()
         deleteProducerSnapshotAsync(deletedSegments)
         localLog.deleteDir()
       }
     }
   }
     
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to