[
https://issues.apache.org/jira/browse/KAFKA-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643791#comment-16643791
]
ASF GitHub Bot commented on KAFKA-7366:
---------------------------------------
junrao closed pull request #5728: KAFKA-7366: Make topic configs segment.bytes
and segment.ms take effect immediately
URL: https://github.com/apache/kafka/pull/5728
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 094473a8e26..bc328d77efc 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -145,6 +145,26 @@ case class CompletedTxn(producerId: Long, firstOffset:
Long, lastOffset: Long, i
}
}
+/**
+ * A class used to hold params required to decide to rotate a log segment or
not.
+ */
+case class RollParams(maxSegmentMs: Long,
+ maxSegmentBytes: Int,
+ maxTimestampInMessages: Long,
+ maxOffsetInMessages: Long,
+ messagesSize: Int,
+ now: Long)
+
+object RollParams {
+ def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int,
now: Long): RollParams = {
+ new RollParams(config.segmentMs,
+ config.segmentSize,
+ appendInfo.maxTimestamp,
+ appendInfo.lastOffset,
+ messagesSize, now)
+ }
+}
+
/**
* An append-only log for storing messages.
*
@@ -1493,7 +1513,7 @@ class Log(@volatile var dir: File,
val maxTimestampInMessages = appendInfo.maxTimestamp
val maxOffsetInMessages = appendInfo.lastOffset
- if (segment.shouldRoll(messagesSize, maxTimestampInMessages,
maxOffsetInMessages, now)) {
+ if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now)))
{
debug(s"Rolling new log segment (log_size =
${segment.size}/${config.segmentSize}}, " +
s"offset_index_size =
${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " +
s"time_index_size =
${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala
b/core/src/main/scala/kafka/log/LogSegment.scala
index 80763a8d797..d910a29100c 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -45,8 +45,10 @@ import scala.math._
* @param log The file records containing log entries
* @param offsetIndex The offset index
* @param timeIndex The timestamp index
+ * @param txnIndex The transaction index
* @param baseOffset A lower bound on the offsets in this segment
* @param indexIntervalBytes The approximate number of bytes between entries
in the index
+ * @param rollJitterMs The maximum random jitter subtracted from the scheduled
segment roll time
* @param time The time instance
*/
@nonthreadsafe
@@ -57,15 +59,13 @@ class LogSegment private[log] (val log: FileRecords,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
- val maxSegmentMs: Long,
- val maxSegmentBytes: Int,
val time: Time) extends Logging {
- def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long,
maxOffsetInMessages: Long, now: Long): Boolean = {
- val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) >
maxSegmentMs - rollJitterMs
- size > maxSegmentBytes - messagesSize ||
+ def shouldRoll(rollParams: RollParams): Boolean = {
+ val reachedRollMs = timeWaitedForRoll(rollParams.now,
rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
+ size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
(size > 0 && reachedRollMs) ||
- offsetIndex.isFull || timeIndex.isFull ||
!canConvertToRelativeOffset(maxOffsetInMessages)
+ offsetIndex.isFull || timeIndex.isFull ||
!canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
}
def resizeIndexes(size: Int): Unit = {
@@ -637,8 +637,6 @@ object LogSegment {
baseOffset,
indexIntervalBytes = config.indexInterval,
rollJitterMs = config.randomSegmentJitter,
- maxSegmentMs = config.segmentMs,
- maxSegmentBytes = config.segmentSize,
time)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 40b687443a0..353e5537588 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -38,9 +38,8 @@ class LogSegmentTest {
/* create a segment with the given base offset */
def createSegment(offset: Long,
indexIntervalBytes: Int = 10,
- maxSegmentMs: Int = Int.MaxValue,
time: Time = Time.SYSTEM): LogSegment = {
- val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes,
maxSegmentMs, time)
+ val seg = LogUtils.createSegment(offset, logDir, indexIntervalBytes, time)
segments += seg
seg
}
@@ -163,10 +162,10 @@ class LogSegmentTest {
val maxSegmentMs = 300000
val time = new MockTime
- val seg = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+ val seg = createSegment(0, time = time)
seg.close()
- val reopened = createSegment(0, maxSegmentMs = maxSegmentMs, time = time)
+ val reopened = createSegment(0, time = time)
assertEquals(0, seg.timeIndex.sizeInBytes)
assertEquals(0, seg.offsetIndex.sizeInBytes)
@@ -176,24 +175,21 @@ class LogSegmentTest {
assertFalse(reopened.timeIndex.isFull)
assertFalse(reopened.offsetIndex.isFull)
- assertFalse(reopened.shouldRoll(messagesSize = 1024,
- maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
- maxOffsetInMessages = 100L,
- now = time.milliseconds()))
+ var rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue,
RecordBatch.NO_TIMESTAMP,
+ maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
+ assertFalse(reopened.shouldRoll(rollParams))
// The segment should not be rolled even if maxSegmentMs has been exceeded
time.sleep(maxSegmentMs + 1)
assertEquals(maxSegmentMs + 1,
reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
- assertFalse(reopened.shouldRoll(messagesSize = 1024,
- maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
- maxOffsetInMessages = 100L,
- now = time.milliseconds()))
+ rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue,
RecordBatch.NO_TIMESTAMP,
+ maxOffsetInMessages = 100L, messagesSize = 1024, time.milliseconds())
+ assertFalse(reopened.shouldRoll(rollParams))
// But we should still roll the segment if we cannot fit the next offset
- assertTrue(reopened.shouldRoll(messagesSize = 1024,
- maxTimestampInMessages = RecordBatch.NO_TIMESTAMP,
- maxOffsetInMessages = Int.MaxValue.toLong + 200,
- now = time.milliseconds()))
+ rollParams = RollParams(maxSegmentMs, maxSegmentBytes = Int.MaxValue,
RecordBatch.NO_TIMESTAMP,
+ maxOffsetInMessages = Int.MaxValue.toLong + 200L, messagesSize = 1024,
time.milliseconds())
+ assertTrue(reopened.shouldRoll(rollParams))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 151c4ed0ad8..77289989546 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -277,7 +277,7 @@ class LogTest {
override def addSegment(segment: LogSegment): LogSegment = {
val wrapper = new LogSegment(segment.log, segment.offsetIndex,
segment.timeIndex, segment.txnIndex, segment.baseOffset,
- segment.indexIntervalBytes, segment.rollJitterMs,
segment.maxSegmentMs, segment.maxSegmentBytes, mockTime) {
+ segment.indexIntervalBytes, segment.rollJitterMs, mockTime) {
override def read(startOffset: Long, maxOffset: Option[Long],
maxSize: Int, maxPosition: Long,
minOneMessage: Boolean): FetchDataInfo = {
diff --git a/core/src/test/scala/unit/kafka/log/LogUtils.scala
b/core/src/test/scala/unit/kafka/log/LogUtils.scala
index eb218952d0a..8652aa509d0 100644
--- a/core/src/test/scala/unit/kafka/log/LogUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogUtils.scala
@@ -29,13 +29,12 @@ object LogUtils {
def createSegment(offset: Long,
logDir: File,
indexIntervalBytes: Int = 10,
- maxSegmentMs: Int = Int.MaxValue,
time: Time = Time.SYSTEM): LogSegment = {
val ms = FileRecords.open(Log.logFile(logDir, offset))
val idx = new OffsetIndex(Log.offsetIndexFile(logDir, offset), offset,
maxIndexSize = 1000)
val timeIdx = new TimeIndex(Log.timeIndexFile(logDir, offset), offset,
maxIndexSize = 1500)
val txnIndex = new TransactionIndex(offset,
Log.transactionIndexFile(logDir, offset))
- new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0,
maxSegmentMs, Int.MaxValue, time)
+ new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0,
time)
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 510c4a3e273..cabe0a984e2 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -59,6 +59,33 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness
{
}
}
+ @Test
+ def testDynamicTopicConfigChange() {
+ val tp = new TopicPartition("test", 0)
+ val oldSegmentSize = 1000
+ val logProps = new Properties()
+ logProps.put(SegmentBytesProp, oldSegmentSize.toString)
+ createTopic(tp.topic, 1, 1, logProps)
+ TestUtils.retry(10000) {
+ val logOpt = this.servers.head.logManager.getLog(tp)
+ assertTrue(logOpt.isDefined)
+ assertEquals(oldSegmentSize, logOpt.get.config.segmentSize)
+ }
+
+ val log = servers.head.logManager.getLog(tp).get
+
+ val newSegmentSize = 2000
+ logProps.put(SegmentBytesProp, newSegmentSize.toString)
+ adminZkClient.changeTopicConfig(tp.topic, logProps)
+ TestUtils.retry(10000) {
+ assertEquals(newSegmentSize, log.config.segmentSize)
+ }
+
+ (1 to 50).foreach(i => TestUtils.produceMessage(servers, tp.topic,
i.toString))
+ // Verify that the new config is used for all segments
+ assertTrue("Log segment size change not applied",
log.logSegments.forall(_.size > 1000))
+ }
+
private def testQuotaConfigChange(user: String, clientId: String,
rootEntityType: String, configEntityName: String) {
assertTrue("Should contain a ConfigHandler for " + rootEntityType ,
this.servers.head.dynamicConfigHandlers.contains(rootEntityType))
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> topic level segment.bytes and segment.ms not taking effect immediately
> ----------------------------------------------------------------------
>
> Key: KAFKA-7366
> URL: https://issues.apache.org/jira/browse/KAFKA-7366
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.1.0, 2.0.0
> Reporter: Jun Rao
> Assignee: Manikumar
> Priority: Major
>
> It used to be that topic level configs such as segment.bytes took effect
> immediately. Because of KAFKA-6324 in 1.1, those configs now only take effect
> after the active segment has rolled. The relevant part of KAFKA-6324 is that
> in Log.maybeRoll, the checking of the segment rolling is moved to
> LogSegment.shouldRoll().
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)