[ https://issues.apache.org/jira/browse/KAFKA-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994831#comment-15994831 ]
Petr Plavjaník commented on KAFKA-5155:
---------------------------------------

Hi [~huxi_2b] and [~mihbor], this defect is about the potential data loss that occurred in our test scenario, not about the ordering of messages based on their timestamps. We were using a non-Java producer that used version 0 of the message format (without timestamps), but the first message in each partition was written by the Java KafkaProducer, which used the version 1 message format with a timestamp. Some messages were lost (written to the log but deleted before they were read) when the first retention check was done. The fix should not change how timestamps are used elsewhere; it should only make sure that time-based retention does not delete such messages prematurely.

The behavior change is listed in the section _Potential breaking changes in 0.10.1.0_:
{quote}The log retention time is no longer based on last modified time of the log segments. Instead it will be based on the largest timestamp of the messages in a log segment.{quote}
But it can be surprising that old producers create messages with no timestamps and that these messages are not taken into consideration when the segment is deleted. When I first read it, I thought that the timestamp of a message was the log append timestamp.

The circumstances in which the data loss occurred are quite rare (a segment where the message at the beginning of the log segment has a timestamp and the rest do not), but data loss is not good in any case.

I am not sure what the right way to fix it is. One way is just to change {{deleteRetenionMsBreachedSegments()}} to account for the {{lastModified}} timestamp as before.
{code}
private def deleteRetenionMsBreachedSegments() : Int = {
  if (config.retentionMs < 0) return 0
  val startMs = time.milliseconds
  // a named parameter is needed because the segment is referenced twice
  deleteOldSegments(segment => startMs - Math.max(segment.largestTimestamp, segment.lastModified) > config.retentionMs)
}
{code}
The other way is to use the current time if {{appendInfo.maxTimestamp}} is {{-1}} in {{Log.append()}}. This also affects log segment rolling.
But {{LogSegment.maxTimestampSoFar}} can be changed to a lower value by {{LogSegment.truncateTo()}}, and then {{Log.deleteRetenionMsBreachedSegments()}}, which uses {{LogSegment.largestTimestamp}} (which in turn uses {{LogSegment.maxTimestampSoFar}}), will not take messages with no timestamp into account.

> Messages can be deleted prematurely when some producers use timestamps and some not
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-5155
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5155
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.10.2.0
>            Reporter: Petr Plavjaník
>
> Some messages can be deleted prematurely and never read in the following
> scenario. One producer uses timestamps and produces messages that are appended
> to the beginning of a log segment. Another producer produces messages without a
> timestamp. In that case the largest timestamp is set by the old messages
> with a timestamp, the new messages without a timestamp do not influence it, and
> the log segment with both old and new messages can be deleted immediately after
> the last new message with no timestamp is appended. When all appended messages
> have no timestamp, they are not deleted, because the {{lastModified}}
> attribute of a {{LogSegment}} is used.
> New test case for {{kafka.log.LogTest}} that fails:
> {code}
>   @Test
>   def shouldNotDeleteTimeBasedSegmentsWhenTimestampIsNotProvidedForSomeMessages() {
>     val retentionMs = 10000000
>     val old = TestUtils.singletonRecords("test".getBytes, timestamp = 0)
>     val set = TestUtils.singletonRecords("test".getBytes, timestamp = -1, magicValue = 0)
>     val log = createLog(set.sizeInBytes, retentionMs = retentionMs)
>     // append some messages to create some segments
>     log.append(old)
>     for (_ <- 0 until 12)
>       log.append(set)
>     assertEquals("No segment should be deleted", 0, log.deleteOldSegments())
>   }
> {code}
> It can be prevented by using {{def largestTimestamp =
> Math.max(maxTimestampSoFar, lastModified)}} in LogSegment, or by using the
> current timestamp when messages with timestamp {{-1}} are appended.
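
The difference between the two retention checks discussed above can be illustrated with a small standalone model. This is only a sketch: {{SegmentModel}} and {{RetentionSketch}} are hypothetical names invented for illustration and are not Kafka classes, and the sketch assumes a segment can be reduced to its largest message timestamp and its last-modified time.

```scala
// Toy model only: SegmentModel and RetentionSketch are hypothetical names,
// not part of Kafka. A segment is reduced to the two values that matter here.
final case class SegmentModel(largestTimestamp: Long, lastModified: Long)

object RetentionSketch {
  // Check based only on the largest message timestamp in the segment.
  def breachedByLargestTimestamp(s: SegmentModel, nowMs: Long, retentionMs: Long): Boolean =
    nowMs - s.largestTimestamp > retentionMs

  // Variant that also takes the last-modified time into account.
  def breachedByMaxOfBoth(s: SegmentModel, nowMs: Long, retentionMs: Long): Boolean =
    nowMs - math.max(s.largestTimestamp, s.lastModified) > retentionMs

  def main(args: Array[String]): Unit = {
    val nowMs = 100000000L
    val retentionMs = 10000000L
    // The first message carried timestamp 0 and the later ones had none,
    // so largestTimestamp stays at 0 although the segment was just written.
    val mixed = SegmentModel(largestTimestamp = 0L, lastModified = nowMs)
    println(breachedByLargestTimestamp(mixed, nowMs, retentionMs)) // true
    println(breachedByMaxOfBoth(mixed, nowMs, retentionMs))        // false
  }
}
```

In this model the mixed segment is eligible for deletion under the first check even though it was modified at {{nowMs}}, while the second check keeps it until the retention period has passed since the last write.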