[jira] [Commented] (KAFKA-5296) Unable to write to some partitions of newly created topic in 10.2
[ https://issues.apache.org/jira/browse/KAFKA-5296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098063#comment-16098063 ] huxihx commented on KAFKA-5296: --- There is only one active controller in a cluster. How did you tell multiple controllers co-existed in your environment? > Unable to write to some partitions of newly created topic in 10.2 > - > > Key: KAFKA-5296 > URL: https://issues.apache.org/jira/browse/KAFKA-5296 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.2.0 >Reporter: Abhisek Saikia > > We are using kafka 10.2 and the cluster was running fine for a month with 50 > topics, and now we are having an issue producing messages to newly created > topics. The create topic command is successful but producers are throwing > errors while writing to some partitions. > Error in producer- > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > [topic1]-8: 30039 ms has passed since batch creation plus linger time > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70) > ~[kafka-clients-0.10.2.0.jar:na] > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57) > ~[kafka-clients-0.10.2.0.jar:na] > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) > ~[kafka-clients-0.10.2.0.jar:na] > On the broker side, I don't see any topic-partition folder getting created for > the broker that is the leader for the partition. > While using the 0.8 client, the write used to hang when it started writing to the > partition having this issue. With 10.2 the producer hang issue is resolved > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
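The stack trace above comes from blocking on the Future returned by send(). A minimal sketch of a producer call that surfaces this error; the broker address, topic and serializers are illustrative, not taken from the reporter's setup:
{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerTimeoutExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // illustrative address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                // Blocks until the broker acknowledges the record or the batch expires.
                producer.send(new ProducerRecord<>("topic1", "key", "value")).get();
            } catch (ExecutionException e) {
                // For the affected partitions, e.getCause() is the
                // org.apache.kafka.common.errors.TimeoutException shown in the report.
                System.err.println("Send failed: " + e.getCause());
            }
        }
    }
}
{code}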
[jira] [Created] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
Vincent Maurin created KAFKA-5630: - Summary: Consumer poll loop over the same record after a CorruptRecordException Key: KAFKA-5630 URL: https://issues.apache.org/jira/browse/KAFKA-5630 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.11.0.0 Reporter: Vincent Maurin Hello While consuming a topic with log compaction enabled, I am getting an infinite consumption loop of the same record, i.e., each call to poll returns the same record 500 times (500 is my max.poll.records). I am using the Java client 0.11.0.0. Running the code with the debugger, the initial problem comes from `Fetcher.PartitionRecords.fetchRecords()`. Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record size is less than the minimum record overhead (14)` Then the boolean `hasExceptionInLastFetch` is set to true, causing the test block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the last record. I guess the corruption problem is similar to https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
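The consumption pattern described (manual assignment, max.poll.records=500) looks roughly like the sketch below; broker address, topic and partition are placeholders, and the loop shows where the repeated record would surface:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CompactedTopicReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // illustrative
        props.put("max.poll.records", "500");             // as in the report
        props.put("enable.auto.commit", "false");         // offsets managed by the app
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("compacted-topic", 0); // illustrative
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // With the corrupt batch described above, 0.11.0.0 keeps handing
                    // back the last good record instead of raising an error.
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        }
    }
}
{code}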
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098117#comment-16098117 ] huxihx commented on KAFKA-5630: --- Did you enable `preallocate` for the topic? > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098124#comment-16098124 ] Vincent Maurin commented on KAFKA-5630: --- Just checking and it seems not. I don't have this option in the broker config and the default seems to be false > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098129#comment-16098129 ] Vincent Maurin commented on KAFKA-5630: --- A rolling upgrade from 0.10.2.0 has also been performed a couple of weeks ago. Could it be a reason for the corruption problem? > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098134#comment-16098134 ] huxihx commented on KAFKA-5630: --- Seems it's a duplicate of [KAFKA-5431|https://issues.apache.org/jira/browse/KAFKA-5431]. Could you run "bin/kafka-run-class.sh kafka.tools.DumpLogSegments" to verify that the underlying log file does contain the corrupted records? > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
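For reference, a typical DumpLogSegments invocation looks like the following; the segment path is only an example, not taken from this report:
{noformat}
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /var/lib/kafka/compacted-topic-0/00000000000000000000.log \
  --print-data-log
{noformat}
The offsets and the isvalid flag in the output help show where the corruption starts.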
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098154#comment-16098154 ] Vincent Maurin commented on KAFKA-5630: --- It is ``` offset: 210648 position: 172156054 CreateTime: 1499416798791 isvalid: true size: 610 magic: 1 compresscodec: NONE crc: 1846714374 offset: 210649 position: 172156664 CreateTime: 1499416798796 isvalid: true size: 586 magic: 1 compresscodec: NONE crc: 3995473502 offset: 210650 position: 172157250 CreateTime: 1499416798798 isvalid: true size: 641 magic: 1 compresscodec: NONE crc: 2352501239 Exception in thread "main" org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller than minimum record overhead (14). ``` > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098154#comment-16098154 ] Vincent Maurin edited comment on KAFKA-5630 at 7/24/17 9:44 AM: It is {noformat} offset: 210648 position: 172156054 CreateTime: 1499416798791 isvalid: true size: 610 magic: 1 compresscodec: NONE crc: 1846714374 offset: 210649 position: 172156664 CreateTime: 1499416798796 isvalid: true size: 586 magic: 1 compresscodec: NONE crc: 3995473502 offset: 210650 position: 172157250 CreateTime: 1499416798798 isvalid: true size: 641 magic: 1 compresscodec: NONE crc: 2352501239 Exception in thread "main" org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller than minimum record overhead (14). {noformat} was (Author: vmaurin_glispa): It is ``` offset: 210648 position: 172156054 CreateTime: 1499416798791 isvalid: true size: 610 magic: 1 compresscodec: NONE crc: 1846714374 offset: 210649 position: 172156664 CreateTime: 1499416798796 isvalid: true size: 586 magic: 1 compresscodec: NONE crc: 3995473502 offset: 210650 position: 172157250 CreateTime: 1499416798798 isvalid: true size: 641 magic: 1 compresscodec: NONE crc: 2352501239 Exception in thread "main" org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller than minimum record overhead (14). ``` > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-1595) Remove deprecated and slower scala JSON parser
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-1595: --- Summary: Remove deprecated and slower scala JSON parser (was: Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount) > Remove deprecated and slower scala JSON parser > -- > > Key: KAFKA-1595 > URL: https://issues.apache.org/jira/browse/KAFKA-1595 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.1.1 >Reporter: Jagbir >Assignee: Ismael Juma > Labels: newbie > > The following issue is created as a follow up suggested by Jun Rao > in a kafka news group message with the Subject > "Blocking Recursive parsing from > kafka.consumer.TopicCount$.constructTopicCount" > SUMMARY: > An issue was detected in a typical cluster of 3 kafka instances backed > by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3, > java version 1.7.0_65). On consumer end, when consumers get recycled, > there is a troubling JSON parsing recursion which takes a busy lock and > blocks consumers thread pool. > In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes > a global lock (0xd3a7e1d0) during the rebalance, and fires an > expensive JSON parsing, while keeping the other consumers from shutting > down, see, e.g, > at > kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161) > The deep recursive JSON parsing should be deprecated in favor > of a better JSON parser, see, e.g, > http://engineering.ooyala.com/blog/comparing-scala-json-libraries? > DETAILS: > The first dump is for a recursive blocking thread holding the lock for > 0xd3a7e1d0 > and the subsequent dump is for a waiting thread. > (Please grep for 0xd3a7e1d0 to see the locked object.) 
> Â > -8<- > "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor" > prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000] > java.lang.Thread.State: RUNNABLE > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Pars
[jira] [Created] (KAFKA-5631) Use Jackson for serialising to JSON
Ismael Juma created KAFKA-5631: -- Summary: Use Jackson for serialising to JSON Key: KAFKA-5631 URL: https://issues.apache.org/jira/browse/KAFKA-5631 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Fix For: 0.11.1.0 We currently serialise to JSON via a manually written method `Json.encode`. The implementation is naive: it does a lot of unnecessary String concatenation and it doesn't handle escaping well. KAFKA-1595 switches to Jackson for parsing, so it would make sense to do this after that one is merged. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
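Kafka's actual `Json.encode` lives in the Scala core module; purely as an illustration of the proposal, a Jackson-based encoder might look like the Java sketch below (class and method names are made up). The point is that Jackson handles escaping and avoids the manual String concatenation:
{code:java}
import java.util.Collections;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonEncodeSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Jackson escapes quotes, backslashes and control characters for us,
    // which hand-rolled string concatenation does not handle well.
    public static String encode(Object value) {
        try {
            return MAPPER.writeValueAsString(value);
        } catch (Exception e) {
            throw new IllegalArgumentException("Failed to encode " + value, e);
        }
    }

    public static void main(String[] args) {
        // {"say":"he said \"hi\""} -- escaping comes for free
        System.out.println(encode(Collections.singletonMap("say", "he said \"hi\"")));
    }
}
{code}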
[jira] [Assigned] (KAFKA-5631) Use Jackson for serialising to JSON
[ https://issues.apache.org/jira/browse/KAFKA-5631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Umesh Chaudhary reassigned KAFKA-5631: -- Assignee: Umesh Chaudhary > Use Jackson for serialising to JSON > --- > > Key: KAFKA-5631 > URL: https://issues.apache.org/jira/browse/KAFKA-5631 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Umesh Chaudhary > Labels: newbie > Fix For: 0.11.1.0 > > > We currently serialise to JSON via a manually written method `Json.encode`. > The implementation is naive: it does a lot of unnecessary String > concatenation and it doesn't handle escaping well. > KAFKA-1595 switches to Jackson for parsing, so it would make sense to do this > after that one is merged. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5631) Use Jackson for serialising to JSON
[ https://issues.apache.org/jira/browse/KAFKA-5631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098221#comment-16098221 ] Umesh Chaudhary commented on KAFKA-5631: [~ijuma], assigning this to myself and will start working on it after KAFKA-1595 is resolved with the Jackson changes. > Use Jackson for serialising to JSON > --- > > Key: KAFKA-5631 > URL: https://issues.apache.org/jira/browse/KAFKA-5631 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Umesh Chaudhary > Labels: newbie > Fix For: 0.11.1.0 > > > We currently serialise to JSON via a manually written method `Json.encode`. > The implementation is naive: it does a lot of unnecessary String > concatenation and it doesn't handle escaping well. > KAFKA-1595 switches to Jackson for parsing, so it would make sense to do this > after that one is merged. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5629) Console Consumer overrides auto.offset.reset property when provided on the command line without warning about it.
[ https://issues.apache.org/jira/browse/KAFKA-5629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098230#comment-16098230 ] ASF GitHub Bot commented on KAFKA-5629: --- GitHub user soenkeliebau opened a pull request: https://github.com/apache/kafka/pull/3566 KAFKA-5629: Added a warn message to the output of ConsoleConsumer when "auto.offset.reset" property is specified on the command line but overridden by the code during startup. Currently the ConsoleConsumer silently overrides that setting, which can create confusing behavior. You can merge this pull request into a Git repository by running: $ git pull https://github.com/soenkeliebau/kafka KAFKA-5629 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3566.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3566 commit 6416efd6cd8badf5d13228eae2f126ebf6bf1f99 Author: Soenke Liebau Date: 2017-07-24T09:08:28Z KAFKA-5629: Added a warn message to the output of ConsoleConsumer when auto.offset.reset property is specified on the command line but overridden by the code during startup. > Console Consumer overrides auto.offset.reset property when provided on the > command line without warning about it. > - > > Key: KAFKA-5629 > URL: https://issues.apache.org/jira/browse/KAFKA-5629 > Project: Kafka > Issue Type: Improvement > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Sönke Liebau >Assignee: Sönke Liebau >Priority: Trivial > > The console consumer allows to specify consumer options on the command line > with the --consumer-property parameter. > In the case of auto.offset.reset this parameter will always silently be > ignored though, because this behavior is controlled via the --from-beginning > parameter. > I believe that behavior to be fine, however we should log a warning in case > auto.offset.reset is specified on the command line and overridden to > something else in the code to avoid potential confusion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
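The console consumer itself is written in Scala; as an illustration only (method and variable names here are hypothetical, not the actual patch), the kind of guard the pull request describes is:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class OffsetResetWarning {
    // Warn when a user-supplied auto.offset.reset is about to be replaced by
    // the value the tool derives from --from-beginning.
    static void warnIfOverridden(Properties userProps, String effectiveReset) {
        String requested = userProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
        if (requested != null && !requested.equals(effectiveReset)) {
            System.err.println("WARN: auto.offset.reset=" + requested
                    + " from --consumer-property is overridden to '" + effectiveReset
                    + "' because the console consumer derives it from --from-beginning.");
        }
    }
}
{code}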
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098253#comment-16098253 ] Ismael Juma commented on KAFKA-5630: [~vmaurin_glispa], the consumer behaviour is as expected. The application should decide whether it wants to skip the bad record (via `seek`) or not. However, we should figure out if the corruption is due to a bug in Kafka. And fix it, if that's the case. > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
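A sketch of the skip-via-seek approach mentioned above, for the case where poll does surface the exception (as it does on 0.10.2.1 according to the comments that follow). The exact exception type and position semantics can differ between client versions, so treat this as illustrative rather than definitive:
{code:java}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class SkipCorruptRecord {
    // Poll once; if the fetch fails (e.g. on a corrupt record), step the
    // consumer's position past the offending offset so the next poll can make
    // progress. Whether skipping is acceptable is an application decision.
    static <K, V> ConsumerRecords<K, V> pollSkippingCorrupt(Consumer<K, V> consumer,
                                                            TopicPartition tp,
                                                            long timeoutMs) {
        try {
            return consumer.poll(timeoutMs);
        } catch (KafkaException e) {
            // The cause may be CorruptRecordException; position() still points
            // at the record that could not be fetched.
            long badOffset = consumer.position(tp);
            consumer.seek(tp, badOffset + 1);
            return ConsumerRecords.empty();
        }
    }
}
{code}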
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098254#comment-16098254 ] Ismael Juma commented on KAFKA-5630: Have you had any other issues since the rolling upgrade? > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098263#comment-16098263 ] Vincent Maurin commented on KAFKA-5630: --- [~ijuma] thank you for your feedback. Regarding the consumer, I have tested with version 0.10.2.1 and it actually throws the error when calling "poll". Then it sounds fair enough to skip the record with seek. But with 0.11, I don't get any error; a call to poll just returns the same record duplicated max.poll.records times. The logic to then seek to the next offset is more complicated than reacting to the exception; it sounds like I have to compare the records returned by poll and advance my offset if they are all equal? Or am I misusing the client? (It is a manually assigned partition use case, without committing offsets to Kafka; I have tried to follow the recommendations in the KafkaConsumer javadoc for that) > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098276#comment-16098276 ] Vincent Maurin commented on KAFKA-5630: --- And I haven't noticed any other issues so far. After a check with the DumpLogSegments tool, it appears that 2 partitions were impacted, both on the same topic. I had log cleaner errors for these two partitions (same as the consumer one). > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5630: --- Fix Version/s: 0.11.0.1 > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin >Priority: Critical > Labels: reliability > Fix For: 0.11.0.1 > > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5630: --- Labels: reliability (was: ) > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > Labels: reliability > Fix For: 0.11.0.1 > > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098279#comment-16098279 ] Ismael Juma commented on KAFKA-5630: Oh, I see. That's definitely a bug, you should be getting an error. cc [~lindong], [~becket_qin] [~hachikuji] who contributed or reviewed changes in this area. > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin > Labels: reliability > Fix For: 0.11.0.1 > > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5630: --- Priority: Critical (was: Major) > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin >Priority: Critical > Labels: reliability > Fix For: 0.11.0.1 > > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5630: --- Labels: regression reliability (was: reliability) > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin >Priority: Critical > Labels: regression, reliability > Fix For: 0.11.0.1 > > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic
[ https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098554#comment-16098554 ] Bart Vercammen commented on KAFKA-5386: --- [~mjsax] Thanks for the reply. The `through("...")` would already help me a little bit, thanks for the tip. With respect to the changelog topics, will this be included in upcoming versions of KafkaStreams? It actually does make sense to be able to define the topic names myself, especially with Kafka ACLs in play. It would be nice to be able to configure the changelog topic _prefix_ and _suffix_ somehow. I have a local fork where I implemented this, it's a bit dirty and purely as a proof of concept, but would it make sense to rework this into a valid PR? > [Kafka Streams] - custom name for state-store change-log topic > -- > > Key: KAFKA-5386 > URL: https://issues.apache.org/jira/browse/KAFKA-5386 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Bart Vercammen > > Currently, when working with Kafka backed state stores in Kafka Streams, > these log compacted topics are given a hardcoded name : > _my.app.id-storename-changelog_ > {noformat}public static String storeChangelogTopic(String applicationId, > String storeName) { > return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX; > }{noformat} > It would be nice if somehow I would be able to override this functionality > and provide the topic-name myself when creating the state-store. > Any comments? > Would it be OK to submit a PR for this? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic
[ https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098554#comment-16098554 ] Bart Vercammen edited comment on KAFKA-5386 at 7/24/17 3:18 PM: [~mjsax] Thanks for the reply. The {{through("...")}} would already help me a little bit, thanks for the tip. With respect to the changelog topics, will this be included in upcoming versions of KafkaStreams? It actually does make sense to be able to define the topic names myself, especially with Kafka ACLs in play. It would be nice to be able to configure the changelog topic _prefix_ and _suffix_ somehow. I have a local fork where I implemented this, it's a bit dirty and purely as a proof of concept, but would it make sense to rework this into a valid PR? was (Author: cloutrix): [~mjsax] Thanks for the reply. The `through("...")` would already help me a little bit, thanks for the tip. With respect to the changelog topics, will this be included in upcoming versions of KafkaStreams? It actually does make sense to be able to define the topic names myself, especially with Kafka ACLs in play. It would be nice to be able to configure the changelog topic _prefix_ and _suffix_ somehow. I have a local fork where I implemented this, it's a bit dirty and purely as a proof of concept, but would it make sense to rework this into a valid PR? > [Kafka Streams] - custom name for state-store change-log topic > -- > > Key: KAFKA-5386 > URL: https://issues.apache.org/jira/browse/KAFKA-5386 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Bart Vercammen > > Currently, when working with Kafka backed state stores in Kafka Streams, > these log compacted topics are given a hardcoded name : > _my.app.id-storename-changelog_ > {noformat}public static String storeChangelogTopic(String applicationId, > String storeName) { > return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX; > }{noformat} > It would be nice if somehow I would be able to override this functionality > and provide the topic-name myself when creating the state-store. > Any comments? > Would it be OK to submit a PR for this? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5632) Message headers not supported by Kafka Streams
CJ Woolard created KAFKA-5632: - Summary: Message headers not supported by Kafka Streams Key: KAFKA-5632 URL: https://issues.apache.org/jira/browse/KAFKA-5632 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.11.0.0 Reporter: CJ Woolard Priority: Minor The new message headers functionality introduced in Kafka 0.11.0.0 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers) do not appear to be respected by Kafka Streams, specifically message headers set on input topics to a Kafka Streams topology do not get propagated to the corresponding output topics of the topology. It appears that it's at least partially due to the SourceNodeRecordDeserializer not properly respecting message headers here: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java#L60 where it isn't using the new ConsumerRecord constructor which supports headers: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java#L122 For additional background here is the line before which we noticed that we still have the message headers, and after which we no longer have them: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L93 In terms of a potential solution there are a few different scenarios to consider: 1. A stream processor with one input and one output, i.e. 1-to-1, (A map/transformation for example). This is the simplest case, and one proposal would be to directly propagate any message headers from input to output. 2. A stream processor with one input and many outputs, i.e. 1-to-many, (A flatmap step for example). 3. A stream processor with multiple inputs per output, i.e. many-to-1, (A join step for example). One proposal for supporting all possible scenarios would be to expose overloads in the Kafka Streams DSL methods to allow the user the ability to specify logic for handling of message headers. For additional background the use case is similar to a distributed tracing use case, where the following previous work may be useful for aiding in design discussions: Dapper (https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf) or Zipkin (https://github.com/openzipkin/zipkin) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
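Until Streams forwards headers itself, an application using the plain clients can copy them across manually. A minimal illustration using the 0.11 `ProducerRecord` constructor that accepts headers (the output topic and transformed value are placeholders):
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderForwarding {
    // Build the output record with the input record's headers attached,
    // preserving the input timestamp; partition is left to the partitioner.
    static ProducerRecord<String, String> forward(ConsumerRecord<String, String> in,
                                                  String outputTopic,
                                                  String transformedValue) {
        return new ProducerRecord<>(outputTopic, null, in.timestamp(),
                in.key(), transformedValue, in.headers());
    }
}
{code}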
[jira] [Updated] (KAFKA-5632) Message headers not supported by Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] CJ Woolard updated KAFKA-5632: -- Description: The new message headers functionality introduced in Kafka 0.11.0.0 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers) does not appear to be respected by Kafka Streams, specifically message headers set on input topics to a Kafka Streams topology do not get propagated to the corresponding output topics of the topology. It appears that it's at least partially due to the SourceNodeRecordDeserializer not properly respecting message headers here: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java#L60 where it isn't using the new ConsumerRecord constructor which supports headers: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java#L122 For additional background here is the line before which we noticed that we still have the message headers, and after which we no longer have them: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L93 In terms of a potential solution there are a few different scenarios to consider: 1. A stream processor with one input and one output, i.e. 1-to-1, (A map/transformation for example). This is the simplest case, and one proposal would be to directly propagate any message headers from input to output. 2. A stream processor with one input and many outputs, i.e. 1-to-many, (A flatmap step for example). 3. A stream processor with multiple inputs per output, i.e. many-to-1, (A join step for example). One proposal for supporting all possible scenarios would be to expose overloads in the Kafka Streams DSL methods to allow the user the ability to specify logic for handling of message headers. For additional background the use case is similar to a distributed tracing use case, where the following previous work may be useful for aiding in design discussions: Dapper (https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf) or Zipkin (https://github.com/openzipkin/zipkin) was: The new message headers functionality introduced in Kafka 0.11.0.0 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers) do not appear to be respected by Kafka Streams, specifically message headers set on input topics to a Kafka Streams topology do not get propagated to the corresponding output topics of the topology. It appears that it's at least partially due to the SourceNodeRecordDeserializer not properly respecting message headers here: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java#L60 where it isn't using the new ConsumerRecord constructor which supports headers: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java#L122 For additional background here is the line before which we noticed that we still have the message headers, and after which we no longer have them: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L93 In terms of a potential solution there are a few different scenarios to consider: 1. A stream processor with one input and one output, i.e. 
1-to-1, (A map/transformation for example). This is the simplest case, and one proposal would be to directly propagate any message headers from input to output. 2. A stream processor with one input and many outputs, i.e. 1-to-many, (A flatmap step for example). 3. A stream processor with multiple inputs per output, i.e. many-to-1, (A join step for example). One proposal for supporting all possible scenarios would be to expose overloads in the Kafka Streams DSL methods to allow the user the ability to specify logic for handling of message headers. For additional background the use case is similar to a distributed tracing use case, where the following previous work may be useful for aiding in design discussions: Dapper (https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf) or Zipkin (https://github.com/openzipkin/zipkin) > Message headers not supported by Kafka Streams > -- > > Key: KAFKA-5632 > URL: https://issues.apache.org/jira/browse/KAFKA-5632 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: CJ Woolard >Priority: Minor > > The new message headers functionality introduced in Kafka 0.11.0.0 > (https://cwiki.apache.org/conf
[jira] [Updated] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic
[ https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5386: --- Labels: needs-kip (was: ) > [Kafka Streams] - custom name for state-store change-log topic > -- > > Key: KAFKA-5386 > URL: https://issues.apache.org/jira/browse/KAFKA-5386 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Bart Vercammen > Labels: needs-kip > > Currently, when working with Kafka backed state stores in Kafka Streams, > these log compacted topics are given a hardcoded name : > _my.app.id-storename-changelog_ > {noformat}public static String storeChangelogTopic(String applicationId, > String storeName) { > return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX; > }{noformat} > It would be nice if somehow I would be able to override this functionality > and provide the topic-name myself when creating the state-store. > Any comments? > Would it be OK to submit a PR for this? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic
[ https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098591#comment-16098591 ] Matthias J. Sax commented on KAFKA-5386: Well. With regard to ACL, you can still know the names of the changelog topics: They follow the pattern `<application.id>-<storeName>-changelog` -- thus, as long as you specify a store name for each `builder.table()` and count/reduce/aggregate and joins, you would know the changelog topic names and could adjust the ACL accordingly. (only if you omit a store name, Streams generates one). ATM, this feature request does not seem to be high priority. It always depends how many people ask for it. Of course, we are more than happy if anybody picks this up :) I guess, we would need a KIP though as this change impacts the public API. > [Kafka Streams] - custom name for state-store change-log topic > -- > > Key: KAFKA-5386 > URL: https://issues.apache.org/jira/browse/KAFKA-5386 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Bart Vercammen > Labels: needs-kip > > Currently, when working with Kafka backed state stores in Kafka Streams, > these log compacted topics are given a hardcoded name : > _my.app.id-storename-changelog_ > {noformat}public static String storeChangelogTopic(String applicationId, > String storeName) { > return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX; > }{noformat} > It would be nice if somehow I would be able to override this functionality > and provide the topic-name myself when creating the state-store. > Any comments? > Would it be OK to submit a PR for this? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
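A worked example of that naming pattern (application id and store name are made up):
{code:java}
public class ChangelogTopicName {
    public static void main(String[] args) {
        String applicationId = "orders-app";   // application.id (illustrative)
        String storeName = "order-counts";     // explicitly chosen store name
        // Pattern described in the comment above:
        String changelogTopic = applicationId + "-" + storeName + "-changelog";
        System.out.println(changelogTopic);    // orders-app-order-counts-changelog
    }
}
{code}
Because the name is deterministic, the corresponding ACL can be granted before the application is started.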
[jira] [Comment Edited] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic
[ https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098591#comment-16098591 ] Matthias J. Sax edited comment on KAFKA-5386 at 7/24/17 3:39 PM: - Well. With regard to ACL, you can still know the names of the changelog topics: They follow the pattern {{<application.id>-<storeName>-changelog}} -- thus, as long as you specify a store name for each `builder.table()` and count/reduce/aggregate and joins, you would know the changelog topic names and could adjust the ACL accordingly. (only if you omit a store name, Streams generates one). ATM, this feature request does not seem to be high priority. It always depends how many people ask for it. Of course, we are more than happy if anybody picks this up :) I guess, we would need a KIP though as this change impacts the public API. was (Author: mjsax): Well. With regard to ACL, you can still know the names of the changelog topics: They follow the pattern `<application.id>-<storeName>-changelog` -- thus, as long as you specify a store name for each `builder.table()` and count/reduce/aggregate and joins, you would know the changelog topic names and could adjust the ACL accordingly. (only if you omit a store name, Streams generates one). ATM, this feature request does not seem to be high priority. It always depends how many people ask for it. Of course, we are more than happy if anybody picks this up :) I guess, we would need a KIP though as this change impacts the public API. > [Kafka Streams] - custom name for state-store change-log topic > -- > > Key: KAFKA-5386 > URL: https://issues.apache.org/jira/browse/KAFKA-5386 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Bart Vercammen > Labels: needs-kip > > Currently, when working with Kafka backed state stores in Kafka Streams, > these log compacted topics are given a hardcoded name : > _my.app.id-storename-changelog_ > {noformat}public static String storeChangelogTopic(String applicationId, > String storeName) { > return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX; > }{noformat} > It would be nice if somehow I would be able to override this functionality > and provide the topic-name myself when creating the state-store. > Any comments? > Would it be OK to submit a PR for this? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5632) Message headers not supported by Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5632: --- Labels: needs-kip (was: ) > Message headers not supported by Kafka Streams > -- > > Key: KAFKA-5632 > URL: https://issues.apache.org/jira/browse/KAFKA-5632 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: CJ Woolard >Priority: Minor > Labels: needs-kip > > The new message headers functionality introduced in Kafka 0.11.0.0 > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers) > does not appear to be respected by Kafka Streams, specifically message > headers set on input topics to a Kafka Streams topology do not get propagated > to the corresponding output topics of the topology. > It appears that it's at least partially due to the > SourceNodeRecordDeserializer not properly respecting message headers here: > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java#L60 > where it isn't using the new ConsumerRecord constructor which supports > headers: > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java#L122 > For additional background here is the line before which we noticed that we > still have the message headers, and after which we no longer have them: > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L93 > In terms of a potential solution there are a few different scenarios to > consider: > 1. A stream processor with one input and one output, i.e. 1-to-1, (A > map/transformation for example). This is the simplest case, and one proposal > would be to directly propagate any message headers from input to output. > 2. A stream processor with one input and many outputs, i.e. 1-to-many, (A > flatmap step for example). > 3. A stream processor with multiple inputs per output, i.e. many-to-1, (A > join step for example). > One proposal for supporting all possible scenarios would be to expose > overloads in the Kafka Streams DSL methods to allow the user the ability to > specify logic for handling of message headers. > For additional background the use case is similar to a distributed tracing > use case, where the following previous work may be useful for aiding in > design discussions: > Dapper > (https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf) > > or > Zipkin (https://github.com/openzipkin/zipkin) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-3856) Cleanup Kafka Streams builder API
[ https://issues.apache.org/jira/browse/KAFKA-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098914#comment-16098914 ] ASF GitHub Bot commented on KAFKA-3856: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3536 > Cleanup Kafka Streams builder API > - > > Key: KAFKA-3856 > URL: https://issues.apache.org/jira/browse/KAFKA-3856 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Matthias J. Sax > Labels: api, kip > Fix For: 0.11.1.0 > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Reopened] (KAFKA-3856) Cleanup Kafka Streams builder API
[ https://issues.apache.org/jira/browse/KAFKA-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reopened KAFKA-3856: -- Leaving the JIRA open since I think there is another PR coming for it. BTW, moving forward it's better to create sub-tasks if the plan is to tackle an issue in multiple PRs, so that each PR correlates to one sub-task. Take this JIRA for example: 1) Add the {{TopologyDescription}} class for describe functionality 2) Deprecate the internal functions from {{TopologyBuilder}} by extracting them into an inner class. 3) Create / rename the public facing classes as proposed in KIP-120. > Cleanup Kafka Streams builder API > - > > Key: KAFKA-3856 > URL: https://issues.apache.org/jira/browse/KAFKA-3856 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Matthias J. Sax > Labels: api, kip > Fix For: 0.11.1.0 > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values
[ https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098929#comment-16098929 ] Guozhang Wang commented on KAFKA-4750: -- [~evis] That is right, we can go with the first option above. Also, as [~mjsax] mentioned, we can enforce in the code that if the passed-in value is a `null` object for serialization (or `null` bytes for deserialization), we skip calling the serde and return the corresponding `null` bytes / object directly, besides stating it clearly in the javadoc. > KeyValueIterator returns null values > > > Key: KAFKA-4750 > URL: https://issues.apache.org/jira/browse/KAFKA-4750 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0 >Reporter: Michal Borowiecki >Assignee: Evgeny Veretennikov > Labels: newbie > Attachments: DeleteTest.java > > > The API for ReadOnlyKeyValueStore.range method promises the returned iterator > will not return null values. However, after upgrading from 0.10.0.0 to > 0.10.1.1 we found null values are returned causing NPEs on our side. > I found this happens after removing entries from the store and I found > resemblance to SAMZA-94 defect. The problem seems to be as it was there, when > deleting entries and having a serializer that does not return null when null > is passed in, the state store doesn't actually delete that key/value pair but > the iterator will return null value for that key. > When I modified our serilizer to return null when null is passed in, the > problem went away. However, I believe this should be fixed in kafka streams, > perhaps with a similar approach as SAMZA-94. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
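As an illustration of the null-passthrough behavior discussed above, a minimal sketch of the serializer side of such a wrapper (the class name is hypothetical; the deserializer counterpart would mirror it by returning a null object for null bytes). This is not the eventual fix, only the pattern the comment describes.

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical wrapper that short-circuits null: a null object serializes to null bytes
// without ever invoking the wrapped serializer.
public class NullPassthroughSerializer<T> implements Serializer<T> {
    private final Serializer<T> inner;

    public NullPassthroughSerializer(Serializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, T data) {
        // Skip the delegate entirely for null, so deletes are represented as null bytes.
        return data == null ? null : inner.serialize(topic, data);
    }

    @Override
    public void close() {
        inner.close();
    }
}
{code}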
[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098973#comment-16098973 ] Jason Gustafson commented on KAFKA-5611: [~pskianis] You'd have to get pretty unlucky with the timing, but a wakeup following rebalance completion could explain the issue. Prior to invoking the partition assignor, we have a check to ensure that we have up-to-date metadata. It is possible to get a wakeup when fetching new metadata and the code does not appear clever enough at the moment to resume assignment on the next call to {{poll()}}. I'll submit a patch to fix this and maybe we can see if it addresses the problem. > One or more consumers in a consumer-group stop consuming after rebalancing > -- > > Key: KAFKA-5611 > URL: https://issues.apache.org/jira/browse/KAFKA-5611 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.2.0 >Reporter: Panos Skianis > Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, > Server 2, Server 3 > > > Scenario: > - 3 zookeepers, 4 Kafkas. 0.10.2.0, with 0.9.0 compatibility still on > (other apps need it but the one mentioned below is already on kafka 0.10.2.0 > client). > - 3 servers running 1 consumer each under the same consumer groupId. > - Servers seem to be consuming messages happily but then there is a timeout > to an external service that causes our app to restart the Kafka Consumer on > one of the servers (this is by design). That causes rebalancing of the group > and upon restart of one of the Consumers seem to "block". > - Server 3 is where the problems occur. > - Problem fixes itself either by restarting one of the 3 servers or cause > the group to rebalance again by using the console consumer with the > autocommit set to false and using the same group. > > Note: > - Haven't managed to recreate it at will yet. > - Mainly happens in production environment, often enough. Hence I do not > have any logs with DEBUG/TRACE statements yet. > - Extracts from log of each app server are attached. Also the log of the > kafka that seems to be dealing with the related group and generations. > - See COMMENT lines in the files for further info. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
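For reference, a hedged sketch of the client-side pattern that can hit this window: another thread calls {{wakeup()}} while {{poll()}} is completing a rebalance, the application catches the {{WakeupException}}, and the next {{poll()}} is expected to resume the assignment, which per the comment above does not always happen. Topic, group and broker names are hypothetical; whether a given application hits the race depends entirely on timing.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupDuringRebalance {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");   // hypothetical group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));   // hypothetical topic

        // Some other thread (e.g. a shutdown or timeout watchdog) interrupts the poll:
        new Thread(consumer::wakeup).start();

        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                records.forEach(r -> System.out.println(r.value()));
            } catch (WakeupException e) {
                // If the wakeup lands while the consumer is refreshing metadata just before the
                // assignment step, the report suggests the next poll() may not resume the
                // assignment, leaving the consumer without partitions until another rebalance.
            }
        }
    }
}
{code}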
[jira] [Assigned] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-5611: -- Assignee: Jason Gustafson > One or more consumers in a consumer-group stop consuming after rebalancing > -- > > Key: KAFKA-5611 > URL: https://issues.apache.org/jira/browse/KAFKA-5611 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.2.0 >Reporter: Panos Skianis >Assignee: Jason Gustafson > Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, > Server 2, Server 3 > > > Scenario: > - 3 zookeepers, 4 Kafkas. 0.10.2.0, with 0.9.0 compatibility still on > (other apps need it but the one mentioned below is already on kafka 0.10.2.0 > client). > - 3 servers running 1 consumer each under the same consumer groupId. > - Servers seem to be consuming messages happily but then there is a timeout > to an external service that causes our app to restart the Kafka Consumer on > one of the servers (this is by design). That causes rebalancing of the group > and upon restart of one of the Consumers seem to "block". > - Server 3 is where the problems occur. > - Problem fixes itself either by restarting one of the 3 servers or cause > the group to rebalance again by using the console consumer with the > autocommit set to false and using the same group. > > Note: > - Haven't managed to recreate it at will yet. > - Mainly happens in production environment, often enough. Hence I do not > have any logs with DEBUG/TRACE statements yet. > - Extracts from log of each app server are attached. Also the log of the > kafka that seems to be dealing with the related group and generations. > - See COMMENT lines in the files for further info. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4711) Change Default unclean.leader.election.enabled from True to False (KIP-106)
[ https://issues.apache.org/jira/browse/KAFKA-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098987#comment-16098987 ] ASF GitHub Bot commented on KAFKA-4711: --- GitHub user bobrik opened a pull request: https://github.com/apache/kafka/pull/3567 KAFKA-4711: fix docs on unclean.leader.election.enabled default in design section You can merge this pull request into a Git repository by running: $ git pull https://github.com/bobrik/kafka unclean-docs-clarification Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3567.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3567 commit 9ef42e2867b5310b80a1819948c890a87689226b Author: Ivan Babrou Date: 2017-07-24T19:00:23Z KAFKA-4711: fix docs on unclean.leader.election.enabled default in design section > Change Default unclean.leader.election.enabled from True to False (KIP-106) > --- > > Key: KAFKA-4711 > URL: https://issues.apache.org/jira/browse/KAFKA-4711 > Project: Kafka > Issue Type: Bug >Reporter: Ben Stopford >Assignee: Sharad > > See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-106+-+Change+Default+unclean.leader.election.enabled+from+True+to+False -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin reassigned KAFKA-5630: --- Assignee: Jiangjie Qin > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin >Assignee: Jiangjie Qin >Priority: Critical > Labels: regression, reliability > Fix For: 0.11.0.1 > > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098990#comment-16098990 ] Jiangjie Qin commented on KAFKA-5630: - [~vmaurin_glispa] Thanks for reporting the issue. It looks like the issue here is that we assumed the InvalidRecordException will only be thrown when we explicitly validate the record in the Fetcher.PartitionRecords. But it can actually be thrown from the iterator as well. I'll fix that. > Consumer poll loop over the same record after a CorruptRecordException > -- > > Key: KAFKA-5630 > URL: https://issues.apache.org/jira/browse/KAFKA-5630 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Vincent Maurin >Priority: Critical > Labels: regression, reliability > Fix For: 0.11.0.1 > > > Hello > While consuming a topic with log compaction enabled, I am getting an infinite > consumption loop of the same record, i.e, each call to poll is returning to > me 500 times one record (500 is my max.poll.records). I am using the java > client 0.11.0.0. > Running the code with the debugger, the initial problem come from > `Fetcher.PartitionRecords,fetchRecords()`. > Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record > size is less than the minimum record overhead (14)` > Then the boolean `hasExceptionInLastFetch` is set to true, resulting the test > block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the > last record. > I guess the corruption problem is similar too > https://issues.apache.org/jira/browse/KAFKA-5582 but this behavior of the > client is probably not the expected one -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5633) Clarify another scenario of unclean leader election
Ivan Babrou created KAFKA-5633: -- Summary: Clarify another scenario of unclean leader election Key: KAFKA-5633 URL: https://issues.apache.org/jira/browse/KAFKA-5633 Project: Kafka Issue Type: Bug Reporter: Ivan Babrou When unclean leader election is disabled, you don't need to lose all replicas of some partition to make it unavailable, it's enough to lose just one. The leading replica can get into a state where it kicks everything else out of the ISR because it has network issues; then it can just die, leaving the partition leaderless. This is what we saw: {noformat} Jul 24 18:05:53 broker-10029 kafka[4104]: INFO Partition [requests,9] on broker 10029: Shrinking ISR for partition [requests,9] from 10029,10016,10072 to 10029 (kafka.cluster.Partition) {noformat} {noformat} Topic: requests Partition: 9  Leader: -1 Replicas: 10029,10072,10016 Isr: 10029 {noformat} This is the default behavior in 0.11.0.0+, but I don't think the docs are completely clear about the implications. Before the change you could silently lose data if the scenario described above happened, but now you can grind your whole pipeline to a halt when just one node has issues. My understanding is that to avoid this you'd want min.insync.replicas > 1 and acks > 1 (probably all). It's also worth documenting how to force leader election when unclean leader election is disabled. I assume it can be accomplished by switching unclean.leader.election.enable on and off again for the problematic topic, but being crystal clear about this in the docs would be tremendously helpful. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
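As an illustration of the producer-side settings mentioned above, a minimal hedged sketch with hypothetical broker and topic names. Note that {{min.insync.replicas}} is a broker/topic-level setting rather than a producer property, so it only appears here as a comment.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DurableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");

        // Require acknowledgement from all in-sync replicas before a write is considered successful.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retry transient failures instead of dropping records.
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        // With acks=all and a topic-level min.insync.replicas=2, a write fails fast when the ISR
        // shrinks to a single replica, instead of being acknowledged by a lone leader whose loss
        // would leave the partition leaderless (as in the scenario described in the ticket).
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("requests", "key", "value"));
        }
    }
}
{code}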
[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic
[ https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099058#comment-16099058 ] Bart Vercammen commented on KAFKA-5386: --- [~mjsax] {{<application.id>-<storeName>-changelog}} -- that's exactly the problem in our case ;-) We have some strict rules about the naming of kafka topics in our project and would like to have full control over them, including the ACLs on each kafka-topic. So basically we do not want applications to start creating whatever topics all over the place. This being said, {{<application.id>-<storeName>-changelog}} would work if we could add some wildcard ACLs on the creation of Kafka topics, but that would imply writing our own authorizer in Kafka allowing only the creation of topics that comply with a specific naming schema. Changing the default {{<application.id>-<storeName>-changelog}} schema into something more configurable would be easier for Kafka Streams applications. The (dirty) patch I made was simply to allow {{ProcessorStateManager}} to accept a {{static}} _prefix_ and _suffix_ configuration and just replace the {{storeChangelogTopic}} function to return {{<prefix>.<storename>.<suffix>}} when they are defined. This would of course not work for the dynamically created repartitioning topics, but that could then be solved with your tip to use {{through("...")}} ... Let me see how far I get to make this cleaner and more configurable without impacting the public API too much ... ;-) > [Kafka Streams] - custom name for state-store change-log topic > -- > > Key: KAFKA-5386 > URL: https://issues.apache.org/jira/browse/KAFKA-5386 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Bart Vercammen > Labels: needs-kip > > Currently, when working with Kafka backed state stores in Kafka Streams, > these log compacted topics are given a hardcoded name : > _my.app.id-storename-changelog_ > {noformat}public static String storeChangelogTopic(String applicationId, > String storeName) { > return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX; > }{noformat} > It would be nice if somehow I would be able to override this functionality > and provide the topic-name myself when creating the state-store. > Any comments? > Would it be OK to submit a PR for this? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
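A hedged sketch of the {{through(...)}} approach mentioned above, again against the 0.10.2-era DSL. Topic and store names are hypothetical; the fragment only wires a topology and shows that routing a re-keyed stream through an explicitly named, pre-created topic avoids an internally generated repartition topic name.

{code:java}
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ExplicitTopicNames {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> events = builder.stream("team.events");   // hypothetical input topic

        // Re-key the stream, then route it through an explicitly named, pre-created topic
        // instead of letting Streams create an internal "-repartition" topic, so the topic
        // name (and its ACLs) stays under the team's naming rules.
        events.selectKey((key, value) -> value)
              .through("team.events.by-value")        // hypothetical, pre-created topic
              .groupByKey()
              .count("events-by-value");              // changelog: <application.id>-events-by-value-changelog
    }
}
{code}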
[jira] [Resolved] (KAFKA-5616) unable perform a rolling upgrade from a non-secure to a secure Kafka cluster
[ https://issues.apache.org/jira/browse/KAFKA-5616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhu fangbo resolved KAFKA-5616. --- Resolution: Not A Problem This is due to improper ACL configuration: I did not grant the required operations to each broker on the Cluster and Topic resources. > unable perform a rolling upgrade from a non-secure to a secure Kafka cluster > > > Key: KAFKA-5616 > URL: https://issues.apache.org/jira/browse/KAFKA-5616 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.1.1 >Reporter: zhu fangbo > > I want to upgrade my unsecure kafka cluster to a secure one whitch support > SASL_PLAINT protocol, but I failed to perfrom rolling upgrade. The only way I > found to upgrade is to shutdown all brokers first and then restart all > brokers with inter-broker security configured > h3. Before upgrade > Here is the secure configuration of broker 1: > {quote}listeners=PLAINTEXT://10.45.4.9:9092,SASL_PLAINTEXT://10.45.4.9:9099 > sasl.enabled.mechanisms=PLAIN > authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer > super.users=User:admin{quote} > I want to setup a cluster support both unsecure and secure client-broker > connect, so i add a new endpoint to listeners with port = 9099 > h3. Start rolling upgrade > First, I restart broker-1 which is not the controller. below is part of > server.log shows start complete: > !http://olt6kofv9.bkt.clouddn.com/17-7-20/25775149.jpg|height=190,width=1390,hspace=1,vspace=4! > seemed well, but there are no log print to show the replicamanger was > started,and broker1 not go back to the ISR > !http://olt6kofv9.bkt.clouddn.com/17-7-20/55734691.jpg|height=200,width=800! > Besides, the preferred replica leader election was also failed > !http://olt6kofv9.bkt.clouddn.com/17-7-20/94837206.jpg|height=100,width=1200! > h3. After rolling upgrade for all brokers > After upgrade all brokers, it seems each broker can not connect to other > brokers > !http://olt6kofv9.bkt.clouddn.com/17-7-20/84863343.jpg| height=200,width=800! > I restart broker 2 at last which is the controller, then broker 3 came to be > controller, and it also failed to perform preferred replica leader election > !http://olt6kofv9.bkt.clouddn.com/17-7-20/70680876.jpg|height=150,width=1200! > h3. Shutdown all and restart > The cluster works well when I shutdown all brokers and restart all with > inter-broker security configurations like this: > {quote}listeners=PLAINTEXT://10.45.4.9:9092,SASL_PLAINTEXT://10.45.4.9:9099 > #advertised.listeners=SASL_PLAINTEXT://10.45.4.9:9099 > security.inter.broker.protocol=SASL_PLAINTEXT > sasl.mechanism.inter.broker.protocol=PLAIN{quote} > replica fetch thread was started > !http://olt6kofv9.bkt.clouddn.com/17-7-20/98186199.jpg|height=200,width=1200! > and ISR was normal > !http://olt6kofv9.bkt.clouddn.com/17-7-20/13606263.jpg|height=150,width=680! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5634) Replica fetcher thread crashes due to OffsetOutOfRangeException
Jason Gustafson created KAFKA-5634: -- Summary: Replica fetcher thread crashes due to OffsetOutOfRangeException Key: KAFKA-5634 URL: https://issues.apache.org/jira/browse/KAFKA-5634 Project: Kafka Issue Type: Bug Affects Versions: 0.11.0.0 Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Critical We have seen the following exception recently: {code} kafka.common.KafkaException: error processing data for partition [foo,0] offset 1459250 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: The specified offset 1459250 is higher than the high watermark 1459032 of the partition foo-0 {code} The error check was added in the patch for KIP-107: https://github.com/apache/kafka/commit/8b05ad406d4cba6a75d1683b6d8699c3ab28f9d6. After investigation, we found that it is possible for the log start offset on the leader to get ahead of the high watermark on the follower after segment deletion. The check therefore seems incorrect. The impact of this bug is that the fetcher thread crashes on the follower and the broker must be restarted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5634) Replica fetcher thread crashes due to OffsetOutOfRangeException
[ https://issues.apache.org/jira/browse/KAFKA-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099470#comment-16099470 ] Jason Gustafson commented on KAFKA-5634: Should have mentioned this, but the specific line is here: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/cluster/Replica.scala#L109. > Replica fetcher thread crashes due to OffsetOutOfRangeException > --- > > Key: KAFKA-5634 > URL: https://issues.apache.org/jira/browse/KAFKA-5634 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Critical > > We have seen the following exception recently: > {code} > kafka.common.KafkaException: error processing data for partition [foo,0] > offset 1459250 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: The > specified offset 1459250 is higher than the high watermark 1459032 of the > partition foo-0 > {code} > The error check was added in the patch for KIP-107: > https://github.com/apache/kafka/commit/8b05ad406d4cba6a75d1683b6d8699c3ab28f9d6. > After investigation, we found that it is possible for the log start offset > on the leader to get ahead of the high watermark on the follower after > segment deletion. The check therefore seems incorrect. The impact of this bug > is that the fetcher thread crashes on the follower and the broker must be > restarted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4218) Enable access to key in ValueTransformer
[ https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099491#comment-16099491 ] ASF GitHub Bot commented on KAFKA-4218: --- GitHub user jeyhunkarimov opened a pull request: https://github.com/apache/kafka/pull/3570 KAFKA-4218: KIP-149, Enabling withKey interfaces in streams This PR aims to provide key access to ValueMapper, ValueJoiner, ValueTransformer, Initializer and Reducer interfaces. More details can be found in [here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner) You can merge this pull request into a Git repository by running: $ git pull https://github.com/jeyhunkarimov/kafka KIP-149 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3570.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3570 commit a1bbfdca86c53359ef75217ec4aaad06179d967d Author: Jeyhun Karimov Date: 2017-07-21T23:18:05Z Add withKey methods to KStream, KGroupedStream, KTable and KGroupedTable interfaces > Enable access to key in ValueTransformer > > > Key: KAFKA-4218 > URL: https://issues.apache.org/jira/browse/KAFKA-4218 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Jeyhun Karimov > Labels: api, kip > Fix For: 1.0.0 > > > While transforming values via {{KStream.transformValues}} and > {{ValueTransformer}}, the key associated with the value may be needed, even > if it is not changed. For instance, it may be used to access stores. > As of now, the key is not available within these methods and interfaces, > leading to the use of {{KStream.transform}} and {{Transformer}}, and the > unnecessary creation of new {{KeyValue}} objects. > KIP-149: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner -- This message was sent by Atlassian JIRA (v6.4.14#64029)
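For context on the ticket above, a hedged sketch of the workaround it describes today: because {{ValueTransformer#transform(V)}} cannot see the key, a full {{Transformer}} is used and a new {{KeyValue}} is allocated even though only the value changes. Class and method names are hypothetical; with KIP-149, a key-aware {{transformValues}} could avoid the extra allocation.

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

public class KeyAwareValueUpdate {

    // Only the value changes, but because ValueTransformer cannot see the key,
    // a full Transformer is used and a KeyValue is allocated for every record.
    static class AppendKeyTransformer implements Transformer<String, String, KeyValue<String, String>> {
        @Override
        public void init(ProcessorContext context) { }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            return KeyValue.pair(key, value + " (seen for key " + key + ")");
        }

        @Override
        public KeyValue<String, String> punctuate(long timestamp) {
            return null;   // no scheduled output
        }

        @Override
        public void close() { }
    }

    static KStream<String, String> appendKey(KStream<String, String> input) {
        TransformerSupplier<String, String, KeyValue<String, String>> supplier = AppendKeyTransformer::new;
        return input.transform(supplier);
    }
}
{code}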
[jira] [Commented] (KAFKA-4218) Enable access to key in ValueTransformer
[ https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099492#comment-16099492 ] ASF GitHub Bot commented on KAFKA-4218: --- Github user jeyhunkarimov closed the pull request at: https://github.com/apache/kafka/pull/2946 > Enable access to key in ValueTransformer > > > Key: KAFKA-4218 > URL: https://issues.apache.org/jira/browse/KAFKA-4218 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Jeyhun Karimov > Labels: api, kip > Fix For: 1.0.0 > > > While transforming values via {{KStream.transformValues}} and > {{ValueTransformer}}, the key associated with the value may be needed, even > if it is not changed. For instance, it may be used to access stores. > As of now, the key is not available within these methods and interfaces, > leading to the use of {{KStream.transform}} and {{Transformer}}, and the > unnecessary creation of new {{KeyValue}} objects. > KIP-149: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099537#comment-16099537 ] ASF GitHub Bot commented on KAFKA-5611: --- GitHub user hachikuji opened a pull request: https://github.com/apache/kafka/pull/3571 KAFKA-5611; AbstractCoordinator should handle wakeup raised from onJoinComplete You can merge this pull request into a Git repository by running: $ git pull https://github.com/hachikuji/kafka KAFKA-5611 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3571.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3571 commit e0b4f65031dbb8135d872811c68dec94f7a45efd Author: Jason Gustafson Date: 2017-07-25T05:14:42Z KAFKA-5611; AbstractCoordinator should handle wakeup raised from onJoinComplete > One or more consumers in a consumer-group stop consuming after rebalancing > -- > > Key: KAFKA-5611 > URL: https://issues.apache.org/jira/browse/KAFKA-5611 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.2.0 >Reporter: Panos Skianis >Assignee: Jason Gustafson > Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, > Server 2, Server 3 > > > Scenario: > - 3 zookeepers, 4 Kafkas. 0.10.2.0, with 0.9.0 compatibility still on > (other apps need it but the one mentioned below is already on kafka 0.10.2.0 > client). > - 3 servers running 1 consumer each under the same consumer groupId. > - Servers seem to be consuming messages happily but then there is a timeout > to an external service that causes our app to restart the Kafka Consumer on > one of the servers (this is by design). That causes rebalancing of the group > and upon restart of one of the Consumers seem to "block". > - Server 3 is where the problems occur. > - Problem fixes itself either by restarting one of the 3 servers or cause > the group to rebalance again by using the console consumer with the > autocommit set to false and using the same group. > > Note: > - Haven't managed to recreate it at will yet. > - Mainly happens in production environment, often enough. Hence I do not > have any logs with DEBUG/TRACE statements yet. > - Extracts from log of each app server are attached. Also the log of the > kafka that seems to be dealing with the related group and generations. > - See COMMENT lines in the files for further info. -- This message was sent by Atlassian JIRA (v6.4.14#64029)