[jira] [Commented] (KAFKA-6728) Kafka Connect Header Null Pointer Exception
[ https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423740#comment-16423740 ]

Philippe Hong commented on KAFKA-6728:
--

Thank you, adding header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter solved my issue as you mentioned.
Good job on implementing a more convenient and lasting fix.

> Kafka Connect Header Null Pointer Exception
> ---
>
> Key: KAFKA-6728
> URL: https://issues.apache.org/jira/browse/KAFKA-6728
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 1.1.0
> Environment: Linux Mint
> Reporter: Philippe Hong
> Priority: Critical
>
> I am trying to use the newly released Kafka Connect that supports headers by
> using the standalone connector to write to a text file (so in this case I am
> only using the sink component)
> I am sadly greeted by a NullPointerException :
> {noformat}
> ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
> java.lang.NullPointerException
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> I launched zookeeper and kafka 1.1.0 locally and sent a
> ProducerRecord[String, Array[Byte]] using a KafkaProducer[String,
> Array[Byte]] with a header that has a key and value.
> I can read the record with a console consumer as well as using a
> KafkaConsumer (where in this case I can see the content of the header of the
> record I sent previously) so no problem here.
> I only made two changes to the kafka configuration:
> - I used the StringConverter for the key and the ByteArrayConverter for
> the value.
> - I also changed the topic where the sink would connect to.
> If I forgot something please tell me so as it is the first time I am creating
> an issue on Jira.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
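
For readers who hit the same NullPointerException on Kafka Connect 1.1.0, the workaround confirmed in the comment above amounts to one extra worker setting. Below is a minimal sketch, assuming the worker configuration is assembled in Java. The class name HeaderConverterWorkaround and the method workerProperties() are hypothetical and not part of Kafka. The fully qualified key and value converter class names are assumptions based on the stock Kafka distribution, since the report names only StringConverter and ByteArrayConverter.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.util.Properties;

// Hypothetical helper (not part of Kafka): collects the worker settings
// mentioned in the report plus the header.converter workaround.
final class HeaderConverterWorkaround {

    static Properties workerProperties() {
        Properties props = new Properties();
        // Converters the reporter says they configured; the package names
        // are an assumption, the report gives only the simple class names.
        props.setProperty("key.converter",
                "org.apache.kafka.connect.storage.StringConverter");
        props.setProperty("value.converter",
                "org.apache.kafka.connect.converters.ByteArrayConverter");
        // The workaround quoted in the comment: set the header converter explicitly.
        props.setProperty("header.converter",
                "org.apache.kafka.connect.storage.SimpleHeaderConverter");
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Print the settings in properties-file syntax.
        StringWriter out = new StringWriter();
        workerProperties().store(out, "Kafka Connect worker overrides (sketch)");
        System.out.print(out);
    }
}
```

Only the header.converter entry is the fix discussed in this thread; the other two entries simply mirror the reporter's setup.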
[jira] [Commented] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"
[ https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423791#comment-16423791 ]

Chema Sanchez commented on KAFKA-6683:
--

[~hachikuji] I have been unable to reproduce the problem with the fixed code, it seems to work properly.
Thanks a lot.

> ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"
>
> Key: KAFKA-6683
> URL: https://issues.apache.org/jira/browse/KAFKA-6683
> Project: Kafka
> Issue Type: Bug
> Components: replication
> Affects Versions: 1.0.0
> Environment: os: GNU/Linux
> arch: x86_64
> Kernel: 4.9.77
> jvm: OpenJDK 1.8.0
> Reporter: Chema Sanchez
> Assignee: Jason Gustafson
> Priority: Critical
> Fix For: 1.1.0
>
> Attachments: server.properties
>
>
> We have been experiencing this issue lately when restarting or replacing
> brokers of our Kafka clusters during maintenance operations.
> Having restarted or replaced a broker, after some minutes performing normally
> it may suddenly throw the following exception and stop replicating some
> partitions:
> {code:none}
> 2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempted to complete a transaction which was not started
>     at kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
>     at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
>     at kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
>     at scala.collection.immutable.List.foreach(List.scala:389)
>     at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
>     at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
>     at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
>     at kafka.log.Log.loadProducersFromLog(Log.scala:540)
>     at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
>     at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.log.Log.loadProducerState(Log.scala:514)
>     at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
>     at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.truncateTo(Log.scala:1467)
>     at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
>     at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
>     at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
>     at kafka.log.LogManager.truncateTo(LogManager.scala:445)
>     at kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>     at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0] Stopped (kafka.server.ReplicaFet
[jira] [Created] (KAFKA-6741) Transient test failure: SslTransportLayerTest.testNetworkThreadTimeRecorded
Manikumar created KAFKA-6741:

Summary: Transient test failure: SslTransportLayerTest.testNetworkThreadTimeRecorded
Key: KAFKA-6741
URL: https://issues.apache.org/jira/browse/KAFKA-6741
Project: Kafka
Issue Type: Bug
Reporter: Manikumar

debug logs:
{code}
[2018-04-03 14:51:33,365] DEBUG Added sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,368] DEBUG Added sensor with name connections-created: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,368] DEBUG Added sensor with name successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,368] DEBUG Added sensor with name failed-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,368] DEBUG Added sensor with name bytes-sent-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,369] DEBUG Added sensor with name bytes-sent: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,370] DEBUG Added sensor with name bytes-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,370] DEBUG Added sensor with name select-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,371] DEBUG Added sensor with name io-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,379] DEBUG Removed sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,379] DEBUG Removed sensor with name connections-created: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,379] DEBUG Removed sensor with name successful-authentication: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,380] DEBUG Removed sensor with name failed-authentication: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-sent-received: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-sent: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-received: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,382] DEBUG Removed sensor with name select-time: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,382] DEBUG Removed sensor with name io-time: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,382] DEBUG Added sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,382] DEBUG Added sensor with name connections-created: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,383] DEBUG Added sensor with name successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,383] DEBUG Added sensor with name failed-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,383] DEBUG Added sensor with name bytes-sent-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,384] DEBUG Added sensor with name bytes-sent: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,384] DEBUG Added sensor with name bytes-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,384] DEBUG Added sensor with name select-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,384] DEBUG Added sensor with name io-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,444] DEBUG Added sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,445] DEBUG Added sensor with name connections-created: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,445] DEBUG Added sensor with name successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,445] DEBUG Added sensor with name failed-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-sent-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-sent: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,447] DEBUG Added sensor with name select-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,447] DEBUG Added sensor with name io-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,890] DEBUG Created socket with SO_RCVBUF = 326640, SO_SNDBUF = 65328, SO_TIMEOUT = 0 to node 0 (org.apache.kafka.common.network.Selector:474)
[2018-04-03 14:51:33,892] DEBUG Added sensor with name node-127.0.0.1:53543-127.0.
[jira] [Updated] (KAFKA-6741) Transient test failure: SslTransportLayerTest.testNetworkThreadTimeRecorded
[ https://issues.apache.org/jira/browse/KAFKA-6741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar updated KAFKA-6741:
-

Description:
debug logs:
{code}
[2018-04-03 14:51:33,365] DEBUG Added sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,368] DEBUG Added sensor with name connections-created: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,368] DEBUG Added sensor with name successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,368] DEBUG Added sensor with name failed-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,368] DEBUG Added sensor with name bytes-sent-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,369] DEBUG Added sensor with name bytes-sent: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,370] DEBUG Added sensor with name bytes-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,370] DEBUG Added sensor with name select-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,371] DEBUG Added sensor with name io-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,379] DEBUG Removed sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,379] DEBUG Removed sensor with name connections-created: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,379] DEBUG Removed sensor with name successful-authentication: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,380] DEBUG Removed sensor with name failed-authentication: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-sent-received: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-sent: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-received: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,382] DEBUG Removed sensor with name select-time: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,382] DEBUG Removed sensor with name io-time: (org.apache.kafka.common.metrics.Metrics:447)
[2018-04-03 14:51:33,382] DEBUG Added sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,382] DEBUG Added sensor with name connections-created: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,383] DEBUG Added sensor with name successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,383] DEBUG Added sensor with name failed-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,383] DEBUG Added sensor with name bytes-sent-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,384] DEBUG Added sensor with name bytes-sent: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,384] DEBUG Added sensor with name bytes-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,384] DEBUG Added sensor with name select-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,384] DEBUG Added sensor with name io-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,444] DEBUG Added sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,445] DEBUG Added sensor with name connections-created: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,445] DEBUG Added sensor with name successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,445] DEBUG Added sensor with name failed-authentication: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-sent-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-sent: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-received: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,447] DEBUG Added sensor with name select-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,447] DEBUG Added sensor with name io-time: (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,890] DEBUG Created socket with SO_RCVBUF = 326640, SO_SNDBUF = 65328, SO_TIMEOUT = 0 to node 0 (org.apache.kafka.common.network.Selector:474)
[2018-04-03 14:51:33,892] DEBUG Added sensor with name node-127.0.0.1:53543-127.0.0.1:53544.bytes-sent (org.apache.kafka.common.metrics.Metrics:414)
[2018-04-03 14:51:33,893] DEBUG Added sensor with name node-127.0.0.1:53543-127
[jira] [Created] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable
Valentino Proietti created KAFKA-6742:

Summary: TopologyTestDriver error when dealing with stores from GlobalKTable
Key: KAFKA-6742
URL: https://issues.apache.org/jira/browse/KAFKA-6742
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 1.1.0
Reporter: Valentino Proietti

This JUnit test simply fails:

@Test
public void globalTable() {
    StreamsBuilder builder = new StreamsBuilder();
    @SuppressWarnings("unused")
    final KTable localTable = builder
        .table("local",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as("localStore"))
        ;
    @SuppressWarnings("unused")
    final GlobalKTable globalTable = builder
        .globalTable("global",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as("globalStore"))
        ;
    //
    Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
    TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
    //
    final KeyValueStore localStore = testDriver.getKeyValueStore("localStore");
    Assert.assertNotNull(localStore);
    Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));
    //
    final KeyValueStore globalStore = testDriver.getKeyValueStore("globalStore");
    Assert.assertNotNull(globalStore);
    Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
    //
    final ConsumerRecordFactory crf = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
    testDriver.pipeInput(crf.create("local", "one", "TheOne"));
    testDriver.pipeInput(crf.create("global", "one", "TheOne"));
    //
    Assert.assertEquals("TheOne", localStore.get("one"));
    Assert.assertEquals("TheOne", globalStore.get("one"));

To make it work I had to modify the TopologyTestDriver class as follows:

...
public Map getAllStateStores() {
    // final Map allStores = new HashMap<>();
    // for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
    //     allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
    // }
    // return allStores;
    // FIXME
    final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
    final Map allStores = new HashMap<>();
    for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
        StateStore res = psm.getStore(storeName);
        if (res == null)
            res = psm.getGlobalStore(storeName);
        allStores.put(storeName, res);
    }
    return allStores;
}
...
public StateStore getStateStore(final String name) {
    // return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
    // FIXME
    final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
    StateStore res = psm.getStore(name);
    if (res == null)
        res = psm.getGlobalStore(name);
    return res;
}

Moreover, I think it would be very useful to make the internal MockProducer public, for testing cases where a producer is used alongside the "normal" stream processing, by adding the method:

/**
 * @return records sent with this producer are automatically streamed to the topology.
 */
public final Producer<byte[], byte[]> getProducer() {
    return producer;
}

Unfortunately this introduces another problem, which can be verified by adding the following lines to the previous JUnit test:

...
    //
    ConsumerRecord<byte[], byte[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values
    testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value()));
    testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value()));
    testDriver.advanceWallClockTime(0);
    Assert.assertEquals("TheOne", localStore.get("one"));
    Assert.assertEquals("Second", localStore.get("two"));
    Assert.assertEquals("TheOne", globalStore.get("one"));
    Assert.assertEquals("Second", globalStore.get("two"));
}

That could be fixed with:

private void captureOutputRecords() {
    // Capture all the records sent to the producer ...
    final List<ProducerRecord<byte[], byte[]>> output = producer.history();
    producer.clear();
    for (final ProducerRecord<byte[], byte[]> record : output) {
        Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
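The lookup change proposed in this report (try the task's own stores first, then fall back to the global stores) can be sketched in isolation. This is a minimal sketch and not TopologyTestDriver code: the class `StoreLookupSketch`, its two maps, and the `registerLocal`/`registerGlobal` helpers are hypothetical stand-ins for the state manager, and stores are modelled as plain objects.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a state manager that keeps local and global stores apart. */
final class StoreLookupSketch {
    private final Map<String, Object> localStores = new HashMap<>();
    private final Map<String, Object> globalStores = new HashMap<>();

    void registerLocal(String name, Object store) {
        localStores.put(name, store);
    }

    void registerGlobal(String name, Object store) {
        globalStores.put(name, store);
    }

    /** Looks in the local stores first and falls back to the global ones; null if neither has it. */
    Object getStateStore(String name) {
        Object store = localStores.get(name);
        if (store == null) {
            store = globalStores.get(name);
        }
        return store;
    }

    public static void main(String[] args) {
        StoreLookupSketch stores = new StoreLookupSketch();
        stores.registerLocal("localStore", "local");
        stores.registerGlobal("globalStore", "global");
        System.out.println(stores.getStateStore("localStore"));  // prints local
        System.out.println(stores.getStateStore("globalStore")); // prints global
        System.out.println(stores.getStateStore("missing"));     // prints null
    }
}
```

Without the fallback, a lookup for "globalStore" would return null, which matches the failing assertNotNull in the reported test.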
[jira] [Updated] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Valentino Proietti updated KAFKA-6742: -- Description: {color:#ff}This junit test simply fails:{color} @Test *public* *void* globalTable() { StreamsBuilder builder = *new* StreamsBuilder(); @SuppressWarnings("unused") *final* KTable localTable = builder .table("local", Consumed._with_(Serdes._String_(), Serdes._String_()), Materialized._as_("localStore")) ; @SuppressWarnings("unused") *final* GlobalKTable globalTable = builder .globalTable("global", Consumed._with_(Serdes._String_(), Serdes._String_()), Materialized._as_("globalStore")) ; // Properties props = *new* Properties(); props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test"); props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost"); TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), props); // *final* KeyValueStore localStore = testDriver.getKeyValueStore("localStore"); Assert._assertNotNull_(localStore); Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore")); // *final* KeyValueStore globalStore = testDriver.getKeyValueStore("globalStore"); Assert._assertNotNull_(globalStore); Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore")); // *final* ConsumerRecordFactory crf = *new* ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer()); testDriver.pipeInput(crf.create("local", "one", "TheOne")); testDriver.pipeInput(crf.create("global", "one", "TheOne")); // Assert._assertEquals_("TheOne", localStore.get("one")); Assert._assertEquals_("TheOne", globalStore.get("one")); {color:#ff}to make it work I had to modify the TopologyTestDriver class as follow:{color} ... 
*public* Map getAllStateStores() { // final Map allStores = new HashMap<>(); // for (final String storeName : internalTopologyBuilder.allStateStoreName()) { // allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName)); // } // return allStores; {color:#ff}// *FIXME*{color} *final* ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr(); *final* Map allStores = *new* HashMap<>(); *for* (*final* String storeName : internalTopologyBuilder.allStateStoreName()) { StateStore res = psm.getStore(storeName); if (res == null) res = psm.getGlobalStore(storeName); allStores.put(storeName, res); } *return* allStores; } ... *public* StateStore getStateStore(*final* String name) { // return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name); {color:#ff}// *FIXME*{color} *final* ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr(); StateStore res = psm.getStore(name); *if* (res == *null*) res = psm.getGlobalStore(name); *return* res; } {color:#ff}moreover I think it would be very useful to make the internal MockProducer public for testing cases where a producer is used along side with the "normal" stream processing by adding the method:{color} /** * *@return* records sent with this producer are automatically streamed to the topology. */ *public* *final* Producer<*byte*[], *byte*[]> getProducer() { return producer; } {color:#ff}unfortunately this introduces another problem that could be verified by adding the following lines to the previous junit test:{color} ... 
** // ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, cr.timestamp(), cr.key(), cr.value())); testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, cr.timestamp(), cr.key(), cr.value())); testDriver.advanceWallClockTime(0); Assert._assertEquals_("TheOne", localStore.get("one")); Assert._assertEquals_("Second", localStore.get("two")); Assert._assertEquals_("TheOne", globalStore.get("one")); Assert._assertEquals_("Second", globalStore.get("two")); } {color:#ff}that could be fixed with:{color} *private* *void* captureOutputRecords() { // Capture all the records sent to the producer ... *final* List> output = producer.history(); producer.clear(); *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) { Queue> outputRecords = outputRecordsByTopic.get(record.topic()); *if* (outputRecords == *null*) { outputRecords = *new* LinkedList<>(); outputRecordsByTopic.put(record.topic(), outputRecords); } outputRecor
[jira] [Assigned] (KAFKA-6740) Plugins class' newConverter and newHeaderConverter methods are unclear
[ https://issues.apache.org/jira/browse/KAFKA-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch reassigned KAFKA-6740: Assignee: Randall Hauch > Plugins class' newConverter and newHeaderConverter methods are unclear > -- > > Key: KAFKA-6740 > URL: https://issues.apache.org/jira/browse/KAFKA-6740 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > > The ClassLoaderUsage enum is a bit unclear as to what the code is doing. The > `CURRENT_CLASSLOADER` is really about using the connector's configuration, > whereas the `PLUGINS` literal is more about using the worker's configuration. > We should clean this up, and we should also look at moving the if-block into > the `CURRENT_CLASSLOADER` case in {{newConverter}}, just as we did in the > {{newHeaderConverter}} method in KAFKA-6728. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6743) ConsumerPerformance fails to consume all messages on topics with large number of partitions
Alex Dunayevsky created KAFKA-6743:

Summary: ConsumerPerformance fails to consume all messages on topics with large number of partitions
Key: KAFKA-6743
URL: https://issues.apache.org/jira/browse/KAFKA-6743
Project: Kafka
Issue Type: Bug
Components: core, tools
Affects Versions: 0.11.0.2
Reporter: Alex Dunayevsky

ConsumerPerformance fails to consume all messages on topics with a large number of partitions because its default polling loop timeout (1000 ms) is relatively short and cannot be changed by the end user.

Demo: Create a topic with 10 000 partitions, send 50 000 000 records of 100 bytes each using kafka-producer-perf-test, and consume them using kafka-consumer-perf-test (ConsumerPerformance). You will likely notice that the number of records returned by kafka-consumer-perf-test is many times lower than the expected 50 000 000.

This is caused by the ConsumerPerformance implementation: in some cases it takes so long to iterate through the records polled in a batch that the elapsed time exceeds the hardcoded polling loop timeout, which is probably not what we want from this utility.

We have two options:
1) Increase the polling loop timeout in the ConsumerPerformance implementation. It defaults to 1000 ms and is hardcoded, but we could expose it as an OPTIONAL kafka-consumer-perf-test parameter so that the end user can set it at the script level.
2) Decrease max.poll.records at the consumer config level. This is not a good option, though, since we do not want to touch the default settings.
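The effect of the first option can be illustrated with a small simulation. This is a sketch under stated assumptions, not the ConsumerPerformance code: `PollLoopSketch`, `consume`, and the gap values are hypothetical, and the loop is reduced to "stop as soon as the time since the previous batch exceeds the timeout".

```java
/** Simulated polling loop with the timeout passed in as a parameter instead of hardcoded. */
final class PollLoopSketch {
    /**
     * Counts the records consumed before the loop gives up.
     * gapsMs[i] is the simulated time between batch i-1 and batch i;
     * every batch carries recordsPerBatch records.
     */
    static long consume(long[] gapsMs, long recordsPerBatch, long timeoutMs) {
        long consumed = 0;
        for (long gap : gapsMs) {
            if (gap > timeoutMs) {
                break; // the loop times out before this batch is counted
            }
            consumed += recordsPerBatch;
        }
        return consumed;
    }

    public static void main(String[] args) {
        long[] gapsMs = {200, 1500, 200};
        System.out.println(consume(gapsMs, 500, 1000)); // prints 500
        System.out.println(consume(gapsMs, 500, 5000)); // prints 1500
    }
}
```

With the 1000 ms value a single slow iteration ends the run early and the remaining records are never counted; a caller-supplied timeout lets the same run complete.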
[jira] [Commented] (KAFKA-6724) ConsumerPerformance resets offsets on every startup
[ https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424114#comment-16424114 ]

ASF GitHub Bot commented on KAFKA-6724:

rootex- opened a new pull request #4818: KAFKA-6724 ConsumerPerformance resets offsets on every startup
URL: https://github.com/apache/kafka/pull/4818

## Remove consumer offset reset on startup

### ConsumerPerformance fails to consume all messages on topics with a large number of partitions because its default polling loop timeout (1000 ms) is relatively short and cannot be changed by the end user.

### Demo
Create a topic with 10 000 partitions, send 50 000 000 records of 100 bytes each using kafka-producer-perf-test, and consume them using kafka-consumer-perf-test (ConsumerPerformance). You will likely notice that the number of records returned by kafka-consumer-perf-test is many times lower than the expected 50 000 000. This is caused by the ConsumerPerformance implementation: in some cases it takes so long to iterate through the records polled in a batch that the elapsed time exceeds the hardcoded polling loop timeout, which is probably not what we want from this utility.

### Possible options
1) Increase the polling loop timeout in the ConsumerPerformance implementation. It defaults to 1000 ms and is hardcoded, but we could expose it as an OPTIONAL kafka-consumer-perf-test parameter so that the end user can set it at the script level.
2) Decrease max.poll.records at the consumer config level. This is not a good option, though, since we do not want to touch the default settings.

### Committer Checklist (excluded from commit message)
- [x] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
> Issue Type: Bug
> Components: core, tools
> Affects Versions: 0.11.0.1
> Reporter: Alex Dunayevsky
> Priority: Minor
> Fix For: 1.2.0
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for its group on every startup.
[jira] [Commented] (KAFKA-6743) ConsumerPerformance fails to consume all messages on topics with large number of partitions
[ https://issues.apache.org/jira/browse/KAFKA-6743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424145#comment-16424145 ] Alex Dunayevsky commented on KAFKA-6743: https://github.com/apache/kafka/pull/4818 > ConsumerPerformance fails to consume all messages on topics with large number > of partitions > --- > > Key: KAFKA-6743 > URL: https://issues.apache.org/jira/browse/KAFKA-6743 > Project: Kafka > Issue Type: Bug > Components: core, tools >Affects Versions: 0.11.0.2 >Reporter: Alex Dunayevsky >Priority: Minor > > ConsumerPerformance fails to consume all messages on topics with large number > of partitions due to a relatively short default polling loop timeout (1000 > ms) that is not reachable and modifiable by the end user. > Demo: Create a topic of 10 000 partitions, send a 50 000 000 of 100 byte > records using kafka-producer-perf-test and consume them using > kafka-consumer-perf-test (ConsumerPerformance). You will likely notice that > the number of records returned by the kafka-consumer-perf-test is many times > less than expected 50 000 000. This happens due to specific > ConsumerPerformance implementation. As the result, in some rough cases it may > take a long enough time to process/iterate through the records polled in > batches, thus, the time may exceed the default hardcoded polling loop timeout > and this is probably not what we want from this utility. > We have two options: > 1) Increasing polling loop timeout in ConsumerPerformance implementation. It > defaults to 1000 ms and is hardcoded, thus cannot be changed but we could > export it as an OPTIONAL kafka-consumer-perf-test parameter to enable it on a > script level configuration and available to the end user. > 2) Decreasing max.poll.records on a Consumer config level. This is not a fine > option though since we do not want to touch the default settings. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6728) Kafka Connect Header Null Pointer Exception
[ https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava reassigned KAFKA-6728: Assignee: Randall Hauch > Kafka Connect Header Null Pointer Exception > --- > > Key: KAFKA-6728 > URL: https://issues.apache.org/jira/browse/KAFKA-6728 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 > Environment: Linux Mint >Reporter: Philippe Hong >Assignee: Randall Hauch >Priority: Critical > Fix For: 1.2.0, 1.1.1 > > > I am trying to use the newly released Kafka Connect that supports headers by > using the standalone connector to write to a text file (so in this case I am > only using the sink component) > I am sadly greeted by a NullPointerException : > {noformat} > ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and > unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {noformat} > I launched zookeeper and kafka 1.1.0 locally and sent a > 
ProducerRecord[String, Array[Byte]] using a KafkaProducer[String, > Array[Byte]] with a header that has a key and value. > I can read the record with a console consumer as well as using a > KafkaConsumer (where in this case I can see the content of the header of the > record I sent previously) so no problem here. > I only made two changes to the kafka configuration: > - I used the StringConverter for the key and the ByteArrayConverter for > the value. > - I also changed the topic where the sink would connect to. > If I forgot something please tell me so as it is the first time I am creating > an issue on Jira. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6728) Kafka Connect Header Null Pointer Exception
[ https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-6728. -- Resolution: Fixed Fix Version/s: 1.1.1 1.2.0 Issue resolved by pull request 4815 [https://github.com/apache/kafka/pull/4815] > Kafka Connect Header Null Pointer Exception > --- > > Key: KAFKA-6728 > URL: https://issues.apache.org/jira/browse/KAFKA-6728 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 > Environment: Linux Mint >Reporter: Philippe Hong >Priority: Critical > Fix For: 1.2.0, 1.1.1 > > > I am trying to use the newly released Kafka Connect that supports headers by > using the standalone connector to write to a text file (so in this case I am > only using the sink component) > I am sadly greeted by a NullPointerException : > {noformat} > ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and > unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at 
java.lang.Thread.run(Thread.java:748) > {noformat} > I launched zookeeper and kafka 1.1.0 locally and sent a > ProducerRecord[String, Array[Byte]] using a KafkaProducer[String, > Array[Byte]] with a header that has a key and value. > I can read the record with a console consumer as well as using a > KafkaConsumer (where in this case I can see the content of the header of the > record I sent previously) so no problem here. > I only made two changes to the kafka configuration: > - I used the StringConverter for the key and the ByteArrayConverter for > the value. > - I also changed the topic where the sink would connect to. > If I forgot something please tell me so as it is the first time I am creating > an issue on Jira. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6728) Kafka Connect Header Null Pointer Exception
[ https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424197#comment-16424197 ]

ASF GitHub Bot commented on KAFKA-6728:

ewencp closed pull request #4815: KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter
URL: https://github.com/apache/kafka/pull/4815

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 0a895f67cf8..fd05af57a64 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -76,7 +76,9 @@
     public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
     public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
     public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class";
-    public static final String HEADER_CONVERTER_CLASS_DEFAULT = WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT;
+    // The Connector config should not have a default for the header converter, since the absence of a config property means that
+    // the worker config settings should be used. Thus, we set the default to null here.
+    public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;

     public static final String TASKS_MAX_CONFIG = "tasks.max";
     private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e3d9cf45901..1c6465855ff 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -397,12 +397,21 @@ public boolean startTask(
             );
             if (keyConverter == null) {
                 keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
+            } else {
+                log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
             }
             if (valueConverter == null) {
                 valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
+            } else {
+                log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
             }
             if (headerConverter == null) {
                 headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
+            } else {
+                log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
             }
             workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 94f27717080..f4cd2ba14b0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -234,6 +234,8 @@ public Converter newConverter(AbstractConfig config, String classPropertyName, C
         // Configure the Converter using only the old configuration mechanism ...
         String configPrefix = classPropertyName + ".";
         Map converterConfig = config.originalsWithPrefix(configPrefix);
+        log.debug("Configuring the {} converter with configuration:{}{}",
+                  isKeyConverter ? "key" : "value", System.lineSeparator(), converterConfig);
         plugin.configure(converterConfig, isKeyConverter);
         return plugin;
     }
@@ -24
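The rule stated in the new ConnectorConfig comment (no connector-level default, so a missing property means the worker setting applies) can be sketched without any Connect classes. This is an illustrative sketch only: `HeaderConverterFallbackSketch`, `resolve`, and `com.example.MyHeaderConverter` are hypothetical names, and configurations are modelled as plain string maps rather than the real config objects.

```java
import java.util.HashMap;
import java.util.Map;

/** Resolves the header converter class name: connector setting if present, else the worker's. */
final class HeaderConverterFallbackSketch {
    static final String HEADER_CONVERTER_KEY = "header.converter";

    static String resolve(Map<String, String> connectorConfig, Map<String, String> workerConfig) {
        // A null here plays the role of the null default in the connector config.
        String fromConnector = connectorConfig.get(HEADER_CONVERTER_KEY);
        if (fromConnector != null) {
            return fromConnector;
        }
        return workerConfig.get(HEADER_CONVERTER_KEY);
    }

    public static void main(String[] args) {
        Map<String, String> worker = new HashMap<>();
        worker.put(HEADER_CONVERTER_KEY, "org.apache.kafka.connect.storage.SimpleHeaderConverter");

        Map<String, String> connector = new HashMap<>();
        // No connector-level setting: the worker's converter is used.
        System.out.println(resolve(connector, worker));

        // Hypothetical connector-level override.
        connector.put(HEADER_CONVERTER_KEY, "com.example.MyHeaderConverter");
        System.out.println(resolve(connector, worker));
    }
}
```

If the connector-level lookup returned a non-null default instead, the fallback branch would never run, which is the situation the diff removes.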
[jira] [Commented] (KAFKA-6684) Support casting values with bytes schema to string
[ https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424234#comment-16424234 ]

ASF GitHub Bot commented on KAFKA-6684:

amitsela opened a new pull request #4820: KAFKA-6684 [WIP]: Cast transform bytes
URL: https://github.com/apache/kafka/pull/4820

Allow casting a LogicalType to a string by calling the serialized (Java) object's toString(). Added tests for `BigDecimal` and `Date`, both as a whole record and as fields.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

> Support casting values with bytes schema to string
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Amit Sela
> Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is not supported.
> This proposes to allow casting anything to a string, kind of like Java's {{toString()}}, such that if the object is actually a LogicalType it can be "serialized" as a string instead of bytes+schema.
>
> {noformat}
> Examples:
> BigDecimal will cast to the string representation of the number.
> Timestamp will cast to the string representation of the timestamp, or maybe UTC mmddTHH:MM:SS.f format?
> {noformat}
>
> Worst case, bytes are "cast" to whatever {{toString()}} returns; it's up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.
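What "cast by calling toString()" means for the two types named above can be shown with plain JDK objects. This is a minimal sketch, not the pull request's implementation: `CastToStringSketch` and `castToString` are hypothetical names, and no Connect schema handling is modelled.

```java
import java.math.BigDecimal;
import java.util.Date;

/** Casts any value to a string through its own toString(), passing null through unchanged. */
final class CastToStringSketch {
    static String castToString(Object value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        // BigDecimal yields the string representation of the number.
        System.out.println(castToString(new BigDecimal("12.34"))); // prints 12.34
        // Date yields whatever Date.toString() produces, which depends on the default time zone.
        System.out.println(castToString(new Date(0L)));
    }
}
```

The Date line shows why the issue asks about a fixed timestamp format: the result of toString() is left to the runtime, so the consumer of the string has to know what it is getting.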
[jira] [Commented] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424242#comment-16424242 ] Guozhang Wang commented on KAFKA-6742: -- Hello [~vale68] Thanks for reporting the issue. Is it possible for you to submit a PR to illustrate your problem? And if it is indeed an issue we can go ahead and review / merge your PR: https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest By looking at the code above it is a bit difficult to fully understand your point. > TopologyTestDriver error when dealing with stores from GlobalKTable > --- > > Key: KAFKA-6742 > URL: https://issues.apache.org/jira/browse/KAFKA-6742 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Valentino Proietti >Priority: Minor > > This junit test simply fails: > @Test > public void globalTable() { > StreamsBuilder builder = new StreamsBuilder(); > @SuppressWarnings("unused") > final KTable localTable = builder > .table("local", > Consumed.with(Serdes.String(), Serdes.String()), > Materialized.as("localStore")) > ; > @SuppressWarnings("unused") > final GlobalKTable globalTable = builder > .globalTable("global", > Consumed.with(Serdes.String(), Serdes.String()), > Materialized.as("globalStore")) > ; > // > Properties props = new Properties(); > props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test"); > props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost"); > TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), > props); > // > final KeyValueStore localStore = > testDriver.getKeyValueStore("localStore"); > Assert.assertNotNull(localStore); > Assert.assertNotNull(testDriver.getAllStateStores().get("localStore")); > // > final KeyValueStore globalStore = > testDriver.getKeyValueStore("globalStore"); > 
Assert.assertNotNull(globalStore); > Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore")); > // > final ConsumerRecordFactory crf = new > ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); > testDriver.pipeInput(crf.create("local", "one", "TheOne")); > testDriver.pipeInput(crf.create("global", "one", "TheOne")); > // > Assert.assertEquals("TheOne", localStore.get("one")); > Assert.assertEquals("TheOne", globalStore.get("one")); > > > To make it work I had to modify the TopologyTestDriver class > as follows: > ... > public Map getAllStateStores() { > // final Map allStores = new HashMap<>(); > // for (final String storeName : > internalTopologyBuilder.allStateStoreName()) > { // allStores.put(storeName, ((ProcessorContextImpl) > task.context()).getStateMgr().getStore(storeName)); // } > // return allStores; > // FIXME > final ProcessorStateManager psm = ((ProcessorContextImpl) > task.context()).getStateMgr(); > final Map allStores = new HashMap<>(); > for (final String storeName : > internalTopologyBuilder.allStateStoreName()) { > StateStore res = psm.getStore(storeName); > if (res == null) > res = psm.getGlobalStore(storeName); > allStores.put(storeName, res); > } > return allStores; > } > ... 
> public StateStore getStateStore(final String name) { > // return ((ProcessorContextImpl) > task.context()).getStateMgr().getStore(name); > // FIXME > final ProcessorStateManager psm = ((ProcessorContextImpl) > task.context()).getStateMgr(); > StateStore res = psm.getStore(name); > if (res == null) > res = psm.getGlobalStore(name); > return res; > } > > Moreover, I think it would be very useful to make the internal > MockProducer public for testing cases where a producer is used alongside > the "normal" stream processing, by adding the method: > /** > * @return records sent with this producer are automatically streamed > to the topology. > */ > public final Producer<byte[], byte[]> getProducer() { > return producer; > } > > Unfortunately this introduces another problem that can be > verified by adding the following lines to the previous junit test: > ... > // > ConsumerRecord<byte[],byte[]> cr = crf.create("dummy", "two", "Second"); > // just to serialize keys and values > testDriver.getProducer().send(new ProducerRecord<>("local", null, > cr.timestamp(), cr.key(), cr.value())); > testDriver.getProducer(
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424263#comment-16424263 ] Khaireddine Rezgui commented on KAFKA-6535: --- Thank you [~vvcephei], I will take a look at the links > Set default retention ms for Streams repartition topics to Long.MAX_VALUE > - > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Khaireddine Rezgui >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set their default retention to infinity to allow any records to be > pushed to them with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keep their storage small. > More specifically, in {{RepartitionTopicConfig}} we have a few default > overrides for repartition topic configs; we should just add the override for > {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still > allows users to override it themselves if they want via > {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this > update takes effect. > In addition to the code change, we also need doc changes in > streams/upgrade_guide.html specifying this default value change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
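The override order described in KAFKA-6535 (a built-in default of Long.MAX_VALUE for retention that a user-supplied, prefixed setting can still replace) can be sketched roughly as follows. This is a minimal illustration in plain Java, not the actual RepartitionTopicConfig code: the class and method names are hypothetical, and the two string constants only mirror the values of TopicConfig.RETENTION_MS_CONFIG ("retention.ms") and StreamsConfig.TOPIC_PREFIX ("topic.").

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the real logic lives in Kafka Streams internals.
final class RepartitionRetentionSketch {
    // Same string values as TopicConfig.RETENTION_MS_CONFIG and StreamsConfig.TOPIC_PREFIX.
    static final String RETENTION_MS = "retention.ms";
    static final String TOPIC_PREFIX = "topic.";

    // Starts from the proposed default and lets "topic."-prefixed user settings win.
    static Map<String, String> effectiveConfig(Map<String, String> userProps) {
        Map<String, String> config = new HashMap<>();
        config.put(RETENTION_MS, String.valueOf(Long.MAX_VALUE));
        for (Map.Entry<String, String> entry : userProps.entrySet()) {
            if (entry.getKey().startsWith(TOPIC_PREFIX)) {
                // Strip the prefix so the value applies to the topic-level key.
                config.put(entry.getKey().substring(TOPIC_PREFIX.length()), entry.getValue());
            }
        }
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put(TOPIC_PREFIX + RETENTION_MS, "60000");
        System.out.println("default: " + effectiveConfig(new HashMap<>()));
        System.out.println("user override: " + effectiveConfig(user));
    }
}
```

A unit test of the kind the issue asks for would assert both cases: the default when nothing is configured, and the user value when a prefixed override is present.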
[jira] [Commented] (KAFKA-6739) Broker receives error when handling request with java.lang.IllegalArgumentException: Magic v0 does not support record headers
[ https://issues.apache.org/jira/browse/KAFKA-6739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424336#comment-16424336 ] ASF GitHub Bot commented on KAFKA-6739: --- hachikuji closed pull request #4813: KAFKA-6739: Ignore the presence of headers when down-converting from V2 to V1/V0 URL: https://github.com/apache/kafka/pull/4813 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index 2452798d485..89a5413e00c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -130,8 +130,13 @@ private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, R MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(), timestampType, recordBatchAndRecords.baseOffset, logAppendTime); -for (Record record : recordBatchAndRecords.records) -builder.append(record); +for (Record record : recordBatchAndRecords.records) { +// Down-convert this record. 
Ignore headers when down-converting to V0 and V1 since they are not supported +if (magic > RecordBatch.MAGIC_VALUE_V1) +builder.append(record); +else +builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value()); +} builder.close(); return builder; diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 53ac2003586..fdd3ede16cc 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -40,6 +42,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assert.assertArrayEquals; public class FileRecordsTest { @@ -358,6 +361,11 @@ public void testConversion() throws IOException { private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException { List offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L); + +Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()), +new RecordHeader("headerKey2", "headerValue2".getBytes()), +new RecordHeader("headerKey3", "headerValue3".getBytes())}; + List records = asList( new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()), new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()), @@ -366,9 +374,10 @@ private void doTestConversion(CompressionType compressionType, byte toMagic) thr new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()), new SimpleRecord(6L, 
"k6".getBytes(), "I sense indecision".getBytes()), new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()), -new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes()), +new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers), new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()), -new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes())); +new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers)); +assertEquals("incorrect test setup", offsets.size(), records.size()); ByteBuffer buffer = ByteBuffer.allocate(1024); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, @@ -452,6 +461,7 @@ private void verifyConvertedRecords(List initialRecords, assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp()); assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME)); assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAM
[jira] [Resolved] (KAFKA-6739) Broker receives error when handling request with java.lang.IllegalArgumentException: Magic v0 does not support record headers
[ https://issues.apache.org/jira/browse/KAFKA-6739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6739. Resolution: Fixed Fix Version/s: 1.0.2 > Broker receives error when handling request with > java.lang.IllegalArgumentException: Magic v0 does not support record headers > - > > Key: KAFKA-6739 > URL: https://issues.apache.org/jira/browse/KAFKA-6739 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Koelli Mungee >Assignee: Dhruvil Shah >Priority: Critical > Fix For: 1.0.2, 1.2.0, 1.1.1 > > > A broker running at 1.0.0 with the following properties > > {code:java} > log.message.format.version=1.0 > inter.broker.protocol.version=1.0 > {code} > receives this ERROR while handling fetch request for a message with a header > {code:java} > [2018-03-23 01:48:03,093] ERROR [KafkaApi-1] Error when handling request > {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=test=[{partition=11,fetch_offset=20645,max_bytes=1048576}]}]} > (kafka.server.KafkaApis) java.lang.IllegalArgumentException: Magic v0 does > not support record headers > at > org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403) > > at > org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586) > > at > org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134) > > at > org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109) > > at > org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518) > > at scala.Option.map(Option.scala:146) > at > 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508) > > at scala.Option.flatMap(Option.scala:171) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569) > > at > kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034) > > at > kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52) > > at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588) > > at > kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175) > > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587) > > at > 
kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604) > > at > kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604) > > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596) > at kafka.server.KafkaApis.handle(KafkaApis.scala:100) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65) > at java.lang.Thread.run(Thread.java:745) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6739) Down-conversion fails for records with headers
[ https://issues.apache.org/jira/browse/KAFKA-6739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-6739: --- Summary: Down-conversion fails for records with headers (was: Broker receives error when handling request with java.lang.IllegalArgumentException: Magic v0 does not support record headers) > Down-conversion fails for records with headers > -- > > Key: KAFKA-6739 > URL: https://issues.apache.org/jira/browse/KAFKA-6739 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Koelli Mungee >Assignee: Dhruvil Shah >Priority: Critical > Fix For: 1.0.2, 1.2.0, 1.1.1 > > > A broker running at 1.0.0 with the following properties > > {code:java} > log.message.format.version=1.0 > inter.broker.protocol.version=1.0 > {code} > receives this ERROR while handling fetch request for a message with a header > {code:java} > [2018-03-23 01:48:03,093] ERROR [KafkaApi-1] Error when handling request > {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=test=[{partition=11,fetch_offset=20645,max_bytes=1048576}]}]} > (kafka.server.KafkaApis) java.lang.IllegalArgumentException: Magic v0 does > not support record headers > at > org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403) > > at > org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586) > > at > org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134) > > at > org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109) > > at > org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518) > > at 
scala.Option.map(Option.scala:146) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508) > > at scala.Option.flatMap(Option.scala:171) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569) > > at > kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034) > > at > kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52) > > at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569) > > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588) > > at > kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175) > > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587) > > at > 
kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604) > > at > kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604) > > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596) > at kafka.server.KafkaApis.handle(KafkaApis.scala:100) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65) > at java.lang.Thread.run(Thread.java:745) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
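The fix merged for KAFKA-6739 (PR #4813) appends records unchanged when the target magic is above V1 and otherwise re-appends only offset, timestamp, key and value, so headers are dropped. The idea can be sketched in plain Java as below. This is a simplified model under stated assumptions: the Rec type and downConvert method are hypothetical stand-ins for Kafka's Record and AbstractRecords.convertRecordBatch, and only the magic constants (0, 1, 2) match the real RecordBatch values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of header-dropping down-conversion; not Kafka's implementation.
final class DownConvertSketch {
    static final byte MAGIC_V0 = 0, MAGIC_V1 = 1, MAGIC_V2 = 2;

    // Simplified record: headers are modelled as a list of header keys.
    static final class Rec {
        final long offset;
        final String key;
        final String value;
        final List<String> headers;

        Rec(long offset, String key, String value, List<String> headers) {
            this.offset = offset;
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Formats V0 and V1 cannot carry headers, so they are ignored for those targets.
    static List<Rec> downConvert(List<Rec> records, byte magic) {
        List<Rec> converted = new ArrayList<>();
        for (Rec record : records) {
            if (magic > MAGIC_V1) {
                converted.add(record);
            } else {
                converted.add(new Rec(record.offset, record.key, record.value,
                        Collections.emptyList()));
            }
        }
        return converted;
    }

    public static void main(String[] args) {
        List<Rec> batch = Collections.singletonList(
                new Rec(8L, "k8", "running out", Collections.singletonList("headerKey1")));
        System.out.println("headers after V0 conversion: "
                + downConvert(batch, MAGIC_V0).get(0).headers);
    }
}
```

Dropping headers silently is a design choice of the fix: an old-format consumer could not have read them anyway, whereas throwing, as before, made the whole fetch fail.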
[jira] [Created] (KAFKA-6744) MockProducer with transaction enabled doesn't fail on commit if a record was failed
Pascal Gélinas created KAFKA-6744: - Summary: MockProducer with transaction enabled doesn't fail on commit if a record was failed Key: KAFKA-6744 URL: https://issues.apache.org/jira/browse/KAFKA-6744 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 1.0.0 Reporter: Pascal Gélinas The KafkaProducer#send documentation states the following: When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from send. If any of the send calls failed with an irrecoverable error, the final commitTransaction() call will fail and throw the exception from the last failed send. So I was expecting the following to throw an exception: {{*MockProducer* producer = new MockProducer<>(false,}} {{ new StringSerializer(), new ByteArraySerializer());}} {{producer.initTransactions();}} {{producer.beginTransaction();}} {{producer.send(new ProducerRecord<>("foo", new byte[]{}));}} {{producer.errorNext(new RuntimeException());}} {{producer.commitTransaction(); // Expecting this to throw}} Unfortunately, the commitTransaction() call returns successfully. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
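The behaviour the reporter of KAFKA-6744 expects, based on the quoted KafkaProducer#send documentation, is that a failed send is remembered and rethrown by commitTransaction(). A minimal sketch of that contract follows. It is plain Java and deliberately does not use MockProducer: the TxnProducerSketch class and its fields are hypothetical and only borrow the method names from the snippet in the issue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the documented contract; not MockProducer's actual code.
final class TxnProducerSketch {
    private final Deque<String> pending = new ArrayDeque<>();
    private RuntimeException lastSendError;
    private boolean inTransaction;

    void beginTransaction() {
        inTransaction = true;
        lastSendError = null;
    }

    // Queues a record; it completes later, as with a non-auto-completing mock.
    void send(String record) {
        if (!inTransaction) {
            throw new IllegalStateException("no transaction in progress");
        }
        pending.add(record);
    }

    // Fails the oldest pending send and remembers the error for commit time.
    boolean errorNext(RuntimeException e) {
        if (pending.poll() == null) {
            return false;
        }
        lastSendError = e;
        return true;
    }

    // Rethrows the last send failure instead of committing silently.
    void commitTransaction() {
        inTransaction = false;
        if (lastSendError != null) {
            throw lastSendError;
        }
    }

    public static void main(String[] args) {
        TxnProducerSketch producer = new TxnProducerSketch();
        producer.beginTransaction();
        producer.send("foo");
        producer.errorNext(new RuntimeException("send failed"));
        try {
            producer.commitTransaction();
            System.out.println("commit succeeded");
        } catch (RuntimeException e) {
            System.out.println("commit failed: " + e.getMessage());
        }
    }
}
```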
[jira] [Commented] (KAFKA-3240) Replication issues
[ https://issues.apache.org/jira/browse/KAFKA-3240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424420#comment-16424420 ] Ari Uka commented on KAFKA-3240: I'm running into the same issue. https://issues.apache.org/jira/browse/KAFKA-6679 Restarting all machines seemed to fix the problem for us temporarily. It happened a week later and the issue came up again. We are running: [https://github.com/Shopify/sarama] for Producer and Consumer. FreeBSD 11.0-RELEASE-p8 Kafka 1.0.1 (we are going to attempt to upgrade to Kafka 1.1.0 and restart again.) OpenJDK 1.8.0_121 ZFS This is also in Azure, so the instance is a VM. At this point, we're going to attempt to move to a Linux cluster instead of running a FreeBSD setup. We're going to introduce 3 new machines into the cluster that are running Linux and see if the problem appears on those instances. May I ask what consumer/producer you guys are using, is anyone using the standard java consumer/producer? > Replication issues > -- > > Key: KAFKA-3240 > URL: https://issues.apache.org/jira/browse/KAFKA-3240 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1 > Environment: FreeBSD 10.2-RELEASE-p9 >Reporter: Jan Omar >Priority: Major > Labels: reliability > > Hi, > We are trying to replace our 3-broker cluster running on 0.6 with a new > cluster on 0.9.0.1 (but tried 0.8.2.2 and 0.9.0.0 as well). > - 3 kafka nodes with one zookeeper instance on each machine > - FreeBSD 10.2 p9 > - Nagle off (sysctl net.inet.tcp.delayed_ack=0) > - all kafka machines write a ZFS ZIL to a dedicated SSD > - 3 producers on 3 machines, writing to 1 topics, partitioning 3, replication > factor 3 > - acks all > - 10 Gigabit Ethernet, all machines on one switch, ping 0.05 ms worst case. > While using the ProducerPerformance or rdkafka_performance we are seeing very > strange Replication errors. Any hint on what's going on would be highly > appreciated. 
Any suggestion on how to debug this properly would help as well. > This is what our broker config looks like: > {code} > broker.id=5 > auto.create.topics.enable=false > delete.topic.enable=true > listeners=PLAINTEXT://:9092 > port=9092 > host.name=kafka-five.acc > advertised.host.name=10.5.3.18 > zookeeper.connect=zookeeper-four.acc:2181,zookeeper-five.acc:2181,zookeeper-six.acc:2181 > zookeeper.connection.timeout.ms=6000 > num.replica.fetchers=1 > replica.fetch.max.bytes=1 > replica.fetch.wait.max.ms=500 > replica.high.watermark.checkpoint.interval.ms=5000 > replica.socket.timeout.ms=30 > replica.socket.receive.buffer.bytes=65536 > replica.lag.time.max.ms=1000 > min.insync.replicas=2 > controller.socket.timeout.ms=3 > controller.message.queue.size=100 > log.dirs=/var/db/kafka > num.partitions=8 > message.max.bytes=1 > auto.create.topics.enable=false > log.index.interval.bytes=4096 > log.index.size.max.bytes=10485760 > log.retention.hours=168 > log.flush.interval.ms=1 > log.flush.interval.messages=2 > log.flush.scheduler.interval.ms=2000 > log.roll.hours=168 > log.retention.check.interval.ms=30 > log.segment.bytes=536870912 > zookeeper.connection.timeout.ms=100 > zookeeper.sync.time.ms=5000 > num.io.threads=8 > num.network.threads=4 > socket.request.max.bytes=104857600 > socket.receive.buffer.bytes=1048576 > socket.send.buffer.bytes=1048576 > queued.max.requests=10 > fetch.purgatory.purge.interval.requests=100 > producer.purgatory.purge.interval.requests=100 > replica.lag.max.messages=1000 > {code} > These are the errors we're seeing: > {code:borderStyle=solid} > ERROR [Replica Manager on Broker 5]: Error processing fetch operation on > partition [test,0] offset 50727 (kafka.server.ReplicaManager) > java.lang.IllegalStateException: Invalid message size: 0 > at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:141) > at kafka.log.LogSegment.translateOffset(LogSegment.scala:105) > at kafka.log.LogSegment.read(LogSegment.scala:126) > at 
kafka.log.Log.read(Log.scala:506) > at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:536) > at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:507) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > ka
[jira] [Created] (KAFKA-6745) kafka consumer rebalancing takes long time (from 3 secs to 5 minutes)
Ramkumar created KAFKA-6745: --- Summary: kafka consumer rebalancing takes long time (from 3 secs to 5 minutes) Key: KAFKA-6745 URL: https://issues.apache.org/jira/browse/KAFKA-6745 Project: Kafka Issue Type: Improvement Components: clients, core Affects Versions: 0.11.0.0 Reporter: Ramkumar Hi, We had an HTTP service (3 nodes) around Kafka 0.8. This HTTP service acts as a REST API so that publishers and consumers can use middleware instead of the Kafka client API. In that setup, consumer rebalancing is not a major issue. We wanted to upgrade to Kafka 0.11, so we updated our HTTP services (3-node cluster) to use the new Kafka consumer API, but rebalancing of consumers (multiple consumers under the same group) takes between 3 secs and 5 mins (max.poll.interval.ms). Because of this delay our HTTP clients time out and fail over. This rebalancing time is a major issue. It is not clear from the documentation whether the rebalance for the group takes place after max.poll.interval.ms, or whether it starts after 3 secs and completes at any time within 5 minutes. We tried reducing max.poll.interval.ms to 15 seconds, but this also triggers a rebalance internally. Below are the other parameters we have set in our service: max.poll.interval.ms = 30 seconds heartbeat.interval.ms = 1 minute session.timeout.ms = 4 minutes consumer.cache.timeout = 2 min Below is the log: ""2018-03-26 12:53:23,009 [qtp1404928347-11556] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group firstnetportal_001 ""2018-03-26 12:57:52,793 [qtp1404928347-11556] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group firstnetportal_001 with generation 7475 Please let me know if any other application/client uses an HTTP interface on 3 nodes without having this issue -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6729) KTable should use user source topics if possible and not create changelog topic
[ https://issues.apache.org/jira/browse/KAFKA-6729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-6729: -- Assignee: Bill Bejeck > KTable should use user source topics if possible and not create changelog > topic > --- > > Key: KAFKA-6729 > URL: https://issues.apache.org/jira/browse/KAFKA-6729 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Critical > Fix For: 1.2.0 > > > With KIP-182 we reworked Streams API largely and introduced a regression into > 1.0 code base. If a KTable is populated from a source topic, ie, > StreamsBuilder.table() -- the KTable does create its own changelog topic. > However, in older releases (0.11 or older), we don't create a changelog topic > but use the user specified source topic instead. > We want to reintroduce this optimization to reduce the load (storage and > write) on the broker side for this case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6678) Upgrade dependencies with later release versions
[ https://issues.apache.org/jira/browse/KAFKA-6678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6678: -- Component/s: build > Upgrade dependencies with later release versions > > > Key: KAFKA-6678 > URL: https://issues.apache.org/jira/browse/KAFKA-6678 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Ted Yu >Priority: Major > Attachments: k-update.txt > > > {code} > The following dependencies have later release versions: > - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1] > - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59] > - com.puppycrawl.tools:checkstyle [6.19 -> 8.8] > - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1] > - org.ajoberstar:grgit [1.9.3 -> 2.1.1] > - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26] > - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121] > - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121] > - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121] > - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121] > - org.openjdk.jmh:jmh-core [1.19 -> 1.20] > - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20] > - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20] > - org.lz4:lz4-java [1.4 -> 1.4.1] > - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3] > - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0] > - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0] > - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3] > - org.scala-lang:scala-library [2.11.12 -> 2.12.4] > - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0] > - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4] > - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5] > {code} > Looks like we can consider upgrading scalatest, jmh-core and checkstyle -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6531) SocketServerTest#closingChannelException fails sometimes
[ https://issues.apache.org/jira/browse/KAFKA-6531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6531: -- Description: From https://builds.apache.org/job/kafka-trunk-jdk9/361/testReport/junit/kafka.network/SocketServerTest/closingChannelException/ : {code} java.lang.AssertionError: Channels not removed at kafka.utils.TestUtils$.fail(TestUtils.scala:355) at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865) at kafka.network.SocketServerTest.assertProcessorHealthy(SocketServerTest.scala:914) at kafka.network.SocketServerTest.$anonfun$closingChannelException$1(SocketServerTest.scala:763) at kafka.network.SocketServerTest.$anonfun$closingChannelException$1$adapted(SocketServerTest.scala:747) {code} Among the test output, I saw: {code} [2018-02-04 18:51:15,995] ERROR Processor 0 closed connection from /127.0.0.1:48261 (kafka.network.SocketServerTest$$anon$5$$anon$1:73) java.lang.IllegalStateException: There is already a connection for id 127.0.0.1:1-127.0.0.1:2-0 at org.apache.kafka.common.network.Selector.ensureNotRegistered(Selector.java:260) at org.apache.kafka.common.network.Selector.register(Selector.java:254) at kafka.network.SocketServerTest$TestableSelector.super$register(SocketServerTest.scala:1043) at kafka.network.SocketServerTest$TestableSelector.$anonfun$register$2(SocketServerTest.scala:1043) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) at kafka.network.SocketServerTest$TestableSelector.runOp(SocketServerTest.scala:1037) at kafka.network.SocketServerTest$TestableSelector.register(SocketServerTest.scala:1043) at kafka.network.Processor.configureNewConnections(SocketServer.scala:723) at kafka.network.Processor.run(SocketServer.scala:532) {code} was: From https://builds.apache.org/job/kafka-trunk-jdk9/361/testReport/junit/kafka.network/SocketServerTest/closingChannelException/ : {code} java.lang.AssertionError: Channels not removed at 
kafka.utils.TestUtils$.fail(TestUtils.scala:355) at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865) at kafka.network.SocketServerTest.assertProcessorHealthy(SocketServerTest.scala:914) at kafka.network.SocketServerTest.$anonfun$closingChannelException$1(SocketServerTest.scala:763) at kafka.network.SocketServerTest.$anonfun$closingChannelException$1$adapted(SocketServerTest.scala:747) {code} Among the test output, I saw: {code} [2018-02-04 18:51:15,995] ERROR Processor 0 closed connection from /127.0.0.1:48261 (kafka.network.SocketServerTest$$anon$5$$anon$1:73) java.lang.IllegalStateException: There is already a connection for id 127.0.0.1:1-127.0.0.1:2-0 at org.apache.kafka.common.network.Selector.ensureNotRegistered(Selector.java:260) at org.apache.kafka.common.network.Selector.register(Selector.java:254) at kafka.network.SocketServerTest$TestableSelector.super$register(SocketServerTest.scala:1043) at kafka.network.SocketServerTest$TestableSelector.$anonfun$register$2(SocketServerTest.scala:1043) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) at kafka.network.SocketServerTest$TestableSelector.runOp(SocketServerTest.scala:1037) at kafka.network.SocketServerTest$TestableSelector.register(SocketServerTest.scala:1043) at kafka.network.Processor.configureNewConnections(SocketServer.scala:723) at kafka.network.Processor.run(SocketServer.scala:532) {code} > SocketServerTest#closingChannelException fails sometimes > > > Key: KAFKA-6531 > URL: https://issues.apache.org/jira/browse/KAFKA-6531 > Project: Kafka > Issue Type: Test > Components: core >Reporter: Ted Yu >Priority: Minor > > From > https://builds.apache.org/job/kafka-trunk-jdk9/361/testReport/junit/kafka.network/SocketServerTest/closingChannelException/ > : > {code} > java.lang.AssertionError: Channels not removed > at kafka.utils.TestUtils$.fail(TestUtils.scala:355) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865) > at > 
kafka.network.SocketServerTest.assertProcessorHealthy(SocketServerTest.scala:914) > at > kafka.network.SocketServerTest.$anonfun$closingChannelException$1(SocketServerTest.scala:763) > at > kafka.network.SocketServerTest.$anonfun$closingChannelException$1$adapted(SocketServerTest.scala:747) > {code} > Among the test output, I saw: > {code} > [2018-02-04 18:51:15,995] ERROR Processor 0 closed connection from > /127.0.0.1:48261 (kafka.network.SocketServerTest$$anon$5$$anon$1:73) > java.lang.IllegalStateException: There is already a connection for id > 127.0.0.1:1-127.0.0.1:2-0 > at > org.apache.k
[jira] [Commented] (KAFKA-6678) Upgrade dependencies with later release versions
[ https://issues.apache.org/jira/browse/KAFKA-6678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424881#comment-16424881 ]

Ismael Juma commented on KAFKA-6678:
------------------------------------

Are you sure you ran the tool with the latest trunk? Some dependencies require Java 8, so we can't upgrade.

> Upgrade dependencies with later release versions
> ------------------------------------------------
>
>                 Key: KAFKA-6678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6678
>             Project: Kafka
>          Issue Type: Improvement
>          Components: build
>            Reporter: Ted Yu
>            Priority: Major
>         Attachments: k-update.txt
>
>
> {code}
> The following dependencies have later release versions:
>  - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
>  - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
>  - com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
>  - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
>  - org.ajoberstar:grgit [1.9.3 -> 2.1.1]
>  - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
>  - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.openjdk.jmh:jmh-core [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
>  - org.lz4:lz4-java [1.4 -> 1.4.1]
>  - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
>  - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
>  - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
>  - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
>  - org.scala-lang:scala-library [2.11.12 -> 2.12.4]
>  - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
>  - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
>  - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
> {code}
> Looks like we can consider upgrading scalatest, jmh-core and checkstyle

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-6560) Use single-point queries than range queries for windowed aggregation operators
[ https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424885#comment-16424885 ]

ASF GitHub Bot commented on KAFKA-6560:
---------------------------------------

guozhangwang closed pull request #4814: KAFKA-6560: Use single query for getters as well
URL: https://github.com/apache/kafka/pull/4814

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 27f8408320f..6953f7c58ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -25,7 +25,6 @@
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.Map;
 
@@ -132,10 +131,7 @@ public T get(final Windowed windowedKey) {
             K key = windowedKey.key();
             W window = (W) windowedKey.window();
 
-            // this iterator should contain at most one element
-            try (WindowStoreIterator iter = windowStore.fetch(key, window.start(), window.start())) {
-                return iter.hasNext() ? iter.next().value : null;
-            }
+            return windowStore.fetch(key, window.start());
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index c3d95d81b86..1d8b32b5fa5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -24,7 +24,6 @@
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.Map;
 
@@ -128,10 +127,7 @@ public V get(final Windowed windowedKey) {
             K key = windowedKey.key();
             W window = (W) windowedKey.window();
 
-            // this iterator should only contain one element
-            try (WindowStoreIterator iter = windowStore.fetch(key, window.start(), window.start())) {
-                return iter.hasNext() ? iter.next().value : null;
-            }
+            return windowStore.fetch(key, window.start());
         }
     }
 }

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Use single-point queries than range queries for windowed aggregation operators
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6560
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6560
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: needs-kip
>             Fix For: 1.2.0
>
>
> Today, for windowed aggregations in the Streams DSL, the underlying
> implementation uses the fetch(key, from, to) API to get all the related
> windows for a single record to update. However, this is a very inefficient
> operation that spends a significant amount of CPU time iterating over window
> stores. On the other hand, since the operator implementation itself has full
> knowledge of the window specs, it can translate this operation into multiple
> single-point queries with the exact window start timestamp, which would
> largely reduce the overhead.
> The proposed approach is to add a single-point fetch API to the WindowedStore
> and use that in the KStreamWindowedAggregate / KStreamWindowedReduce operators.
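The difference between the range query and the single-point query described in KAFKA-6560 can be illustrated with a small stand-alone sketch. It does not use the Kafka Streams API: `ToyWindowStore`, `SinglePointFetchDemo` and `windowStartsFor` are hypothetical names invented for this illustration, and the window arithmetic assumes hopping windows aligned to multiples of the advance interval.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy in-memory window store for illustration only; NOT the Kafka Streams WindowStore. */
class ToyWindowStore<K, V> {
    // key -> (window start timestamp -> aggregate value)
    private final Map<K, NavigableMap<Long, V>> data = new HashMap<>();

    void put(K key, long windowStart, V value) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
    }

    /** Range query: collects every window of the key whose start lies in [from, to]. */
    List<V> fetch(K key, long from, long to) {
        NavigableMap<Long, V> windows = data.get(key);
        if (windows == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(windows.subMap(from, true, to, true).values());
    }

    /** Single-point query: direct lookup by exact window start, or null if absent. */
    V fetch(K key, long windowStart) {
        NavigableMap<Long, V> windows = data.get(key);
        return windows == null ? null : windows.get(windowStart);
    }
}

class SinglePointFetchDemo {
    /** Start timestamps of the hopping windows (assumed aligned to advanceMs) that contain the timestamp. */
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long start = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        ToyWindowStore<String, Long> store = new ToyWindowStore<>();
        // Count one record with timestamp 25 into every window that contains it,
        // using one single-point lookup per window instead of iterating a range.
        for (long start : windowStartsFor(25, 20, 10)) {
            Long old = store.fetch("a", start);
            store.put("a", start, old == null ? 1L : old + 1L);
        }
        System.out.println(store.fetch("a", 0L, 100L));
    }
}
```

Because the operator can compute the exact window starts itself, each update costs one keyed lookup per window rather than a scan over a timestamp range.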
[jira] [Updated] (KAFKA-6730) Simplify state store recovery
[ https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-6730:
-----------------------------------
    Description:
In the current code base, we restore state stores in the main thread (in contrast to older code that restored state stores in the rebalance callback). This has multiple advantages and allows us to further simplify the restore code.

In the original code base, during a long restore phase, it was possible that an instance missed a rebalance and dropped out of the consumer group. To detect this case, we apply a check during the restore phase that the end-offset of the changelog topic does not change. A changed offset implies a missed rebalance, as another thread started to write into the changelog topic (i.e., the current thread does not own the task/store/changelog-topic anymore).

With the new code, which restores in the main loop, `poll()` is guaranteed to be called regularly, and thus a rebalance will be detected automatically. This makes the check for a changing changelog-end-offset unnecessary.

We can simplify the restore logic to just consume until `poll()` does not return any data. In that case, we fetch the end-offset to check whether the restore is complete. If so, we resume processing; if not, we continue the restore.

  was:
In the current code base, we restore state stores in the main thread (in contrast to older code that restored state stores in the rebalance callback). This has multiple advantages and allows us to further simplify the restore code.

In the original code base, during a long restore phase, it was possible that an instance missed a rebalance and dropped out of the consumer group. To detect this case, we apply a check during the restore phase that the end-offset of the changelog topic does not change. A changed offset implies a missed rebalance, as another thread started to write into the changelog topic (i.e., the current thread does not own the task/store/changelog-topic anymore).

With the new code, which restores in the main loop, poll() is guaranteed to be called regularly, and thus a rebalance will be detected automatically. This makes the check for a changing changleog-end-offset unnecessary.

We can simplify the restore logic to just consume until `pol()` does not return any data. In that case, we fetch the end-offset to check whether the restore is complete. If so, we resume processing; if not, we continue the restore.

> Simplify state store recovery
> -----------------------------
>
>                 Key: KAFKA-6730
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6730
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>             Fix For: 1.2.0
>
>
> In the current code base, we restore state stores in the main thread (in
> contrast to older code that restored state stores in the rebalance callback).
> This has multiple advantages and allows us to further simplify the restore
> code.
> In the original code base, during a long restore phase, it was possible that
> an instance missed a rebalance and dropped out of the consumer group. To
> detect this case, we apply a check during the restore phase that the
> end-offset of the changelog topic does not change. A changed offset implies a
> missed rebalance, as another thread started to write into the changelog topic
> (i.e., the current thread does not own the task/store/changelog-topic anymore).
> With the new code, which restores in the main loop, `poll()` is guaranteed to
> be called regularly, and thus a rebalance will be detected automatically.
> This makes the check for a changing changelog-end-offset unnecessary.
> We can simplify the restore logic to just consume until `poll()` does not
> return any data. In that case, we fetch the end-offset to check whether the
> restore is complete. If so, we resume processing; if not, we continue the
> restore.
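The simplified restore logic proposed in KAFKA-6730 (consume until `poll()` returns nothing, then compare against the end-offset) might look roughly like the following stand-alone sketch. It deliberately avoids the real Kafka consumer API: `ToyChangelog`, `RestoreLoopDemo`, `restoreStep` and the batch size of 2 are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a changelog consumer; NOT the Kafka consumer API. */
class ToyChangelog {
    private final List<String[]> records = new ArrayList<>(); // each entry: {key, value}
    private int position = 0;

    void append(String key, String value) {
        records.add(new String[] {key, value});
    }

    /** Returns up to maxRecords records from the current position; empty when caught up. */
    List<String[]> poll(int maxRecords) {
        int end = Math.min(records.size(), position + maxRecords);
        List<String[]> batch = new ArrayList<>(records.subList(position, end));
        position = end;
        return batch;
    }

    long position() {
        return position;
    }

    long endOffset() {
        return records.size();
    }
}

class RestoreLoopDemo {
    /** One main-loop iteration; returns true once the store is fully restored. */
    static boolean restoreStep(ToyChangelog changelog, Map<String, String> store) {
        List<String[]> batch = changelog.poll(2);
        for (String[] record : batch) {
            store.put(record[0], record[1]);
        }
        if (!batch.isEmpty()) {
            return false; // data was returned, so more may follow: keep restoring
        }
        // poll() returned nothing: only now fetch the end offset and compare.
        return changelog.position() >= changelog.endOffset();
    }

    public static void main(String[] args) {
        ToyChangelog changelog = new ToyChangelog();
        changelog.append("a", "1");
        changelog.append("b", "2");
        changelog.append("a", "3");

        Map<String, String> store = new HashMap<>();
        int iterations = 1;
        while (!restoreStep(changelog, store)) {
            iterations++;
        }
        System.out.println("restored " + store + " after " + iterations + " iterations");
    }
}
```

In this sketch the end-offset is consulted only when a poll comes back empty, so the per-batch check for a moving end-offset from the older design is not needed.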
[jira] [Commented] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425091#comment-16425091 ]

Valentino Proietti commented on KAFKA-6742:
-------------------------------------------

Hello [~guozhang],

I will create the PR asap.

Thank you

> TopologyTestDriver error when dealing with stores from GlobalKTable
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6742
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6742
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Valentino Proietti
>            Priority: Minor
>
> This junit test simply fails:
> @Test
> public void globalTable() {
>     StreamsBuilder builder = new StreamsBuilder();
>     @SuppressWarnings("unused")
>     final KTable localTable = builder
>         .table("local",
>             Consumed.with(Serdes.String(), Serdes.String()),
>             Materialized.as("localStore"))
>         ;
>     @SuppressWarnings("unused")
>     final GlobalKTable globalTable = builder
>         .globalTable("global",
>             Consumed.with(Serdes.String(), Serdes.String()),
>             Materialized.as("globalStore"))
>         ;
>     //
>     Properties props = new Properties();
>     props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>     props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
>     TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
>     //
>     final KeyValueStore localStore = testDriver.getKeyValueStore("localStore");
>     Assert.assertNotNull(localStore);
>     Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));
>     //
>     final KeyValueStore globalStore = testDriver.getKeyValueStore("globalStore");
>     Assert.assertNotNull(globalStore);
>     Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
>     //
>     final ConsumerRecordFactory crf = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
>     testDriver.pipeInput(crf.create("local", "one", "TheOne"));
>     testDriver.pipeInput(crf.create("global", "one", "TheOne"));
>     //
>     Assert.assertEquals("TheOne", localStore.get("one"));
>     Assert.assertEquals("TheOne", globalStore.get("one"));
> }
>
> To make it work, I had to modify the TopologyTestDriver class as follows:
> ...
> public Map getAllStateStores() {
>     // final Map allStores = new HashMap<>();
>     // for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
>     //     allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
>     // }
>     // return allStores;
>     // FIXME
>     final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>     final Map allStores = new HashMap<>();
>     for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
>         StateStore res = psm.getStore(storeName);
>         if (res == null)
>             res = psm.getGlobalStore(storeName);
>         allStores.put(storeName, res);
>     }
>     return allStores;
> }
> ...
> public StateStore getStateStore(final String name) {
>     // return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
>     // FIXME
>     final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>     StateStore res = psm.getStore(name);
>     if (res == null)
>         res = psm.getGlobalStore(name);
>     return res;
> }
>
> Moreover, I think it would be very useful to make the internal MockProducer
> public, for testing cases where a producer is used alongside the "normal"
> stream processing, by adding the method:
> /**
>  * @return records sent with this producer are automatically streamed to the topology.
>  */
> public final Producer<byte[], byte[]> getProducer() {
>     return producer;
> }
>
> Unfortunately, this introduces another problem, which can be verified by
> adding the following lines to the previous junit test:
> ...
> //
> ConsumerRecord<byte[],byte[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values
> testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value()));
> testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value()));
> testDriver.advanceWallClockTime(0);
> Assert.assertEquals("TheOne", localStore.get("one"));
> Assert.assertEquals("Second", localStore.get("two"));
> Assert.assertEquals("TheOne", globalStore.get("one"));
> Assert.assertE