Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner
Hi, Thanks for your comments. I have rethought including rich functions in this KIP. I think once we include rich functions in this KIP and then fix ProcessorContext in another KIP and incorporate it with the existing rich functions, the code will not be backwards compatible. I see Damian's and your point more clearly now. Conclusion: we include only withKey interfaces in this KIP (I updated the KIP), and I will also try to initiate another KIP for rich functions. Cheers, Jeyhun On Fri, May 19, 2017 at 10:50 PM Matthias J. Sax wrote: > With the current KIP, using ValueMapper and ValueMapperWithKey > interfaces, RichFunction seems to be an independent add-on. To fix the > original issue to allow key access, RichFunctions are not required IMHO. > > I initially put the RichFunction idea on the table, because I was hoping > to get a uniform API. And I think it was good to consider them -- > however, the discussion showed that they are not necessary for key > access. Thus, it seems to be better to handle RichFunctions in their own > KIP. The ProcessorContext/RecordContext issues seem to be the main > challenge for this. And introducing RichFunctions with a parameter-less > init() method seems not to help much. We would get an "intermediate" > API that we want to change anyway later on... > > As you put already much effort into RichFunction, feel free to push this > further and start a new KIP (we can do this even in parallel) -- we > don't want to slow you down :) But it makes the discussion and code > review easier if we separate the two, IMHO. > > > -Matthias > > > On 5/19/17 2:25 AM, Jeyhun Karimov wrote: > > Hi Damian, > > > > Thanks for your comments. I think providing users an *interface* rather > > than an *abstract class* should be preferred (Matthias also raised this > issue > > ); anyway, I changed the corresponding parts of the KIP. > > > > Regarding passing additional contextual information, I think it is a > > tradeoff: > > 1) first, we fix the context parameter for the *init()* method in another PR > > and solve Rich functions afterwards > > 2) first, we fix the requested issues on JIRA ([1-3]) by providing > > (incomplete) Rich functions and integrate the context parameters into > this > > afterwards (as an improvement) > > > > To me, the second approach seems more incremental. However, you are right: > > the names might confuse the users. > > > > > > > > [1] https://issues.apache.org/jira/browse/KAFKA-4218 > > [2] https://issues.apache.org/jira/browse/KAFKA-4726 > > [3] https://issues.apache.org/jira/browse/KAFKA-3745 > > > > > > Cheers, > > Jeyhun > > > > > > On Fri, May 19, 2017 at 10:42 AM Damian Guy > wrote: > > > >> Hi, > >> > >> I see you've removed the `ProcessorContext` from the RichFunction, which > is > >> good, but why is it a `RichFunction`? I'd have expected it to pass some > >> additional contextual information, i.e., the `RecordContext` that > contains > >> just the topic, partition, timestamp, offset. I'm ok with it not > passing > >> this contextual information, but is the naming incorrect? I'm not sure, > >> tbh. I'm wondering if we should drop `RichFunctions` until we can do it > >> properly with the correct context? > >> > >> Also, I don't like the abstract classes: RichValueMapper, > RichValueJoiner, > >> RichInitializer etc. Why can't they just be interfaces? Generally we > >> should provide users with interfaces to code against, not classes. > >> > >> Thanks, > >> Damian > >> > >> On Fri, 19 May 2017 at 00:50 Jeyhun Karimov > wrote: > >> > >>> Hi, > >>> > >>> Thanks.
I initiated the PR as well, to get a better overview. > >>> > >>> The only reason that I used an abstract class instead of an interface for > Rich > >>> functions is that in the future, if we have some AbstractRichFunction > >>> abstract classes, > >>> we can easily extend them: > >>> > >>> public abstract class RichValueMapper implements > >>> ValueMapperWithKey, RichFunction *extends > >> AbstractRichFunction*{ > >>> } > >>> With interfaces, we are limited to extending only other interfaces. > >>> > >>> However, I think we can easily change it (from abstract class -> > >> interface) > >>> if you think an interface is a better fit. > >>> > >>> > >>> Cheers, > >>> Jeyhun > >>> > >>> > >>> On Fri, May 19, 2017 at 1:00 AM Matthias J. Sax > > >>> wrote: > >>> > Thanks for the update and explanations. The KIP is quite improved now > >> -- > great job! > > One more question: Why are RichValueMapper etc. abstract classes and > not > interfaces? > > > Overall, I like the KIP a lot! > > > -Matthias > > > On 5/16/17 7:03 AM, Jeyhun Karimov wrote: > > Hi, > > > > Thanks for your comments. > > > > I think supporting Lambdas for `withKey` and `AbstractRichFunction` > >> don't go together, as Lambdas are only supported for interfaces > >> AFAIK. > > > > > > Maybe I misunderstood your comment. > > *withKey* and *withOnlyValue* are interf
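The interface-versus-abstract-class point above comes down to lambda support: only a single-method interface can be used as a lambda target. A minimal, self-contained Java sketch of that distinction, assuming interface shapes similar to (but not necessarily identical to) the ones proposed in KIP-149:

{code}
// Hypothetical shapes for illustration only; the final KIP-149 API may differ.
@FunctionalInterface
interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

// An abstract base class can bundle extra lifecycle hooks, but it can no longer
// be written as a lambda by the user.
abstract class RichValueMapperSketch<K, V, VR> implements ValueMapperWithKey<K, V, VR> {
    void init() { }    // placeholder for future rich-function hooks
    void close() { }
}

class LambdaDemo {
    public static void main(String[] args) {
        // Works only because ValueMapperWithKey is a single-method interface:
        ValueMapperWithKey<String, Long, String> mapper = (key, value) -> key + ":" + value;
        System.out.println(mapper.apply("user-1", 42L));
    }
}
{code}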
[GitHub] kafka pull request #3109: KAFKA-4144: Allow per stream/table timestamp extra...
GitHub user jeyhunkarimov opened a pull request: https://github.com/apache/kafka/pull/3109 KAFKA-4144: Allow per stream/table timestamp extractor A follow-up PR to fix [issue](https://github.com/confluentinc/examples/commit/2cd0b87bc8a7eab0e7199fa0079db6417f0e6b63#commitcomment-22200864) You can merge this pull request into a Git repository by running: $ git pull https://github.com/jeyhunkarimov/kafka KAFKA-4144-follow-up Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3109.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3109 commit a35b23a19358d7c2df421e290e5918a45c9cc617 Author: Jeyhun Karimov Date: 2017-05-20T09:26:49Z Follow up overload needed for globalTable()
[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor
[ https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018382#comment-16018382 ] ASF GitHub Bot commented on KAFKA-4144: --- GitHub user jeyhunkarimov opened a pull request: https://github.com/apache/kafka/pull/3109 KAFKA-4144: Allow per stream/table timestamp extractor A follow-up PR to fix [issue](https://github.com/confluentinc/examples/commit/2cd0b87bc8a7eab0e7199fa0079db6417f0e6b63#commitcomment-22200864) You can merge this pull request into a Git repository by running: $ git pull https://github.com/jeyhunkarimov/kafka KAFKA-4144-follow-up Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3109.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3109 commit a35b23a19358d7c2df421e290e5918a45c9cc617 Author: Jeyhun Karimov Date: 2017-05-20T09:26:49Z Follow up overload needed for globalTable() > Allow per stream/table timestamp extractor > -- > > Key: KAFKA-4144 > URL: https://issues.apache.org/jira/browse/KAFKA-4144 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Jeyhun Karimov > Labels: api, kip > Fix For: 0.11.0.0 > > > At the moment the timestamp extractor is configured via a {{StreamConfig}} > value to {{KafkaStreams}}. That means you can only have a single timestamp > extractor per app, even though you may be joining multiple streams/tables > that require different timestamp extraction methods. > You should be able to specify a timestamp extractor via > {{KStreamBuilder.stream()/table()}}, just like you can specify key and value > serdes that override the StreamConfig defaults. > KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788 > Specifying a per-stream extractor should only be possible for sources, but > not for intermediate topics. For the PAPI we cannot enforce this, but for the DSL > {{through()}} should not allow the user to set a custom extractor. In > contrast, with regard to KAFKA-4785, it must internally set an extractor that > returns the record's metadata timestamp in order to overwrite the global > extractor from {{StreamsConfig}} (i.e., set > {{FailOnInvalidTimestampExtractor}}). This change should be done in > KAFKA-4785 though. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
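For background, a custom extractor is a small class implementing the Streams TimestampExtractor interface; the per-stream/table overloads proposed here would let it be passed to individual sources instead of only globally. A hedged sketch, assuming the one-argument extract() signature of the 0.10.x line (newer releases pass a previous timestamp as a second argument):

{code}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Illustrative extractor that takes the event time from the record value when possible.
public class OrderTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record) {
        final Object value = record.value();
        if (value instanceof Long) {
            return (Long) value;          // value itself carries the event time
        }
        return record.timestamp();        // fall back to the record's metadata timestamp
    }
}

// Today this can only be registered globally, e.g.:
//   props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, OrderTimestampExtractor.class);
// The KIP-123 overloads on KStreamBuilder.stream()/table() would accept an instance per source.
{code}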
Re: [VOTE] KIP-123: Allow per stream/table timestamp extractor
Dear community, I want to inform you that, because of the case in [1], we needed to add an extra overloaded method to the KIP: *KStreamBuilder.globalTable(final Serde keySerde, final Serde valSerde, final String topic, final String storeName) * [1] https://github.com/confluentinc/examples/commit/2cd0b87bc8a7eab0e7199fa0079db6417f0e6b63#commitcomment-22200864 Cheers, Jeyhun On Sat, May 6, 2017 at 1:57 AM Guozhang Wang wrote: > Thank you, Jeyhun! > > Will make a final pass on the PR soon. > > Guozhang > > On Tue, Apr 25, 2017 at 11:13 PM, Michael Noll > wrote: > > > Thanks for your work and for driving this, Jeyhun! :-) > > > > -Michael > > > > > > On Tue, Apr 25, 2017 at 11:11 PM, Jeyhun Karimov > > wrote: > > > > > Dear all, > > > > > > I am closing this vote now. The KIP got accepted with > > > > > > +3 binding (Guozhang, Ewen, Gwen) > > > > > > Thanks all (especially Matthias) for guiding me throughout my first > > KIP. > > > > > > > > > Cheers, > > > Jeyhun > > > > > > On Mon, Apr 24, 2017 at 9:32 PM Thomas Becker > wrote: > > > > > > > +1 (non-binding) > > > > > > > > On Tue, 2017-02-28 at 08:59 +, Jeyhun Karimov wrote: > > > > > Dear community, > > > > > > > > > > I'd like to start the vote for KIP-123: > > > > > https://cwiki.apache.org/confluence/pages/viewpage. > > action?pageId=6871 > > > > > 4788 > > > > > > > > > > > > > > > Cheers, > > > > > Jeyhun > > > > -- > > > > > > > > > > > > Tommy Becker > > > > > > > > Senior Software Engineer > > > > > > > > O +1 919.460.4747 > > > > > > > > tivo.com > > > > > > > > > > > > > > > > > > > > This email and any attachments may contain confidential and > privileged > > > > material for the sole use of the intended recipient. Any review, > > copying, > > > > or distribution of this email (or any attachments) by others is > > > prohibited. > > > > If you are not the intended recipient, please contact the sender > > > > immediately and permanently delete this email and any attachments. No > > > > employee or agent of TiVo Inc. is authorized to conclude any binding > > > > agreement on behalf of TiVo Inc. by email. Binding agreements with > TiVo > > > > Inc. may only be made by a signed written agreement. > > > > > > > -- > > > -Cheers > > > > > > Jeyhun > > > > > > > > > -- > -- Guozhang > -- -Cheers Jeyhun
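For reference, a usage sketch of the overload quoted above. The generic type parameters are omitted in the mail, so the exact signature should be checked against the KIP and the follow-up PR; topic and store names here are placeholders:

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class GlobalTableOverloadExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // Explicit serdes plus an explicit state store name, matching the overload above.
        GlobalKTable<String, Long> counts =
                builder.globalTable(Serdes.String(), Serdes.Long(), "user-counts", "user-counts-store");

        // 'builder' would then be handed to KafkaStreams together with the usual configuration.
    }
}
{code}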
[jira] [Assigned] (KAFKA-4660) Improve test coverage KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-4660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Umesh Chaudhary reassigned KAFKA-4660: -- Assignee: Umesh Chaudhary > Improve test coverage KafkaStreams > -- > > Key: KAFKA-4660 > URL: https://issues.apache.org/jira/browse/KAFKA-4660 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Umesh Chaudhary >Priority: Minor > Fix For: 0.11.0.0 > > > {{toString}} is used to print the topology, so it probably should have a unit > test. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
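A sketch of the kind of unit test the description suggests, purely illustrative: the constructor and assertions would have to match the KafkaStreams version under test, and the broker address is a placeholder since nothing is started.

{code}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.junit.Assert;
import org.junit.Test;

public class KafkaStreamsToStringTest {

    @Test
    public void toStringShouldDescribeTheTopology() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tostring-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");

        // The instance is never start()ed; we only inspect its printed topology.
        final KafkaStreams streams = new KafkaStreams(builder, props);
        final String description = streams.toString();

        Assert.assertNotNull(description);
        Assert.assertFalse(description.isEmpty());
    }
}
{code}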
[DISCUSS]: KIP-159: Introducing Rich functions to Streams
Dear community, As we discussed in the KIP-149 [DISCUSS] thread [1], I would like to initiate a KIP for rich functions (interfaces) [2]. I would like to get your comments. [1] http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner [2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams Cheers, Jeyhun -- -Cheers Jeyhun
[jira] [Created] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments
Kyle Ambroff created KAFKA-5297: --- Summary: Broker can take a long time to shut down if there are many active log segments Key: KAFKA-5297 URL: https://issues.apache.org/jira/browse/KAFKA-5297 Project: Kafka Issue Type: Improvement Reporter: Kyle Ambroff Priority: Minor Attachments: shutdown-flame-graph.png

After the changes for KIP-33 were merged, we started noticing that our cluster restart times were quite a bit longer. In some cases it was taking four times as long as expected to do a rolling restart of every broker in the cluster. This meant that doing a deploy to one of our Kafka clusters went from taking about 3 hours to more than 12 hours!

We looked into this and we have some data from a couple of runs with a sampling profiler. It turns out that it isn't unusual for us to have a broker sit in kafka.log.Log#close for up to 30 minutes if it has been running for several weeks. There are just so many active log segments that it takes a long time to truncate all of the indexes.

I've attached a flame graph that was generated from 10 minutes of stack samples collected during shutdown of a broker that took about 30 minutes total to shut down cleanly.

* About 60% of the time was spent in kafka.log.AbstractIndex#resize, where every index and timeindex file is truncated to the size of the number of entries in that index.
* Another big chunk of time is spent reading the last entry from the index, which is used to make any final updates to the timeindex file. This is something that can be cached. For a broker that's been running for a long time the bulk of these indexes are not likely to be in the page cache anymore. We cache the largestTimestamp and offsetOfLargestTimestamp in LogSegment, so we could add a cache for this as well.

Looking at these changes and considering KIP-33, it isn't surprising that the broker shutdown time has increased so dramatically. The extra index plus the extra reads have increased the amount of work performed by kafka.log.Log#close by about 4x (in terms of system calls and potential page faults). Breaking down what this function does:

# Read the max timestamp from the timeindex. Could lead to a disk seek.
# Read the max offset from the index. Could lead to a disk seek.
# Append the timestamp and offset of the most recently written message to the timeindex if it hasn't been written there for some reason.
# Truncate the index file
## Get the position in the index of the last entry written
## If on Windows then unmap and close the index
## reopen
## truncate to the number of entries * entry size. (ftruncate() system call)
## mmap()
## Set the position back to where it was before the original. Leads to lseek() system call.
## Close the newly reopened and mapped index
# Same thing as #4 but for the timeindex.
## Get the position in the timeindex of the last entry written
## If on Windows then unmap and close the timeindex
## reopen
## truncate to the number of entries * entry size. (ftruncate() system call)
## mmap()
## Set the position back to where it was before the original. Leads to lseek() system call.
## Close the newly reopened and mapped timeindex
# Finalize the log segment
## Invoke java.nio.channels.FileChannel#force, which leads to an fsync() for that log segment.
## Truncate the log segment if it doesn't have enough messages written to fill up the whole thing. Potentially leads to a ftruncate() system call.
## Set the position to the end of the segment after truncation. Leads to a lseek() system call.
## Close and unmap the channel.
Looking into the current implementation of kafka.log.AbstractIndex#resize, it appears to do quite a bit of extra work to avoid keeping an instance of RandomAccessFile around. It has to reopen the file, truncate, mmap(), potentially perform an additional disk seek, all before immediately closing the file. You wouldn't think this would amount to much, but I put together a benchmark using jmh to measure the difference between the current code and a new implementation that didn't have to recreate the page mapping during resize(), and the difference is pretty dramatic.

{noformat}
Result "currentImplementation":
  2063.386 ±(99.9%) 81.758 ops/s [Average]
  (min, avg, max) = (1685.574, 2063.386, 2338.945), stdev = 182.863
  CI (99.9%): [1981.628, 2145.144] (assumes normal distribution)

Result "optimizedImplementation":
  3497.354 ±(99.9%) 31.575 ops/s [Average]
  (min, avg, max) = (3261.232, 3497.354, 3605.527), stdev = 70.623
  CI (99.9%): [3465.778, 3528.929] (assumes normal distribution)

# Run complete. Total time: 00:03:37

Benchmark                                     Mode  Cnt     Score    Error  Units
LogSegmentBenchmark.currentImplementation    thrpt   60  2063.386 ±  81.758  ops/s
LogSegmentBenchmark.optimizedImplementation  thrpt   60  3497.354 ±  31.575  ops/s
{noformat}
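To make the resize() difference concrete, here is a simplified, stand-alone Java illustration of the two strategies described above. It is not the Kafka code (kafka.log.AbstractIndex is Scala); the class and method names are invented for the example:

{code}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class IndexResizeSketch {

    // Current-style resize: reopen the file, truncate it, rebuild the mapping, close again.
    static MappedByteBuffer resizeByReopening(File file, int newSizeBytes) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            raf.setLength(newSizeBytes);                                                   // ftruncate()
            return raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newSizeBytes);  // mmap()
        }                                                                                  // close()
    }

    // Optimized-style resize: keep one RandomAccessFile open for the lifetime of the index,
    // so a resize only truncates and remaps. (On Windows a mapped file cannot be truncated
    // in place, which is the platform caveat mentioned in KAFKA-5297.)
    static final class OpenIndex implements AutoCloseable {
        private final RandomAccessFile raf;
        private MappedByteBuffer mmap;

        OpenIndex(File file, int sizeBytes) throws IOException {
            this.raf = new RandomAccessFile(file, "rw");
            this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, sizeBytes);
        }

        void resize(int newSizeBytes) throws IOException {
            raf.setLength(newSizeBytes);
            mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newSizeBytes);
        }

        MappedByteBuffer buffer() {
            return mmap;
        }

        @Override
        public void close() throws IOException {
            raf.close();
        }
    }
}
{code}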
[jira] [Commented] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments
[ https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018527#comment-16018527 ] Kyle Ambroff commented on KAFKA-5297: - Patch incoming.
[jira] [Updated] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments
[ https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Ambroff updated KAFKA-5297: Attachment: LogSegmentBenchmark.java

Attaching benchmark code in case anyone wants to validate the way I measured this. Requires adding a test dependency.

{noformat}
diff --git a/build.gradle b/build.gradle
index 8d6e703..0c55650 100644
--- a/build.gradle
+++ b/build.gradle
@@ -520,6 +520,9 @@ project(':core') {
     testCompile libs.junit
     testCompile libs.scalaTest
     testCompile libs.jfreechart
+    testCompile 'org.openjdk.jmh:jmh-core:1.13'
+    testCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.13'
+
     scoverage libs.scoveragePlugin
     scoverage libs.scoverageRuntime
{noformat}
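The attached LogSegmentBenchmark.java is the authoritative benchmark; the skeleton below only illustrates how such a JMH comparison is typically structured (the annotations and runner usage are standard JMH; the benchmark bodies are left as comments rather than guessed):

{code}
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@State(Scope.Thread)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class ResizeBenchmarkSketch {

    @Benchmark
    public void currentImplementation() {
        // open the index file, truncate, mmap(), close -- the code path resize() uses today
    }

    @Benchmark
    public void optimizedImplementation() {
        // truncate and remap on an already-open RandomAccessFile
    }

    public static void main(String[] args) throws Exception {
        Options opts = new OptionsBuilder()
                .include(ResizeBenchmarkSketch.class.getSimpleName())
                .forks(1)
                .build();
        new Runner(opts).run();
    }
}
{code}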
[jira] [Comment Edited] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments
[ https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018528#comment-16018528 ] Kyle Ambroff edited comment on KAFKA-5297 at 5/20/17 4:13 PM: -- Attaching benchmark code in case anyone wants to validate the way I measured this. Requires adding a test dependency. I also copied the existing implementation so that I could compare both in one run.

{noformat}
diff --git a/build.gradle b/build.gradle
index 8d6e703..0c55650 100644
--- a/build.gradle
+++ b/build.gradle
@@ -520,6 +520,9 @@ project(':core') {
     testCompile libs.junit
     testCompile libs.scalaTest
     testCompile libs.jfreechart
+    testCompile 'org.openjdk.jmh:jmh-core:1.13'
+    testCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.13'
+
     scoverage libs.scoveragePlugin
     scoverage libs.scoverageRuntime
{noformat}

was (Author: ambroff): Attaching benchmark code in case anyone wants to validate the way I measured this. Requires adding a test dependency.

{noformat}
diff --git a/build.gradle b/build.gradle
index 8d6e703..0c55650 100644
--- a/build.gradle
+++ b/build.gradle
@@ -520,6 +520,9 @@ project(':core') {
     testCompile libs.junit
     testCompile libs.scalaTest
     testCompile libs.jfreechart
+    testCompile 'org.openjdk.jmh:jmh-core:1.13'
+    testCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.13'
+
     scoverage libs.scoveragePlugin
     scoverage libs.scoverageRuntime
{noformat}
[jira] [Updated] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments
[ https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Ambroff updated KAFKA-5297: Attachment: (was: LogSegmentBenchmark.java)
[jira] [Updated] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments
[ https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Ambroff updated KAFKA-5297: Attachment: LogSegmentBenchmark.java
[jira] [Updated] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments
[ https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Ambroff updated KAFKA-5297: Issue Type: Bug (was: Improvement)
[GitHub] kafka pull request #3110: KAFKA-5297: Reduce time required for shutdown
GitHub user ambroff opened a pull request: https://github.com/apache/kafka/pull/3110 KAFKA-5297: Reduce time required for shutdown Avoid reopening and recreating page mappings for index files during truncation unless it's required by the platform. In my tests this increases throughput of Log#close by about 70%. See KAFKA-5297 for details. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ambroff/kafka KAFKA-5297 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3110.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3110 commit d648174f4ef8cedb3c46061e3e348ed83e01f26a Author: Kyle Ambroff Date: 2017-05-20T16:36:21Z KAFKA-5297: Reduce time required for shutdown Avoid reopening and recreating page mappings for index files during truncation unless it's required by the platform. In my tests this increases throughput of Log#close by about 70%. See KAFKA-5297 for details.
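The "unless it's required by the platform" caveat refers to Windows, where a memory-mapped file cannot be truncated in place (the issue description above already notes the existing unmap-on-Windows step). A plain-Java way to express that check, shown only as an illustration rather than Kafka's own utility code:

{code}
public final class OsCheck {

    // Simple OS detection; real code may cache this or use a shared utility class.
    public static final boolean IS_WINDOWS =
            System.getProperty("os.name").toLowerCase().startsWith("windows");

    public static void main(String[] args) {
        if (IS_WINDOWS) {
            System.out.println("Drop the index mapping before truncating, then remap afterwards.");
        } else {
            System.out.println("Truncate and remap without reopening the index file.");
        }
    }
}
{code}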
[jira] [Commented] (KAFKA-5297) Broker can take a long time to shut down if there are many active log segments
[ https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018542#comment-16018542 ] ASF GitHub Bot commented on KAFKA-5297: --- GitHub user ambroff opened a pull request: https://github.com/apache/kafka/pull/3110 KAFKA-5297: Reduce time required for shutdown Avoid reopening and recreating page mappings for index files during truncation unless it's required by the platform. In my tests this increases throughput of Log#close by about 70%. See KAFKA-5297 for details. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ambroff/kafka KAFKA-5297 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3110.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3110 commit d648174f4ef8cedb3c46061e3e348ed83e01f26a Author: Kyle Ambroff Date: 2017-05-20T16:36:21Z KAFKA-5297: Reduce time required for shutdown Avoid reopening and recreating page mappings for index files during truncation unless it's required by the platform. In my tests this increases throughput of Log#close by about 70%. See KAFKA-5297 for details.
[jira] [Commented] (KAFKA-5266) Follow-up improvements for consumer offset reset tool (KIP-122)
[ https://issues.apache.org/jira/browse/KAFKA-5266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018581#comment-16018581 ] Jorge Quilcate commented on KAFKA-5266: --- I like 'to-current', I'll implement it. On 4, you're right, an NPE is possible in those cases. Wouldn't it be better to filter those 'null's and print some warning logs before #toMap? And then catch any other exception on the whole #resetOffsets implementation? > Follow-up improvements for consumer offset reset tool (KIP-122) > --- > > Key: KAFKA-5266 > URL: https://issues.apache.org/jira/browse/KAFKA-5266 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: Jason Gustafson >Assignee: Jorge Quilcate > Fix For: 0.11.0.0 > > > 1. We should try to ensure that offsets are in range for the topic partition. > We currently only verify this for the shift option. > 2. If you provide a CSV file, you shouldn't need to specify one of the > --all-topics or --topic options. > 3. We currently support a "reset to current offsets" option if none of the > supported reset options are provided. This seems kind of useless. Perhaps we > should just enforce that one of the reset options is provided. > 4. The command fails with an NPE if we cannot find one of the offsets we are > trying to reset. It would be better to raise an exception with a friendlier > message. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
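A hypothetical illustration of the "filter nulls and warn" idea from the comment above; the real change would live in the Scala ConsumerGroupCommand, so the helper and its name here are assumptions:

{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetResetSketch {

    // Drop partitions whose target offset could not be resolved, warning instead of throwing.
    static Map<TopicPartition, OffsetAndMetadata> dropMissingOffsets(
            Map<TopicPartition, OffsetAndMetadata> requested) {
        Map<TopicPartition, OffsetAndMetadata> usable = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : requested.entrySet()) {
            if (entry.getValue() == null) {
                // A null here is what currently surfaces as an NPE; warn and skip instead.
                System.err.println("WARN: no offset could be resolved for " + entry.getKey()
                        + "; skipping this partition");
            } else {
                usable.put(entry.getKey(), entry.getValue());
            }
        }
        return usable;
    }
}
{code}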
[jira] [Created] (KAFKA-5298) MirrorMaker deadlocks with missing topics
Raymond Conn created KAFKA-5298: --- Summary: MirrorMaker deadlocks with missing topics Key: KAFKA-5298 URL: https://issues.apache.org/jira/browse/KAFKA-5298 Project: Kafka Issue Type: Bug Components: clients, tools Affects Versions: 0.10.2.1 Reporter: Raymond Conn

When MirrorMaker mirrors a topic to destination brokers that have topic auto-create disabled and the topic doesn't exist on the destination brokers, the producer in MirrorMaker logs the following:

{code}
Error while fetching metadata with correlation id 467 : {mirror-test2=UNKNOWN_TOPIC_OR_PARTITION}
Error while fetching metadata with correlation id 468 : {mirror-test2=UNKNOWN_TOPIC_OR_PARTITION}
{code}

This log message is fine and expected. The problem is that the log message stops ~5 min later, at which point the logs look fine but MirrorMaker is not mirroring any of its topics. What's worse, MirrorMaker is basically in an unrecoverable state once this happens (the log statement stops). If you create the topic at the destination, MirrorMaker still won't mirror data until a restart. Attempts to restart MirrorMaker (cleanly) fail because the process is more or less deadlocked in its shutdown hook. Here is the reasoning:

* MirrorMaker becomes unrecoverable after 5 minutes because of this loop in the [producer|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L543-L561]
* The producer will keep waiting for metadata for the missing topic or until the max timeout is reached (max long in this case).
* After 5 minutes the producer stops making a metadata request for the topic because that topic expires [here|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L218]
* The topic is never re-added for metadata requests since the only add is before entering the loop [here|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L528]

So basically after 5 minutes all metadata requests moving forward are for no topics, since the topic expired. The MirrorMaker thread essentially gets stuck waiting forever since there will never be a metadata request for the topic the thread is waiting on.

All of this basically leads to a deadlock state in the shutdown hook:

* The shutdown hook sends a shutdown to the MirrorMaker threads
* It waits for the threads to exit their loop by waiting on a [latch|https://github.com/apache/kafka/blob/0.10.2/core/src/main/scala/kafka/tools/MirrorMaker.scala#L396]
* The latch is never counted down in [produce|https://github.com/apache/kafka/blob/0.10.2/core/src/main/scala/kafka/tools/MirrorMaker.scala#L434]
* The thread will never exit the loop to count down the latch on line 462.
This can be seen with a thread dump of the shutdown hook thread:
{code}
Name: MirrorMakerShutdownHook
State: WAITING on java.util.concurrent.CountDownLatch$Sync@3ffebeac
Total blocked: 0 Total waited: 1

Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
kafka.tools.MirrorMaker$MirrorMakerThread.awaitShutdown(MirrorMaker.scala:498)
kafka.tools.MirrorMaker$$anonfun$cleanShutdown$4.apply(MirrorMaker.scala:396)
kafka.tools.MirrorMaker$$anonfun$cleanShutdown$4.apply(MirrorMaker.scala:396)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
{code}
The root of the issue is more or less related to the issue documented here, where the producer can block waiting for metadata: https://issues.apache.org/jira/browse/KAFKA-3450

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
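For context, the indefinite wait described above exists because the producer's metadata wait is bounded only by max.block.ms, which is effectively unbounded here (max long, per the report). A standalone producer can cap that wait; the sketch below is illustrative only (the topic name, broker address, and timeout are made up) and is not a MirrorMaker patch:
{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BoundedBlockingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Bound how long send() may block waiting for metadata of a missing topic.
        // With an effectively unbounded value (as in MirrorMaker), the send can
        // block forever once the topic has expired from the metadata request set.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            producer.send(new ProducerRecord<>("mirror-test2", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        // With the bound above, a missing topic surfaces as an
                        // exception here instead of an indefinite hang.
                        System.err.println("Send failed: " + exception);
                    }
                });
        }
    }
}
{code}
Bounding max.block.ms only limits how long a caller blocks; it does not change the topic-expiry behaviour in the metadata layer that the report describes.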
[jira] [Commented] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled
[ https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018675#comment-16018675 ] Raymond Conn commented on KAFKA-3450:
---
Also, it looks like if {{max.block.ms}} is greater than 5 minutes, the caller will be forced to wait the full max block time even if the topic does get created. After 5 minutes the topic is removed from the topics map inside the [Metadata|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L218] class. It never gets added back, since the only [add|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L528] is outside the [loop|https://github.com/apache/kafka/blob/e06cd3e55f25a0bb414e0770493906ea8019420a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L561] that the producer is "blocking" on. Basically, after 5 minutes all metadata requests are for no topics.
{code}
2017-05-20 20:20:05 DEBUG NetworkClient - Sending metadata request (type=MetadataRequest, topics=topic-test2) to node 2
2017-05-20 20:20:05 TRACE NetworkClient - Sending {topics=[topic-test2]} to node 2.
2017-05-20 20:20:12 DEBUG Metadata - Removing unused topic topic-test2 from the metadata list, expiryMs 1495324449201 now 1495326005464
2017-05-20 20:20:22 DEBUG NetworkClient - Sending metadata request (type=MetadataRequest, topics=) to node 5
2017-05-20 20:20:22 TRACE NetworkClient - Sending {topics=[]} to node 5.
2017-05-20 20:20:35 DEBUG NetworkClient - Sending metadata request (type=MetadataRequest, topics=) to node 2
2017-05-20 20:20:35 TRACE NetworkClient - Sending {topics=[]} to node 2.
2017-05-20 20:20:48 DEBUG NetworkClient - Sending metadata request (type=MetadataRequest, topics=) to node 1
2017-05-20 20:20:48 TRACE NetworkClient - Sending {topics=[]} to node 1.
{code}

> Producer blocks on send to topic that doesn't exist if auto create is disabled
> --
>
> Key: KAFKA-3450
> URL: https://issues.apache.org/jira/browse/KAFKA-3450
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.9.0.1
> Reporter: Michal Turek
> Priority: Critical
>
> {{producer.send()}} is blocked for {{max.block.ms}} (default 60 seconds) if the destination topic doesn't exist and its automatic creation is disabled. A warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is logged every 100 ms in a loop until the 60-second timeout expires, but the operation is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> /**
>  * Test of sending to a topic that does not exist while automatic creation of
>  * topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
>     private static final Logger LOGGER = LoggerFactory.getLogger(NoSuchTopicTest.class);
>
>     public static void main(String[] args) {
>         Properties properties = new Properties();
>         properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, NoSuchTopicTest.class.getSimpleName());
>         properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); // Default is 60 seconds
>
>         try (Producer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "key", "value"), (metadata, exception) -> {
>                 if (exception != null) {
>                     LOGGER.error("Send failed: {}", exception.toString());
>                 } else {
>                     LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), metadata.partition(), metadata.offset());
>                 }
>             });
>
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", "key", "value"), (metadata, exception) -> {
>                 if (exception != null) {
>                     LOGGER.error("Send failed: {}", exception.toString());
>                 } else {
>                     LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), metadata.partition(), metadata.offset());
>                 }
>             });
>         }
>     }
> }
> {noformat}
> Relate
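As a reading aid for the expiry behaviour described in the comment above, here is a toy, self-contained simulation (not KafkaProducer code; the loop count and topic name are arbitrary) of why the topic drops out of the metadata requests and never comes back:
{code}
import java.util.HashSet;
import java.util.Set;

// Toy simulation of the behaviour described above: the topic is added to the
// metadata request set once, before the wait loop, and an expiry pass during
// the wait removes it, so every later "metadata request" asks for no topics
// and the wait can only end via the overall timeout.
public class TopicExpirySimulation {
    public static void main(String[] args) {
        Set<String> topicsToRequest = new HashSet<>();
        topicsToRequest.add("topic-test2");          // the single add, outside the loop

        boolean topicKnown = false;                  // broker never reports it (auto-create off)
        for (int attempt = 1; attempt <= 8 && !topicKnown; attempt++) {
            System.out.println("metadata request for topics=" + topicsToRequest);
            if (attempt == 5) {
                // stands in for the 5-minute topic expiry in Metadata.java
                topicsToRequest.remove("topic-test2");
                System.out.println("topic expired; it is never re-added inside the loop");
            }
        }
        System.out.println("still waiting: no future metadata request will include the topic");
    }
}
{code}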
[jira] [Created] (KAFKA-5299) MirrorMaker with New.consumer doesn't consume message from multiple topics whitelisted
Jyoti created KAFKA-5299:
---
Summary: MirrorMaker with New.consumer doesn't consume message from multiple topics whitelisted
Key: KAFKA-5299
URL: https://issues.apache.org/jira/browse/KAFKA-5299
Project: Kafka
Issue Type: Bug
Affects Versions: 0.9.0.1
Reporter: Jyoti

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] kafka pull request #3111: HOTFIX: Increase timeouts in distributed connect tests
GitHub user kkonstantine opened a pull request:

https://github.com/apache/kafka/pull/3111

HOTFIX: Increase timeouts in distributed connect tests.

* There's considerably increased logging in DEBUG mode due to the class scanning performed with the new isolation scheme.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kkonstantine/kafka HOTFIX-Increase-timeouts-in-distributed-connect-tests-to-handle-increase-startup-due-to-class-scanning

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3111.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3111

commit 5538eaa65d7ed614061d6de52ce7f61f7dd7fcd8
Author: Konstantine Karantasis
Date: 2017-05-21T05:45:40Z

    HOTFIX: Increase timeouts in distributed connect tests.

    * There's considerably increased logging due to the class scanning performed with the new isolation scheme in DEBUG mode.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---