Hi guys,
        I'm trying to add test cases, but the case below crashes at the line
"segReopen.recover(64*1024) --> index.trimToValidSize()". Any idea why?
I'd appreciate your help.
        The case assumes Kafka crashes suddenly and needs to recover the last segment.

kafka.log.LogSegmentTest > testCreateWithInitFileSizeCrash FAILED
    java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
        at java.io.RandomAccessFile.setLength(Native Method)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
        at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
        at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
        at kafka.log.LogSegment.recover(LogSegment.scala:199)
        at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)

  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset
          // of the first uncompressed message
          val startOffset =
            entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.deepIterator(entry.message).next().offset
          }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException => 
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
          .format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    truncated
  }

  /* Create a segment with preallocation enabled, then simulate a crash and recover it. */
  @Test
  def testCreateWithInitFileSizeCrash() {
    val tempDir = TestUtils.tempDir()
    val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false,
      512*1024*1024, true)

    val ms = messages(50, "hello", "there")
    seg.append(50, ms)
    val ms2 = messages(60, "alpha", "beta")
    seg.append(60, ms2)
    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
    assertEquals(ms2.toList, read.messageSet.toList)
    val oldSize = seg.log.sizeInBytes()
    val oldPosition = seg.log.channel.position
    val oldFileSize = seg.log.file.length
    assertEquals(512*1024*1024, oldFileSize)
    seg.flush()
    seg.log.channel.close()
    seg.index.close()

    val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, true)
    segReopen.recover(64*1024)
    val size = segReopen.log.sizeInBytes()
    val position = segReopen.log.channel.position
    val fileSize = segReopen.log.file.length
    assertEquals(oldPosition, position)
    assertEquals(oldSize, size)
    assertEquals(size, fileSize)
  }
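
For reference, the behaviour this test expects can be sketched without any Kafka classes, using only java.io and java.nio. This is a minimal sketch, not the Kafka implementation: the object and method names (PreallocateSketch, writePreallocated, recoverTo) are hypothetical, and the number of valid bytes is passed in directly, whereas the real recover() finds it by iterating over the messages.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer

// Hypothetical helper, for illustration only; not part of Kafka.
object PreallocateSketch {

  /** Preallocates `file` to `initFileSize` bytes, writes `payload` at offset 0,
    * and returns the number of valid bytes written. */
  def writePreallocated(file: File, initFileSize: Long, payload: Array[Byte]): Int = {
    val raf = new RandomAccessFile(file, "rw")
    try {
      raf.setLength(initFileSize)        // preallocate: file length is now initFileSize
      val channel = raf.getChannel
      channel.position(0L)               // a new preallocated file starts appending at 0
      val buf = ByteBuffer.wrap(payload)
      while (buf.hasRemaining) channel.write(buf)
      channel.force(true)
      payload.length
    } finally raf.close()                // no trim here: models a crash before clean shutdown
  }

  /** Models recovery: truncates the file to `validBytes` and moves the position
    * there. Returns (position, size) of the reopened channel. */
  def recoverTo(file: File, validBytes: Long): (Long, Long) = {
    val raf = new RandomAccessFile(file, "rw")
    try {
      val channel = raf.getChannel
      channel.truncate(validBytes)       // drop the empty preallocated tail
      channel.position(validBytes)       // next append goes to the end of valid data
      (channel.position(), channel.size())
    } finally raf.close()
  }
}
```

After writePreallocated the file length equals the preallocated size; after recoverTo the position, the channel size, and the file length should all equal the number of valid bytes, which is what the three assertions at the end of the test check.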



Thanks, Honghai Chen
http://aka.ms/kafka 
http://aka.ms/manifold 

-----Original Message-----
From: Sriram Subramanian [mailto:srsubraman...@linkedin.com.INVALID] 
Sent: Friday, April 24, 2015 12:57 AM
To: dev@kafka.apache.org
Cc: Roshan Naik
Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
performance under windows and some old Linux file system

+1

Some information on how this will be tested would be useful.

On 4/23/15 9:33 AM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

>Yeah if we understand the optimal policy for a config we always want to 
>set it automatically. In this case I don't think we do yet, but down 
>the road that could be nice. I think for now we should consider this 
>option experimental to give people a chance to try it out.
>
>-Jay
>
>On Wed, Apr 22, 2015 at 7:32 PM, Honghai Chen 
><honghai.c...@microsoft.com>
>wrote:
>
>> Hi Roshan,
>>         Using an 'auto' value might break the rule and mess up
>> the configuration. @Jay, any thoughts?
>>
>> Thanks, Honghai Chen
>>
>> -----Original Message-----
>> From: Sriharsha Chintalapani [mailto:harsh...@fastmail.fm]
>> Sent: Thursday, April 23, 2015 6:27 AM
>> To: dev@kafka.apache.org; Roshan Naik
>> Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve 
>> consume performance under windows and some old Linux file system
>>
>> +1 (non-binding).
>>
>> --
>> Harsha
>>
>>
>> On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com)
>> wrote:
>>
>> I see that it is safe to keep this off by default due to some concerns.
>> Eventually, for settings such as this whose 'preferred' value is
>> platform-specific (or based on other criteria), it might be worth
>> considering a default value that is not a constant but an 'auto' value.
>> When Kafka boots up it can automatically use the preferred value. Of
>> course, what 'auto' means for a given platform would have to be documented.
>>
>> -roshan
>>
>>
>> On 4/22/15 1:21 PM, "Jakob Homan" <jgho...@gmail.com> wrote:
>>
>> >+1. This is an important performance fix for Windows-based clusters.
>> >
>> >-Jakob
>> >
>> >On 22 April 2015 at 03:25, Honghai Chen <honghai.c...@microsoft.com>
>> >wrote:
>> >> Fixed the issue Sriram mentioned. Code review and JIRA/KIP updated.
>> >>
>> >> Below is a detailed description of the scenarios:
>> >> 1. On a clean shutdown, the last log file is truncated to its real
>> >>    size, since the close() function of FileMessageSet calls trim().
>> >> 2. On a crash, the restart goes through recover(), and the last log
>> >>    file is truncated to its real size (and the position is moved to
>> >>    the end of the file).
>> >> 3. When the service starts and opens an existing file:
>> >>    a. It runs the LogSegment constructor that has NO "preallocate"
>> >>       parameter.
>> >>    b. Then, in FileMessageSet, "end" is Int.MaxValue, so
>> >>       "channel.position(math.min(channel.size().toInt, end))" sets
>> >>       the position to the end of the file.
>> >>    c. If recovery is needed, the recover function truncates the file
>> >>       to the end of the valid data and moves the position there.
>> >> 4. When the service is running and needs to create a new log segment
>> >>    and a new FileMessageSet:
>> >>    a. If preallocate = true: "end" in FileMessageSet is 0, the file
>> >>       size is "initFileSize", and
>> >>       "channel.position(math.min(channel.size().toInt, end))" sets
>> >>       the position to 0.
>> >>    b. Else, if preallocate = false (backward compatible): "end" in
>> >>       FileMessageSet is Int.MaxValue, the file size is 0, and
>> >>       "channel.position(math.min(channel.size().toInt, end))" sets
>> >>       the position to 0.
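
The positioning rule in scenarios 3 and 4 above can be sketched as a pure function. This is only an illustration of the quoted expression "math.min(channel.size().toInt, end)"; the object name InitialPosition and the sizes used are assumptions, not code from the patch.

```scala
// Hypothetical illustration of the start-position rule; not taken from the patch.
object InitialPosition {

  // Assumed preallocation size, chosen only for this example.
  val InitFileSize: Long = 512L * 1024 * 1024

  /** Mirrors the quoted expression: math.min(channel.size().toInt, end). */
  def initialPosition(channelSize: Long, end: Int): Int =
    math.min(channelSize.toInt, end)

  def main(args: Array[String]): Unit = {
    // 3. Existing file: end = Int.MaxValue, so the position is the file size.
    println(initialPosition(50L * 1024 * 1024, Int.MaxValue))
    // 4a. New file, preallocate = true: end = 0, so the position is 0.
    println(initialPosition(InitFileSize, 0))
    // 4b. New file, preallocate = false: the file size is 0, so the position is 0.
    println(initialPosition(0L, Int.MaxValue))
  }
}
```

Note that for an existing file that is still at its preallocated size after a crash, this rule alone would put the position at the preallocated size; it is the truncation in recover() (scenario 3c) that brings the position back to the end of the valid data.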
>> >>
>> >>
>> >>
>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
>> >> https://issues.apache.org/jira/browse/KAFKA-1646
>> >> https://reviews.apache.org/r/33204/diff/2/
>> >>
>> >> Thanks, Honghai Chen
>> >> http://aka.ms/kafka
>> >> http://aka.ms/manifold
>> >>
>> >> -----Original Message-----
>> >> From: Honghai Chen
>> >> Sent: Wednesday, April 22, 2015 11:12 AM
>> >> To: dev@kafka.apache.org
>> >> Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume
>> >> performance under windows and some old Linux file system
>> >>
>> >> Hi Sriram,
>> >> One line of code was missing; I will update the code review board and
>> >> the KIP soon.
>> >> LogSegment and FileMessageSet must use different constructors for an
>> >> existing file and a new file; then the code
>> >> "channel.position(math.min(channel.size().toInt, end))" ensures the
>> >> position is at the end of an existing file.
>> >>
>> >> Thanks, Honghai Chen
>> >>
>> >> -----Original Message-----
>> >> From: Jay Kreps [mailto:jay.kr...@gmail.com]
>> >> Sent: Wednesday, April 22, 2015 5:22 AM
>> >> To: dev@kafka.apache.org
>> >> Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
>> >> performance under windows and some old Linux file system
>> >>
>> >> My understanding of the patch is that clean shutdown truncates the file
>> >> back to its true size (and reallocates it on startup). A hard crash
>> >> is handled by the normal recovery, which should truncate off the
>> >> empty portion of the file.
>> >>
>> >> On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian < 
>> >>srsubraman...@linkedin.com.invalid> wrote:
>> >>
>> >>> Could you describe how recovery works in this mode? Say we had a 250
>> >>> MB preallocated segment, wrote up to 50 MB, and crashed. Up to what
>> >>> point do we recover? Also, on startup, how is the append end
>> >>> pointer set even on a clean shutdown? How does the FileChannel
>> >>> end position get set to 50 MB instead of 250 MB? The existing
>> >>> code might just work for it, but explaining that would be useful.
>> >>>
>> >>> On 4/21/15 9:40 AM, "Neha Narkhede" <n...@confluent.io> wrote:
>> >>>
>> >>> >+1. I've tried this on Linux and it helps reduce the spikes in append
>> >>> >(and hence producer) latency for high-throughput writes. I am not
>> >>> >entirely sure why, but my suspicion is that in the absence of
>> >>> >preallocation, you see spikes when writes need to happen faster than
>> >>> >the time it takes Linux to allocate the next block to the file.
>> >>> >
>> >>> >It will be great to see some performance test results too.
>> >>> >
>> >>> >On Tue, Apr 21, 2015 at 9:23 AM, Jay Kreps <jay.kr...@gmail.com>
>> >>>wrote:
>> >>> >
>> >>> >> I'm also +1 on this. The change is quite small and may
>> >>> >> actually help perf on Linux as well (we've never tried this).
>> >>> >>
>> >>> >> I have a lot of concerns on testing the various failure conditions,
>> >>> >> but I think since it will be off by default the risk is not too high.
>> >>> >>
>> >>> >> -Jay
>> >>> >>
>> >>> >> On Mon, Apr 20, 2015 at 6:58 PM, Honghai Chen 
>> >>> >><honghai.c...@microsoft.com>
>> >>> >> wrote:
>> >>> >>
>> >>> >> > I wrote a KIP for this after some discussion on KAFKA-1646.
>> >>> >> > https://issues.apache.org/jira/browse/KAFKA-1646
>> >>> >> >
>> >>> >> >
>> >>> >>
>> >>> >>
>> >>> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
>> >>> >> > The RB is here: https://reviews.apache.org/r/33204/diff/
>> >>> >> >
>> >>> >> > Thanks, Honghai
>> >>> >> >
>> >>> >> >
>> >>> >>
>> >>> >
>> >>> >
>> >>> >
>> >>> >--
>> >>> >Thanks,
>> >>> >Neha
>> >>>
>> >>>
>>
>>
