This seems similar to what's in
https://issues.apache.org/jira/browse/KAFKA-1065.

Also, could you explain why the preallocated size is set to config.segmentSize
- 2 * config.maxMessageSize, instead of just config.segmentSize?

Thanks,

Jun

On Mon, May 4, 2015 at 8:12 PM, Honghai Chen <honghai.c...@microsoft.com>
wrote:

>   Hi guys,
>         I'm trying to add test cases, but the case below crashes at the line
> "segReopen.recover(64*1024) --> index.trimToValidSize()". Any idea why?
> Appreciate your help.
>         The case assumes Kafka crashed suddenly and the last segment needs
> to be recovered.
>
> kafka.log.LogSegmentTest > testCreateWithInitFileSizeCrash FAILED
>     java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
>         at java.io.RandomAccessFile.setLength(Native Method)
>         at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
>         at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
>         at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
>         at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
>         at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
>         at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
>         at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
>         at kafka.log.LogSegment.recover(LogSegment.scala:199)
>         at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)
>
>   def recover(maxMessageSize: Int): Int = {
>     index.truncate()
>     index.resize(index.maxIndexSize)
>     var validBytes = 0
>     var lastIndexEntry = 0
>     val iter = log.iterator(maxMessageSize)
>     try {
>       while(iter.hasNext) {
>         val entry = iter.next
>         entry.message.ensureValid()
>         if(validBytes - lastIndexEntry > indexIntervalBytes) {
>           // we need to decompress the message, if required, to get the offset of the first uncompressed message
>           val startOffset =
>             entry.message.compressionCodec match {
>               case NoCompressionCodec =>
>                 entry.offset
>               case _ =>
>                 ByteBufferMessageSet.deepIterator(entry.message).next().offset
>             }
>           index.append(startOffset, validBytes)
>           lastIndexEntry = validBytes
>         }
>         validBytes += MessageSet.entrySize(entry.message)
>       }
>     } catch {
>       case e: InvalidMessageException =>
>         logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
>     }
>     val truncated = log.sizeInBytes - validBytes
>     log.truncateTo(validBytes)
>     index.trimToValidSize()
>     truncated
>   }
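[Editorial note: to make the bookkeeping in recover() above easier to follow, here is a stripped-down sketch (hypothetical fixed-size records, no real message parsing or validation) of how the sparse index entries and the truncation amount are computed:]

```java
import java.util.ArrayList;
import java.util.List;

public class RecoverSketch {
    // Mirrors the loop in LogSegment.recover(): walk messages, add a sparse
    // index entry once more than indexIntervalBytes of valid data has
    // accumulated since the last entry, stop at the first invalid message,
    // and report how many bytes get truncated off the (preallocated) tail.
    public static int recover(int[] messageSizes, int firstInvalid,
                              int totalLogBytes, int indexIntervalBytes,
                              List<Integer> indexEntries) {
        int validBytes = 0;
        int lastIndexEntry = 0;
        for (int i = 0; i < messageSizes.length; i++) {
            if (i == firstInvalid) break;          // stands in for InvalidMessageException
            if (validBytes - lastIndexEntry > indexIntervalBytes) {
                indexEntries.add(validBytes);      // physical position of the entry
                lastIndexEntry = validBytes;
            }
            validBytes += messageSizes[i];
        }
        return totalLogBytes - validBytes;         // bytes to truncate off the tail
    }

    public static void main(String[] args) {
        List<Integer> idx = new ArrayList<>();
        // four valid 100-byte messages in a file preallocated to 1000 bytes,
        // with an index entry roughly every 150 bytes of valid data
        int truncated = recover(new int[]{100, 100, 100, 100}, -1, 1000, 150, idx);
        System.out.println(truncated); // 600: the unused preallocated tail
        System.out.println(idx);       // [200]: one sparse index entry
    }
}
```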
>
> /* create a segment with preallocation and simulate a crash */
>   @Test
>   def testCreateWithInitFileSizeCrash() {
>     val tempDir = TestUtils.tempDir()
>     val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true)
>
>     val ms = messages(50, "hello", "there")
>     seg.append(50, ms)
>     val ms2 = messages(60, "alpha", "beta")
>     seg.append(60, ms2)
>     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
>     assertEquals(ms2.toList, read.messageSet.toList)
>     val oldSize = seg.log.sizeInBytes()
>     val oldPosition = seg.log.channel.position
>     val oldFileSize = seg.log.file.length
>     assertEquals(512*1024*1024, oldFileSize)
>     seg.flush()
>     seg.log.channel.close()
>     seg.index.close()
>
>     val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, true)
>     segReopen.recover(64*1024)
>     val size = segReopen.log.sizeInBytes()
>     val position = segReopen.log.channel.position
>     val fileSize = segReopen.log.file.length
>     assertEquals(oldPosition, position)
>     assertEquals(oldSize, size)
>     assertEquals(size, fileSize)
>   }
>
>
>
> Thanks, Honghai Chen
> http://aka.ms/kafka
> http://aka.ms/manifold
>
> -----Original Message-----
> From: Sriram Subramanian [mailto:srsubraman...@linkedin.com.INVALID]
> Sent: Friday, April 24, 2015 12:57 AM
> To: dev@kafka.apache.org
> Cc: Roshan Naik
> Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
> performance under windows and some old Linux file system
>
> +1
>
> Some information on how this will be tested would be useful.
>
> On 4/23/15 9:33 AM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>
> >Yeah if we understand the optimal policy for a config we always want to
> >set it automatically. In this case I don't think we do yet, but down
> >the road that could be nice. I think for now we should consider this
> >option experimental to give people a chance to try it out.
> >
> >-Jay
> >
> >On Wed, Apr 22, 2015 at 7:32 PM, Honghai Chen
> ><honghai.c...@microsoft.com>
> >wrote:
> >
> >> Hi Roshan,
> >>         Using an 'auto' value may break the rule and mess up the
> >> configuration. @Jay, any thoughts?
> >>
> >> Thanks, Honghai Chen
> >>
> >> -----Original Message-----
> >> From: Sriharsha Chintalapani [mailto:harsh...@fastmail.fm]
> >> Sent: Thursday, April 23, 2015 6:27 AM
> >> To: dev@kafka.apache.org; Roshan Naik
> >> Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve
> >> consume performance under windows and some old Linux file system
> >>
> >> +1 (non-binding).
> >>
> >> --
> >> Harsha
> >>
> >>
> >> On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com)
> >> wrote:
> >>
> >> I see that it is safe to keep this off by default due to some concerns.
> >> Eventually, for settings such as this, whose 'preferred' value is
> >> platform specific (or based on other criteria), it might be worth
> >> considering a default value that is not a constant but an 'auto' value.
> >> When Kafka boots up it can automatically use the preferred value. Of
> >> course it would have to be documented as to what 'auto' means for a
> >> given platform.
> >>
> >> -roshan
> >>
> >>
> >> On 4/22/15 1:21 PM, "Jakob Homan" <jgho...@gmail.com> wrote:
> >>
> >> >+1. This is an important performance fix for Windows-based clusters.
> >> >
> >> >-Jakob
> >> >
> >> >On 22 April 2015 at 03:25, Honghai Chen <honghai.c...@microsoft.com>
> >> >wrote:
> >> >> Fix the issue Sriram mentioned. Code review and jira/KIP updated.
> >> >>
> >> >> Below is a detailed description of the scenarios:
> >> >> 1. If we do a clean shutdown, the last log file will be truncated to
> >> >>    its real size, since the close() function of FileMessageSet calls
> >> >>    trim().
> >> >> 2. If we crash, then on restart we go through recover() and the last
> >> >>    log file will be truncated to its real size (and the position will
> >> >>    be moved to the end of the file).
> >> >> 3. When the service starts and opens an existing file:
> >> >>    a. It runs the LogSegment constructor that has NO "preallocate"
> >> >>       parameter.
> >> >>    b. Then the "end" in FileMessageSet will be Int.MaxValue, and
> >> >>       "channel.position(math.min(channel.size().toInt, end))" will set
> >> >>       the position to the end of the file.
> >> >>    c. If recovery is needed, the recover function truncates the file
> >> >>       to the end of the valid data and also moves the position there.
> >> >> 4. When the service is running and needs to create a new log segment
> >> >>    and a new FileMessageSet:
> >> >>    a. If preallocate = true: the "end" in FileMessageSet will be 0,
> >> >>       the file size will be "initFileSize", and
> >> >>       "channel.position(math.min(channel.size().toInt, end))" will set
> >> >>       the position to 0.
> >> >>    b. Else if preallocate = false (backward compatible): the "end" in
> >> >>       FileMessageSet will be Int.MaxValue, the file size will be "0",
> >> >>       and "channel.position(math.min(channel.size().toInt, end))" will
> >> >>       set the position to 0.
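[Editorial note: the position logic quoted repeatedly above can be condensed into one function. A tiny sketch (values hypothetical) of what channel.position(math.min(channel.size().toInt, end)) resolves to in each case:]

```java
public class PositionSketch {
    // The initial write position FileMessageSet picks: min(file size, end),
    // where "end" is 0 for a freshly preallocated file and Int.MaxValue for
    // an existing or non-preallocated file, per the scenarios above.
    public static int initialPosition(int fileSize, int end) {
        return Math.min(fileSize, end);
    }

    public static void main(String[] args) {
        int MAX = Integer.MAX_VALUE;
        // case 4a: new preallocated segment -> position 0
        System.out.println(initialPosition(512 * 1024 * 1024, 0));   // 0
        // case 3b: reopened existing segment -> position = end of file
        System.out.println(initialPosition(50_000_000, MAX));        // 50000000
        // case 4b: new non-preallocated segment (size 0) -> position 0
        System.out.println(initialPosition(0, MAX));                 // 0
    }
}
```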
> >> >>
> >> >>
> >> >>
> >>
> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
> >> >> https://issues.apache.org/jira/browse/KAFKA-1646
> >> >> https://reviews.apache.org/r/33204/diff/2/
> >> >>
> >> >> Thanks, Honghai Chen
> >> >> http://aka.ms/kafka
> >> >> http://aka.ms/manifold
> >> >>
> >> >> -----Original Message-----
> >> >> From: Honghai Chen
> >> >> Sent: Wednesday, April 22, 2015 11:12 AM
> >> >> To: dev@kafka.apache.org
> >> >> Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve
> >>consume
> >> >>performance under windows and some old Linux file system
> >> >>
> >> >> Hi Sriram,
> >> >> One line of code was missing; I will update the review board and the
> >> >> KIP soon.
> >> >> For LogSegment and FileMessageSet, we must use different constructors
> >> >> for existing files and for new files; then the code
> >> >> "channel.position(math.min(channel.size().toInt, end))" will make sure
> >> >> the position is at the end of an existing file.
> >> >>
> >> >> Thanks, Honghai Chen
> >> >>
> >> >> -----Original Message-----
> >> >> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> >> >> Sent: Wednesday, April 22, 2015 5:22 AM
> >> >> To: dev@kafka.apache.org
> >> >> Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve
> >>consume
> >> >>performance under windows and some old Linux file system
> >> >>
> >> >> My understanding of the patch is that a clean shutdown truncates the
> >> >> file back to its true size (and reallocates it on startup). A hard
> >> >> crash is handled by the normal recovery, which should truncate off the
> >> >> empty portion of the file.
> >> >>
> >> >> On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian <
> >> >>srsubraman...@linkedin.com.invalid> wrote:
> >> >>
> >> >>> Could you describe how recovery works in this mode? Say we had a
> >> >>> 250 MB preallocated segment and we wrote till 50 MB and crashed.
> >> >>> Till what point do we recover? Also, on startup, how is the append
> >> >>> end pointer set, even on a clean shutdown? How does the FileChannel
> >> >>> end position get set to 50 MB instead of 250 MB? The existing code
> >> >>> might just work for it, but explaining that would be useful.
> >> >>>
> >> >>> On 4/21/15 9:40 AM, "Neha Narkhede" <n...@confluent.io> wrote:
> >> >>>
> >> >>> >+1. I've tried this on Linux and it helps reduce the spikes in
> >> >>> >append (and hence producer) latency for high-throughput writes. I am
> >> >>> >not entirely sure why, but my suspicion is that in the absence of
> >> >>> >preallocation, you see spikes when writes need to happen faster than
> >> >>> >the time it takes Linux to allocate the next block to the file.
> >> >>> >It will be great to see some performance test results too.
> >> >>> >
> >> >>> >On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps <jay.kr...@gmail.com>
> >> >>>wrote:
> >> >>> >
> >> >>> >> I'm also +1 on this. The change is quite small and may actually
> >> >>> >> help perf on Linux as well (we've never tried this).
> >> >>> >>
> >> >>> >> I have a lot of concerns about testing the various failure
> >> >>> >> conditions, but since it will be off by default, the risk is not
> >> >>> >> too high.
> >> >>> >>
> >> >>> >> -Jay
> >> >>> >>
> >> >>> >> On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen
> >> >>> >><honghai.c...@microsoft.com>
> >> >>> >> wrote:
> >> >>> >>
> >> >>> >> > I wrote a KIP for this after some discussion on KAFKA-1646.
> >> >>> >> > https://issues.apache.org/jira/browse/KAFKA-1646
> >> >>> >> >
> >> >>> >> >
> >> >>> >>
> >> >>> >>
> >> >>>
> >> >>> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
> >> >>> >> > The RB is here: https://reviews.apache.org/r/33204/diff/
> >> >>> >> >
> >> >>> >> > Thanks, Honghai
> >> >>> >> >
> >> >>> >> >
> >> >>> >>
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> >--
> >> >>> >Thanks,
> >> >>> >Neha
> >> >>>
> >> >>>
> >>
> >>
>
>
