Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-20 Thread Jeyhun Karimov
Hi,

Thanks for your comments. I have rethought including rich functions in
this KIP.
I think if we include rich functions in this KIP, then fix
ProcessorContext in another KIP and incorporate it with the existing rich
functions, the code will not be backwards compatible.

I see Damian's and your point more clearly now.

Conclusion: we include only the withKey interfaces in this KIP (I updated the
KIP accordingly), and I will also try to initiate a separate KIP for rich functions.
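
For readers following along, a minimal sketch of the withKey variant being
discussed (names follow the KIP draft; treat the exact generics and parameter
order as assumptions until the KIP text is final):

// Sketch only: a value mapper with read-only access to the key,
// mirroring the existing ValueMapper<V, VR> interface.
public interface ValueMapperWithKey<K, V, VR> {
    VR apply(final K readOnlyKey, final V value);
}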

Cheers,
Jeyhun

On Fri, May 19, 2017 at 10:50 PM Matthias J. Sax 
wrote:

> With the current KIP, using ValueMapper and ValueMapperWithKey
> interfaces, RichFunction seems to be an independent add-on. To fix the
> original issue to allow key access, RichFunctions are not required IMHO.
>
> I initially put the RichFunction idea on the table because I was hoping
> to get a uniform API, and I think it was good to consider them --
> however, the discussion showed that they are not necessary for key
> access. Thus, it seems better to handle RichFunctions in their own
> KIP. The ProcessorContext/RecordContext issue seems to be the main
> challenge for this, and introducing RichFunctions with a parameter-less
> init() method does not seem to help much. We would get an "intermediate"
> API that we would want to change later anyway...
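
For orientation, the kind of contract under discussion would presumably look
roughly like the sketch below; whether init() takes a full ProcessorContext, a
slimmer RecordContext, or no parameter at all is exactly the open question, so
the parameter type here is a placeholder, not a proposal:

// Hypothetical shape only -- the init() parameter is the unresolved design point.
public interface RichFunction {
    void init(final Object contextPlaceholder); // placeholder type, see discussion
    void close();
}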
>
> As you have already put much effort into RichFunction, feel free to push this
> further and start a new KIP (we can even do this in parallel) -- we
> don't want to slow you down :) But it makes the discussion and code
> review easier if we separate the two, IMHO.
>
>
> -Matthias
>
>
> On 5/19/17 2:25 AM, Jeyhun Karimov wrote:
> > Hi Damian,
> >
> > Thanks for your comments. I think providing users with an *interface* rather
> > than an *abstract class* is preferable (Matthias also raised this
> > issue), so I changed the corresponding parts of the KIP.
> >
> > Regarding passing additional contextual information, I think it is a
> > tradeoff:
> > 1) first fix the context parameter for the *init()* method in another PR
> > and solve rich functions afterwards, or
> > 2) first fix the requested issues in Jira ([1-3]) by providing
> > (incomplete) rich functions and integrate the context parameters
> > afterwards (as an improvement).
> >
> > To me, the second approach seems more incremental. However, you are right
> > that the names might confuse users.
> >
> >
> >
> > [1] https://issues.apache.org/jira/browse/KAFKA-4218
> > [2] https://issues.apache.org/jira/browse/KAFKA-4726
> > [3] https://issues.apache.org/jira/browse/KAFKA-3745
> >
> >
> > Cheers,
> > Jeyhun
> >
> >
> > On Fri, May 19, 2017 at 10:42 AM Damian Guy 
> wrote:
> >
> >> Hi,
> >>
> >> I see you've removed the `ProcessorContext` from the RichFunction, which is
> >> good, but why is it a `RichFunction`? I'd have expected it to pass some
> >> additional contextual information, i.e., the `RecordContext` that contains
> >> just the topic, partition, timestamp, and offset. I'm ok with it not passing
> >> this contextual information, but is the naming incorrect? I'm not sure,
> >> tbh. I'm wondering if we should drop `RichFunctions` until we can do it
> >> properly with the correct context?
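
For readers following along, the record-level context Damian refers to would
expose roughly the following accessors (a sketch of the idea, not an interface
defined by this KIP):

// Sketch of record-scoped metadata: topic, partition, offset, timestamp.
// Method names mirror the existing ProcessorContext accessors.
public interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}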
> >>
> >> Also, I don't like the abstract classes: RichValueMapper, RichValueJoiner,
> >> RichInitializer, etc. Why can't they just be interfaces? Generally we
> >> should provide users with interfaces to code against, not classes.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Fri, 19 May 2017 at 00:50 Jeyhun Karimov 
> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thanks. I initiated the PR as well, to get a better overview.
> >>>
> >>> The only reason I used an abstract class instead of an interface for
> >>> Rich functions is that in the future, if we have some
> >>> AbstractRichFunction abstract class,
> >>> we can easily extend it:
> >>>
> >>> public abstract class RichValueMapper<K, V, VR> *extends
> >>> AbstractRichFunction* implements ValueMapperWithKey<K, V, VR>,
> >>> RichFunction {
> >>> }
> >>> With interfaces we are limited to interfaces for inheritance.
> >>>
> >>> However, I think we can easily change it (from abstract class ->
> >>> interface) if you think an interface is a better fit.
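
For contrast with the abstract-class version above, the interface-only
alternative being suggested would reduce to roughly the following (same
assumed generics as above):

// Sketch: compose by interface inheritance instead of an abstract base class.
public interface RichValueMapper<K, V, VR>
        extends ValueMapperWithKey<K, V, VR>, RichFunction {
}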
> >>>
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>>
> >>> On Fri, May 19, 2017 at 1:00 AM Matthias J. Sax  >
> >>> wrote:
> >>>
>  Thanks for the update and explanations. The KIP is much improved now --
>  great job!
> 
>  One more question: Why are RichValueMapper etc. abstract classes and not
>  interfaces?
> 
> 
>  Overall, I like the KIP a lot!
> 
> 
>  -Matthias
> 
> 
>  On 5/16/17 7:03 AM, Jeyhun Karimov wrote:
> > Hi,
> >
> > Thanks for your comments.
> >
> > I think supporting Lambdas for `withKey` and `AbstractRichFunction`
> >> don't go together, as Lambdas are only supported for interfaces
> >> AFAIK.
> >
> >
> > Maybe I misunderstood your comment.
> > *withKey* and *withOnlyValue* are interf

[GitHub] kafka pull request #3109: KAFKA-4144: Allow per stream/table timestamp extra...

2017-05-20 Thread jeyhunkarimov
GitHub user jeyhunkarimov opened a pull request:

https://github.com/apache/kafka/pull/3109

KAFKA-4144: Allow per stream/table timestamp extractor

A follow-up PR to fix 
[issue](https://github.com/confluentinc/examples/commit/2cd0b87bc8a7eab0e7199fa0079db6417f0e6b63#commitcomment-22200864)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jeyhunkarimov/kafka KAFKA-4144-follow-up

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3109


commit a35b23a19358d7c2df421e290e5918a45c9cc617
Author: Jeyhun Karimov 
Date:   2017-05-20T09:26:49Z

Follow up overload needed for globalTable()






[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-05-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018382#comment-16018382
 ] 

ASF GitHub Bot commented on KAFKA-4144:
---

GitHub user jeyhunkarimov opened a pull request:

https://github.com/apache/kafka/pull/3109

KAFKA-4144: Allow per stream/table timestamp extractor

A follow-up PR to fix 
[issue](https://github.com/confluentinc/examples/commit/2cd0b87bc8a7eab0e7199fa0079db6417f0e6b63#commitcomment-22200864)







> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, kip
> Fix For: 0.11.0.0
>
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For the PAPI we cannot enforce this, but for the DSL, 
> {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (i.e., set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785, though.
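
A usage sketch of what the per-source override is meant to look like. The exact
overload signature added by KIP-123 is not spelled out here, so the parameter
order below is an assumption; WallclockTimestampExtractor and the topic name are
placeholders.

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class PerSourceExtractorExample {
    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();
        // Override the global (StreamsConfig) timestamp extractor for this source only.
        final KStream<String, String> stream = builder.stream(
                new WallclockTimestampExtractor(),   // per-source extractor (assumed overload)
                Serdes.String(), Serdes.String(),
                "input-topic");
    }
}
{code}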





Re: [VOTE] KIP-123: Allow per stream/table timestamp extractor

2017-05-20 Thread Jeyhun Karimov
Dear community,

I want to inform you that, because of the case described in [1], we needed to add
an extra overloaded method to the KIP:
*KStreamBuilder.globalTable(final Serde<K> keySerde, final Serde<V> valSerde,
final String topic, final String storeName)*

[1]
https://github.com/confluentinc/examples/commit/2cd0b87bc8a7eab0e7199fa0079db6417f0e6b63#commitcomment-22200864
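
For clarity, a usage sketch of the added overload (the topic and store names
below are placeholders):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class GlobalTableExample {
    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();
        // New overload from the KIP update: key/value serdes, source topic, store name.
        final GlobalKTable<String, Long> table = builder.globalTable(
                Serdes.String(), Serdes.Long(), "some-topic", "some-store");
    }
}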


Cheers,
Jeyhun


On Sat, May 6, 2017 at 1:57 AM Guozhang Wang  wrote:

> Thank you, Jeyhun!
>
> Will make a final pass on the PR soon.
>
> Guozhang
>
> On Tue, Apr 25, 2017 at 11:13 PM, Michael Noll 
> wrote:
>
> > Thanks for your work and for driving this, Jeyhun! :-)
> >
> > -Michael
> >
> >
> > On Tue, Apr 25, 2017 at 11:11 PM, Jeyhun Karimov 
> > wrote:
> >
> > > Dear all,
> > >
> > > I am closing this vote now. The KIP got accepted with
> > >
> > > +3 binding (Guozhang, Ewen, Gwen)
> > >
> > > Thanks all (especially Matthias) for guiding me throughout my first
> > KIP.
> > >
> > >
> > > Cheers,
> > > Jeyhun
> > >
> > > On Mon, Apr 24, 2017 at 9:32 PM Thomas Becker 
> wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > On Tue, 2017-02-28 at 08:59 +, Jeyhun Karimov wrote:
> > > > > Dear community,
> > > > >
> > > > > I'd like to start the vote for KIP-123:
> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> > > > >
> > > > >
> > > > > Cheers,
> > > > > Jeyhun
> > > > --
> > > >
> > > >
> > > > Tommy Becker
> > > >
> > > > Senior Software Engineer
> > > >
> > > > O +1 919.460.4747
> > > >
> > > > tivo.com
> > > >
> > > >
> > > >
> > > --
> > > -Cheers
> > >
> > > Jeyhun
> > >
> >
>
>
>
> --
> -- Guozhang
>
-- 
-Cheers

Jeyhun


[jira] [Assigned] (KAFKA-4660) Improve test coverage KafkaStreams

2017-05-20 Thread Umesh Chaudhary (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Umesh Chaudhary reassigned KAFKA-4660:
--

Assignee: Umesh Chaudhary

> Improve test coverage KafkaStreams
> --
>
> Key: KAFKA-4660
> URL: https://issues.apache.org/jira/browse/KAFKA-4660
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Assignee: Umesh Chaudhary
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> {{toString}} is used to print the topology, so it should probably have a unit 
> test.
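
A minimal sketch of such a test, assuming the 0.10/0.11-era KStreamBuilder API;
topic names and config values are placeholders, and the test never starts the
streams instance:

{code}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.junit.Assert;
import org.junit.Test;

public class KafkaStreamsToStringTest {

    @Test
    public void toStringShouldDescribeTopology() {
        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tostring-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // toString() should print the topology without the app ever being started.
        final KafkaStreams streams = new KafkaStreams(builder, props);
        Assert.assertFalse(streams.toString().isEmpty());
    }
}
{code}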





[DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-05-20 Thread Jeyhun Karimov
Dear community,

As we discussed in the KIP-149 [DISCUSS] thread [1], I would like to initiate
a KIP for rich functions (interfaces) [2].
I would like to get your comments.


[1]
http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
[2]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams


Cheers,
Jeyhun
-- 
-Cheers

Jeyhun


[jira] [Created] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

2017-05-20 Thread Kyle Ambroff (JIRA)
Kyle Ambroff created KAFKA-5297:
---

 Summary: Broker can take a long time to shut down if there are 
many active log segments
 Key: KAFKA-5297
 URL: https://issues.apache.org/jira/browse/KAFKA-5297
 Project: Kafka
  Issue Type: Improvement
Reporter: Kyle Ambroff
Priority: Minor
 Attachments: shutdown-flame-graph.png

After the changes for KIP-33 were merged, we started noticing that our cluster 
restart times were quite a bit longer. In some cases it was taking four times 
as long as expected to do a rolling restart of every broker in the cluster. 
This meant that doing a deploy to one of our Kafka clusters went from taking 
about 3 hours to more than 12 hours!

We looked into this and we have some data from a couple of runs with a sampling 
profiler. It turns out that it isn't unusual for us to have a broker sit in 
kafka.log.Log#close for up to 30 minutes if it has been running for several 
weeks. There are just so many active log segments that it just takes a long 
time to truncate all of the indexes.

I've attached a flame graph that was generated from 10 minutes of stack samples 
collected during shutdown of a broker that took about 30 minutes total to shut 
down cleanly.

* About 60% of the time was spent in kafka.log.AbstractIndex#resize, where 
every index and timeindex file is truncated to the size of the number of 
entries in that index.
* Another big chunk of time is spent reading the last entry from the index, 
which is used to make any final updates to the timeindex file. This is 
something that can be cached. For a broker that's been running for a long time 
the bulk of these indexes are not likely to be in the page cache anymore. We 
cache the largestTimestamp and offsetOfLargestTimestamp in LogSegment, so we 
could add a cache for this as well.

Looking at these changes and considering KIP-33, it isn't surprising that the 
broker shutdown time has increased so dramatically. The extra index plus the 
extra reads have increased the amount of work performed by kafka.log.Log#close 
by about 4x (in terms of system calls and potential page faults). Breaking down 
what this function does:

# Read the max timestamp from the timeindex. Could lead to a disk seek.
# Read the max offset from the index. Could lead to a disk seek.
# Append the timestamp and offset of the most recently written message to the 
timeindex if it hasn't been written there for some reason.
# Truncate the index file
## Get the position in the index of the last entry written
## If on Windows then unmap and close the index
## reopen
## truncate to the number of entries * entry size. (ftruncate() system call)
## mmap()
## Set the position back to where it was before the original. Leads to lseek() 
system call.
## Close the newly reopened and mapped index
# Same thing as #4 but for the timeindex.
## Get the position in the timeindex of the last entry written
## If on Windows then unmap and close the timeindex
## reopen
## truncate to the number of entries * entry size. (ftruncate() system call)
## mmap()
## Set the position back to where it was before the original. Leads to lseek() 
system call.
## Close the newly reopened and mapped timeindex
# Finalize the log segment
## Invoke java.nio.channels.FileChannel#force, which leads to a fsync() for 
that log segment.
## Truncate the log segment if it doesn't have enough messages written to fill 
up the whole thing. Potentially leads to a ftruncate() system call.
## Set the position to the end of the segment after truncation. Leads to a 
lseek() system call.
## Close and unmap the channel.

Looking into the current implementation of kafka.log.AbstractIndex#resize, it 
appears to do quite a bit of extra work to avoid keeping an instance of 
RandomAccessFile around. It has to reopen the file, truncate, mmap(), and 
potentially perform an additional disk seek, all before immediately closing the 
file.
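
To make the contrast concrete, here is an illustrative sketch (plain Java NIO,
not Kafka's actual AbstractIndex code) of resizing a memory-mapped index file
while keeping the file handle open. As noted above, Windows still requires
unmapping before truncation, which this sketch deliberately ignores.

{code}
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

final class ResizableIndex implements AutoCloseable {
    private final RandomAccessFile raf;
    private MappedByteBuffer mmap;

    ResizableIndex(final String path, final int initialBytes) throws IOException {
        this.raf = new RandomAccessFile(path, "rw");
        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, initialBytes);
    }

    // Shrink or grow to newSizeBytes without reopening the underlying file.
    // (Not valid on Windows, where the old mapping must be released first.)
    void resize(final int newSizeBytes) throws IOException {
        final int position = Math.min(mmap.position(), newSizeBytes);
        raf.setLength(newSizeBytes);                                   // ftruncate()
        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newSizeBytes);
        mmap.position(position);                                       // restore logical position
    }

    @Override
    public void close() throws IOException {
        raf.close();
    }
}
{code}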

You wouldn't think this would amount to much, but I put together a benchmark 
using jmh to measure the difference between the current code and a new 
implementation that didn't have to recreate the page mapping during resize(), 
and the difference is pretty dramatic.

{noformat}
Result "currentImplementation":
  2063.386 ±(99.9%) 81.758 ops/s [Average]
  (min, avg, max) = (1685.574, 2063.386, 2338.945), stdev = 182.863
  CI (99.9%): [1981.628, 2145.144] (assumes normal distribution)

Result "optimizedImplementation":
  3497.354 ±(99.9%) 31.575 ops/s [Average]
  (min, avg, max) = (3261.232, 3497.354, 3605.527), stdev = 70.623
  CI (99.9%): [3465.778, 3528.929] (assumes normal distribution)

# Run complete. Total time: 00:03:37

Benchmark                                     Mode  Cnt     Score    Error  Units
LogSegmentBenchmark.currentImplementation    thrpt   60  2063.386 ± 81.758  ops/s
LogSegmentBenchmark.optimizedImplementation  thrpt   60  3497.354 ± 31.575  ops/s
{noformat}
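
For anyone who wants to reproduce a measurement like this, a minimal jmh
skeleton is sketched below. It drives the ResizableIndex sketch from above as a
stand-in for the real index classes, so treat it purely as scaffolding rather
than the attached LogSegmentBenchmark.

{code}
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class IndexResizeBenchmark {
    private File file;
    private ResizableIndex index;   // stand-in for the index under test

    @Setup
    public void setUp() throws IOException {
        file = File.createTempFile("index-bench", ".idx");
        index = new ResizableIndex(file.getAbsolutePath(), 10 * 1024 * 1024);
    }

    @Benchmark
    public void resizeWithoutReopen() throws IOException {
        index.resize(1024 * 1024);   // the remap path being measured
    }

    @TearDown
    public void tearDown() throws IOException {
        index.close();
        file.delete();
    }
}
{code}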

[jira] [Commented] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

2017-05-20 Thread Kyle Ambroff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018527#comment-16018527
 ] 

Kyle Ambroff commented on KAFKA-5297:
-

Patch incoming.

> Broker can take a long time to shut down if there are many active log segments
> --
>
> Key: KAFKA-5297
> URL: https://issues.apache.org/jira/browse/KAFKA-5297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kyle Ambroff
>Priority: Minor
> Attachments: shutdown-flame-graph.png
>
>

[jira] [Updated] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

2017-05-20 Thread Kyle Ambroff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff updated KAFKA-5297:

Attachment: LogSegmentBenchmark.java

Attaching benchmark code in case anyone wants to validate the way I measured 
this. Requires adding a test dependency.

{noformat}
diff --git a/build.gradle b/build.gradle
index 8d6e703..0c55650 100644
--- a/build.gradle
+++ b/build.gradle
@@ -520,6 +520,9 @@ project(':core') {
 testCompile libs.junit
 testCompile libs.scalaTest
 testCompile libs.jfreechart
+testCompile 'org.openjdk.jmh:jmh-core:1.13'
+testCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.13'
+
 
 scoverage libs.scoveragePlugin
 scoverage libs.scoverageRuntime
{noformat}

> Broker can take a long time to shut down if there are many active log segments
> --
>
> Key: KAFKA-5297
> URL: https://issues.apache.org/jira/browse/KAFKA-5297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kyle Ambroff
>Priority: Minor
> Attachments: LogSegmentBenchmark.java, shutdown-flame-graph.png
>
>

[jira] [Comment Edited] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

2017-05-20 Thread Kyle Ambroff (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018528#comment-16018528
 ] 

Kyle Ambroff edited comment on KAFKA-5297 at 5/20/17 4:13 PM:
--

Attaching benchmark code in case anyone wants to validate the way I measured 
this. Requires adding a test dependency. I also copied the existing 
implementation so that I could compare both in one run.

{noformat}
diff --git a/build.gradle b/build.gradle
index 8d6e703..0c55650 100644
--- a/build.gradle
+++ b/build.gradle
@@ -520,6 +520,9 @@ project(':core') {
 testCompile libs.junit
 testCompile libs.scalaTest
 testCompile libs.jfreechart
+testCompile 'org.openjdk.jmh:jmh-core:1.13'
+testCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.13'
+
 
 scoverage libs.scoveragePlugin
 scoverage libs.scoverageRuntime
{noformat}



> Broker can take a long time to shut down if there are many active log segments
> --
>
> Key: KAFKA-5297
> URL: https://issues.apache.org/jira/browse/KAFKA-5297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kyle Ambroff
>Priority: Minor
> Attachments: LogSegmentBenchmark.java, shutdown-flame-graph.png
>
>

[jira] [Updated] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

2017-05-20 Thread Kyle Ambroff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff updated KAFKA-5297:

Attachment: (was: LogSegmentBenchmark.java)

> Broker can take a long time to shut down if there are many active log segments
> --
>
> Key: KAFKA-5297
> URL: https://issues.apache.org/jira/browse/KAFKA-5297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kyle Ambroff
>Priority: Minor
> Attachments: LogSegmentBenchmark.java, shutdown-flame-graph.png
>
>

[jira] [Updated] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

2017-05-20 Thread Kyle Ambroff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff updated KAFKA-5297:

Attachment: LogSegmentBenchmark.java

> Broker can take a long time to shut down if there are many active log segments
> --
>
> Key: KAFKA-5297
> URL: https://issues.apache.org/jira/browse/KAFKA-5297
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kyle Ambroff
>Priority: Minor
> Attachments: LogSegmentBenchmark.java, shutdown-flame-graph.png
>
>

[jira] [Updated] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

2017-05-20 Thread Kyle Ambroff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff updated KAFKA-5297:

Issue Type: Bug  (was: Improvement)

> Broker can take a long time to shut down if there are many active log segments
> --
>
> Key: KAFKA-5297
> URL: https://issues.apache.org/jira/browse/KAFKA-5297
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff
>Priority: Minor
> Attachments: LogSegmentBenchmark.java, shutdown-flame-graph.png
>
>

[GitHub] kafka pull request #3110: KAFKA-5297: Reduce time required for shutdown

2017-05-20 Thread ambroff
GitHub user ambroff opened a pull request:

https://github.com/apache/kafka/pull/3110

KAFKA-5297: Reduce time required for shutdown

Avoid reopening and recreating page mappings for index files during
truncation unless it's required by the platform.

In my tests this increases throughput of Log#close by about 70%. See
KAFKA-5297 for details.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ambroff/kafka KAFKA-5297

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3110


commit d648174f4ef8cedb3c46061e3e348ed83e01f26a
Author: Kyle Ambroff 
Date:   2017-05-20T16:36:21Z

KAFKA-5297: Reduce time required for shutdown

Avoid reopening and recreating page mappings for index files during
truncation unless it's required by the platform.

In my tests this increases throughput of Log#close by about 70%. See
KAFKA-5297 for details.






[jira] [Commented] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments

2017-05-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018542#comment-16018542
 ] 

ASF GitHub Bot commented on KAFKA-5297:
---

GitHub user ambroff opened a pull request:

https://github.com/apache/kafka/pull/3110

KAFKA-5297: Reduce time required for shutdown

Avoid reopening and recreating page mappings for index files during
truncation unless it's required by the platform.

In my tests this increases throughput of Log#close by about 70%. See
KAFKA-5297 for details.

> Broker can take a long time to shut down if there are many active log segments
> --
>
> Key: KAFKA-5297
> URL: https://issues.apache.org/jira/browse/KAFKA-5297
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff
>Priority: Minor
> Attachments: LogSegmentBenchmark.java, shutdown-flame-graph.png
>
>

[jira] [Commented] (KAFKA-5266) Follow-up improvements for consumer offset reset tool (KIP-122)

2017-05-20 Thread Jorge Quilcate (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018581#comment-16018581
 ] 

Jorge Quilcate commented on KAFKA-5266:
---

I like 'to-current', I'll implement it.

On 4, you're right, an NPE is possible in those cases. Wouldn't it be better to 
filter out those nulls and print some warning logs before #toMap, and then 
catch any other exception across the whole #resetOffsets implementation?
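
A sketch of that null-filtering idea (the actual tool is Scala; names here are
placeholders, not the tool's real code):

{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

final class OffsetResetHelper {
    // Drop entries whose resolved offset is null and warn, instead of letting a
    // later map conversion throw an NPE.
    static Map<TopicPartition, Long> dropMissingOffsets(final Map<TopicPartition, Long> resolved) {
        final Map<TopicPartition, Long> safe = new HashMap<>();
        for (final Map.Entry<TopicPartition, Long> e : resolved.entrySet()) {
            if (e.getValue() == null) {
                System.err.println("WARN: no offset resolved for " + e.getKey() + "; skipping");
            } else {
                safe.put(e.getKey(), e.getValue());
            }
        }
        return safe;
    }
}
{code}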

> Follow-up improvements for consumer offset reset tool (KIP-122)
> ---
>
> Key: KAFKA-5266
> URL: https://issues.apache.org/jira/browse/KAFKA-5266
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Jason Gustafson
>Assignee: Jorge Quilcate
> Fix For: 0.11.0.0
>
>
> 1. We should try to ensure that offsets are in range for the topic partition. 
> We currently only verify this for the shift option.
> 2. If you provide a CSV file, you shouldn't need to specify one of the 
> --all-topics or --topic options.
> 3. We currently support a "reset to current offsets" option if none of the 
> supported reset options are provided. This seems kind of useless. Perhaps we 
> should just enforce that one of the reset options is provided.
> 4. The command fails with an NPE if we cannot find one of the offsets we are 
> trying to reset. It would be better to raise an exception with a friendlier 
> message.





[jira] [Created] (KAFKA-5298) MirrorMaker deadlocks with missing topics

2017-05-20 Thread Raymond Conn (JIRA)
Raymond Conn created KAFKA-5298:
---

 Summary: MirrorMaker deadlocks with missing topics
 Key: KAFKA-5298
 URL: https://issues.apache.org/jira/browse/KAFKA-5298
 Project: Kafka
  Issue Type: Bug
  Components: clients, tools
Affects Versions: 0.10.2.1
Reporter: Raymond Conn


When MirrorMaker mirrors a topic to destination brokers that have topic auto 
create disabled and the topic doesn't exist on the destination brokers, the 
producer in MirrorMaker logs the following:
{code}
Error while fetching metadata with correlation id 467 : 
{mirror-test2=UNKNOWN_TOPIC_OR_PARTITION}
Error while fetching metadata with correlation id 468 : 
{mirror-test2=UNKNOWN_TOPIC_OR_PARTITION}
{code}

This log message is fine and expected. The problem is that the log message stops ~5 
minutes later, at which point the logs look fine but MirrorMaker is not mirroring 
any of its topics.

What's worse, MirrorMaker is basically in an unrecoverable state once this 
happens (once the log statement stops). If you create the topic at the destination, 
MirrorMaker still won't mirror data until a restart. Attempts to restart 
MirrorMaker (cleanly) fail because the process is more or less deadlocked in 
its shutdown hook.

Here is the reasoning:

* MirrorMaker becomes unrecoverable after 5 minutes because of this loop in the 
[producer|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L543-L561]
* The producer keeps waiting for metadata for the missing topic until the max 
timeout is reached (Long.MAX_VALUE in this case).
* After 5 minutes the producer stops requesting metadata for the topic, because 
the topic expires 
[here|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L218]
* The topic is never re-added for metadata requests, since the only add happens 
before entering the loop 
[here|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L528]

So after 5 minutes every subsequent metadata request asks for no topics, since the 
topic has expired. The MirrorMaker thread gets stuck waiting forever, because there 
will never again be a metadata request for the topic the thread is waiting on.
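To make that failure mode concrete, here is a minimal, self-contained model of the race (illustrative only; the class, fields, and compressed timing are assumptions, not Kafka's actual Metadata/KafkaProducer code):

{code}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model only, not Kafka code: the topic is added to the set of
// "interesting" topics exactly once, before the wait loop; a background expiry
// removes it later; every subsequent (simulated) metadata request then asks for
// an empty topic set, so the waiter can never be satisfied.
public class MetadataExpiryModel {
    static final Set<String> interestedTopics = ConcurrentHashMap.newKeySet();
    static volatile boolean topicKnown = false; // would flip only if a metadata response covered the topic

    public static void main(String[] args) throws InterruptedException {
        String topic = "mirror-test2";
        interestedTopics.add(topic); // the only add, outside the loop

        // Simulate the 5-minute topic expiry, compressed to 5 seconds.
        new Thread(() -> {
            try { Thread.sleep(5_000); } catch (InterruptedException ignored) { }
            interestedTopics.remove(topic);
            System.out.println("expired " + topic + " from the metadata topic set");
        }).start();

        while (!topicKnown) { // MirrorMaker's effective max.block.ms is unbounded
            System.out.println("metadata request for topics=" + interestedTopics);
            Thread.sleep(1_000); // after expiry: topics=[] forever, so topicKnown never flips
        }
    }
}
{code}

After the simulated expiry the printed request set stays empty, which matches the empty-topics metadata requests visible in the producer DEBUG logs quoted in KAFKA-3450 below.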

All of this leads to a deadlock in the shutdown hook:
* The shutdown hook signals shutdown to the MirrorMaker threads.
* It then waits for the threads to exit their loop by waiting on a 
[latch|https://github.com/apache/kafka/blob/0.10.2/core/src/main/scala/kafka/tools/MirrorMaker.scala#L396]
* The latch is never counted down, because the thread is stuck in 
[produce|https://github.com/apache/kafka/blob/0.10.2/core/src/main/scala/kafka/tools/MirrorMaker.scala#L434]
* The thread will never exit the loop to count down the latch on line 462.

This can be seen in a thread dump of the shutdown hook thread:
{code}
Name: MirrorMakerShutdownHook
State: WAITING on java.util.concurrent.CountDownLatch$Sync@3ffebeac
Total blocked: 0  Total waited: 1

Stack trace: 
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
kafka.tools.MirrorMaker$MirrorMakerThread.awaitShutdown(MirrorMaker.scala:498)
kafka.tools.MirrorMaker$$anonfun$cleanShutdown$4.apply(MirrorMaker.scala:396)
kafka.tools.MirrorMaker$$anonfun$cleanShutdown$4.apply(MirrorMaker.scala:396)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
{code}
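For reference, a minimal self-contained sketch of that latch interaction (illustrative only; the thread structure is simplified and is not MirrorMaker's actual code):

{code}
import java.util.concurrent.CountDownLatch;

// Illustrative sketch only, not MirrorMaker code: the worker counts the latch
// down only after leaving its produce loop, so if the produce call never
// returns, the shutdown hook blocks forever on await().
public class ShutdownLatchSketch {
    static final CountDownLatch shutdownLatch = new CountDownLatch(1);
    static volatile boolean shuttingDown = false;

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            try {
                while (!shuttingDown) {
                    blockingProduce(); // stands in for producer.send() blocking on metadata
                }
            } finally {
                shutdownLatch.countDown(); // never reached if blockingProduce() never returns
            }
        });
        worker.start();

        Thread.sleep(500);     // let the worker enter blockingProduce()
        shuttingDown = true;   // what the shutdown hook signals
        shutdownLatch.await(); // ...and then waits on: deadlocks, as in the dump above
    }

    static void blockingProduce() {
        try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException ignored) { }
    }
}
{code}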

The root of the issue is more or less the one documented in 
https://issues.apache.org/jira/browse/KAFKA-3450, where the producer can block 
waiting for metadata.





[jira] [Commented] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled

2017-05-20 Thread Raymond Conn (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018675#comment-16018675
 ] 

Raymond Conn commented on KAFKA-3450:
-

Also, if {{max.block.ms}} is greater than 5 minutes, it looks like the caller will 
be forced to wait the full max block time even if the topic does get created. After 
5 minutes the topic is removed from the topics map inside the 
[Metadata|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L218]
 class.
It never gets added back, since the only 
[add|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L528]
 is outside the 
[loop|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L561]
 that the producer is "blocking" on. Basically, after 5 minutes all metadata 
requests are for no topics.


{code}
2017-05-20 20:20:05 DEBUG NetworkClient - Sending metadata request 
(type=MetadataRequest, topics=topic-test2) to node 2
2017-05-20 20:20:05 TRACE NetworkClient - Sending {topics=[topic-test2]} to 
node 2.
2017-05-20 20:20:12 DEBUG Metadata - Removing unused topic topic-test2 from the 
metadata list, expiryMs 1495324449201 now 1495326005464
2017-05-20 20:20:22 DEBUG NetworkClient - Sending metadata request 
(type=MetadataRequest, topics=) to node 5
2017-05-20 20:20:22 TRACE NetworkClient - Sending {topics=[]} to node 5.
2017-05-20 20:20:35 DEBUG NetworkClient - Sending metadata request 
(type=MetadataRequest, topics=) to node 2
2017-05-20 20:20:35 TRACE NetworkClient - Sending {topics=[]} to node 2.
2017-05-20 20:20:48 DEBUG NetworkClient - Sending metadata request 
(type=MetadataRequest, topics=) to node 1
2017-05-20 20:20:48 TRACE NetworkClient - Sending {topics=[]} to node 1.
{code}
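A hedged mitigation sketch (an assumption on my part, not a documented fix): keeping {{max.block.ms}} well below the ~5 minute topic expiry at least bounds the wait, so the timeout surfaces to the send callback (as in the example in the issue description below) instead of the caller hanging on metadata that is never re-requested:

{code}
// Assumption, not a documented fix: bound the metadata wait below the ~5 minute
// topic expiry so the timeout reaches the send callback quickly.
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000"); // 1 minute, well under 5 minutes
{code}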


> Producer blocks on send to topic that doesn't exist if auto create is disabled
> --
>
> Key: KAFKA-3450
> URL: https://issues.apache.org/jira/browse/KAFKA-3450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1
>Reporter: Michal Turek
>Priority: Critical
>
> {{producer.send()}} is blocked for {{max.block.ms}} (default 60 seconds) if 
> the destination topic doesn't exist and its automatic creation is disabled. 
> A warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is 
> logged every 100 ms in a loop until the 60-second timeout expires, but the 
> operation is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> /**
>  * Test of sending to a topic that does not exist while automatic creation of 
> topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
> private static final Logger LOGGER = 
> LoggerFactory.getLogger(NoSuchTopicTest.class);
> public static void main(String[] args) {
> Properties properties = new Properties();
> properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9092");
> properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
> NoSuchTopicTest.class.getSimpleName());
> properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); 
> // Default is 60 seconds
> try (Producer<String, String> producer = new 
> KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
> LOGGER.info("Sending message");
> producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", 
> "key", "value"), (metadata, exception) -> {
> if (exception != null) {
> LOGGER.error("Send failed: {}", exception.toString());
> } else {
> LOGGER.info("Send successful: {}-{}/{}", 
> metadata.topic(), metadata.partition(), metadata.offset());
> }
> });
> LOGGER.info("Sending message");
> producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", 
> "key", "value"), (metadata, exception) -> {
> if (exception != null) {
> LOGGER.error("Send failed: {}", exception.toString());
> } else {
> LOGGER.info("Send successful: {}-{}/{}", 
> metadata.topic(), metadata.partition(), metadata.offset());
> }
> });
> }
> }
> }
> {noformat}
> Relate

[jira] [Created] (KAFKA-5299) MirrorMaker with new.consumer doesn't consume messages from multiple whitelisted topics

2017-05-20 Thread Jyoti (JIRA)
Jyoti created KAFKA-5299:


 Summary: MirrorMaker with new.consumer doesn't consume messages 
from multiple whitelisted topics 
 Key: KAFKA-5299
 URL: https://issues.apache.org/jira/browse/KAFKA-5299
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.1
Reporter: Jyoti








[GitHub] kafka pull request #3111: HOTFIX: Increase timeouts in distributed connect t...

2017-05-20 Thread kkonstantine
GitHub user kkonstantine opened a pull request:

https://github.com/apache/kafka/pull/3111

HOTFIX: Increase timeouts in distributed connect tests.

  * There is considerably more logging in DEBUG mode due to the class scanning 
performed by the new isolation scheme.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kkonstantine/kafka 
HOTFIX-Increase-timeouts-in-distributed-connect-tests-to-handle-increase-startup-due-to-class-scanning

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3111


commit 5538eaa65d7ed614061d6de52ce7f61f7dd7fcd8
Author: Konstantine Karantasis 
Date:   2017-05-21T05:45:40Z

HOTFIX: Increase timeouts in distributed connect tests.

  * There's considerably increased logging due to the class scanning
  performed with the new isolation scheme in DEBUG mode.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---