[
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587106#comment-16587106
]
ASF GitHub Bot commented on KAFKA-7278:
---------------------------------------
lindong28 closed pull request #5535: Cherry-pick KAFKA-7278; replaceSegments()
should not call asyncDeleteSegment() for segments which have been removed from
segments list
URL: https://github.com/apache/kafka/pull/5535
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 8b62918bc97..9b423ba5933 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1608,7 +1608,9 @@ class Log(@volatile var dir: File,
}
/**
- * Perform an asynchronous delete on the given file if it exists (otherwise
do nothing)
+ * Perform an asynchronous delete on the given file.
+ *
+ * This method assumes that the file exists and the method is not
thread-safe.
*
* This method does not need to convert IOException (thrown from
changeFileSuffixes) to KafkaStorageException because
* it is either called before all logs are loaded or the caller will catch
and handle IOException
@@ -1655,6 +1657,8 @@ class Log(@volatile var dir: File,
*/
private[log] def replaceSegments(newSegment: LogSegment, oldSegments:
Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
lock synchronized {
+ val existingOldSegments = oldSegments.filter(seg =>
segments.containsKey(seg.baseOffset))
+
checkIfMemoryMappedBufferClosed()
// need to do this in two phases to be crash safe AND do the delete
asynchronously
// if we crash in the middle of this we complete the swap in
loadSegments()
@@ -1663,7 +1667,7 @@ class Log(@volatile var dir: File,
addSegment(newSegment)
// delete the old files
- for (seg <- oldSegments) {
+ for (seg <- existingOldSegments) {
// remove the index entry
if (seg.baseOffset != newSegment.baseOffset)
segments.remove(seg.baseOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ae949bf6b85..f6001e9f375 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -21,6 +21,7 @@ import java.io.File
import java.nio._
import java.nio.file.Paths
import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import kafka.common._
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
@@ -88,6 +89,74 @@ class LogCleanerTest extends JUnitSuite {
assertEquals(expectedBytesRead, stats.bytesRead)
}
+ @Test
+ def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = {
+ val deleteStartLatch = new CountDownLatch(1)
+ val deleteCompleteLatch = new CountDownLatch(1)
+
+ // Construct a log instance. The replaceSegments() method of the log
instance is overridden so that
+ // it waits for another thread to execute deleteOldSegments()
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," +
LogConfig.Delete)
+ val topicPartition = Log.parseTopicPartitionName(dir)
+ val producerStateManager = new ProducerStateManager(topicPartition, dir)
+ val log = new Log(dir,
+ config = LogConfig.fromProps(logConfig.originals,
logProps),
+ logStartOffset = 0L,
+ recoveryPoint = 0L,
+ scheduler = time.scheduler,
+ brokerTopicStats = new BrokerTopicStats, time,
+ maxProducerIdExpirationMs = 60 * 60 * 1000,
+ producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
+ topicPartition = topicPartition,
+ producerStateManager = producerStateManager,
+ logDirFailureChannel = new LogDirFailureChannel(10)) {
+ override def replaceSegments(newSegment: LogSegment, oldSegments:
Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
+ deleteStartLatch.countDown()
+ if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {
+ throw new IllegalStateException("Log segment deletion timed out")
+ }
+ super.replaceSegments(newSegment, oldSegments, isRecoveredSwapFile)
+ }
+ }
+
+ // Start a thread which executes log.deleteOldSegments() right before
replaceSegments() is executed
+ val t = new Thread() {
+ override def run(): Unit = {
+ deleteStartLatch.await(5000, TimeUnit.MILLISECONDS)
+ log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset)
+ log.onHighWatermarkIncremented(log.activeSegment.baseOffset)
+ log.deleteOldSegments()
+ deleteCompleteLatch.countDown()
+ }
+ }
+ t.start()
+
+ // Append records so that the number of segments increases to 3
+ while (log.numberOfSegments < 3) {
+ log.appendAsLeader(record(key = 0, log.logEndOffset.toInt), leaderEpoch
= 0)
+ log.roll()
+ }
+ assertEquals(3, log.numberOfSegments)
+
+ // Remember reference to the first log and determine its file name
expected for async deletion
+ val firstLogFile = log.logSegments.head.log
+ val expectedFileName = CoreUtils.replaceSuffix(firstLogFile.file.getPath,
"", Log.DeletedFileSuffix)
+
+ // Clean the log. This should trigger replaceSegments() and
deleteOldSegments().
+ val offsetMap = new FakeOffsetMap(Int.MaxValue)
+ val cleaner = makeCleaner(Int.MaxValue)
+ val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
+ val stats = new CleanerStats()
+ cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap,
stats)
+ cleaner.cleanSegments(log, segments, offsetMap, 0L, stats)
+
+ // Validate based on the file name that the log segment file is renamed
exactly once for async deletion
+ assertEquals(expectedFileName, firstLogFile.file().getPath)
+ assertEquals(2, log.numberOfSegments)
+ }
+
@Test
def testSizeTrimmedForPreallocatedAndCompactedTopic(): Unit = {
val originalMaxFileSize = 1024;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> replaceSegments() should not call asyncDeleteSegment() for segments which
> have been removed from segments list
> --------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
> Issue Type: Improvement
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every
> segment listed in the `oldSegments`. oldSegments should be constructed from
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is
> determined to the time Log.replaceSegments() is called. If there are
> concurrent async deletion of the same log segment file, Log.replaceSegments()
> will call asyncDeleteSegment() for a segment that does not exist and Kafka
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by
> only deleting segment if the segment can be found in Log.segments.
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)