[jira] [Commented] (KAFKA-6728) Kafka Connect Header Null Pointer Exception

2018-04-03 Thread Philippe Hong (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423740#comment-16423740
 ] 

Philippe Hong commented on KAFKA-6728:
--

Thank you, adding
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter
solved my issue as you mentioned. Good job on implementing a more convenient 
and lasting fix.
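
For anyone else hitting this, a minimal connect-standalone worker configuration 
along those lines could look like the sketch below (the bootstrap server, 
offsets file, and the key/value converter choices are only illustrative; 
header.converter is the line that matters here):

{noformat}
# connect-standalone worker properties (illustrative sketch)
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# the setting that avoids the NullPointerException in convertHeadersFor():
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter
offset.storage.file.filename=/tmp/connect.offsets
{noformat}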

> Kafka Connect Header Null Pointer Exception
> ---
>
> Key: KAFKA-6728
> URL: https://issues.apache.org/jira/browse/KAFKA-6728
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
> Environment: Linux Mint
>Reporter: Philippe Hong
>Priority: Critical
>
> I am trying to use the newly released Kafka Connect that supports headers by 
> using the standalone connector to write to a text file (so in this case I am 
> only using the sink component).
> I am sadly greeted by a NullPointerException:
> {noformat}
> ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and 
> unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
> java.lang.NullPointerException
>     at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501)
>     at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
>     at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
>     at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
>     at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> I launched ZooKeeper and Kafka 1.1.0 locally and sent a 
> ProducerRecord[String, Array[Byte]] using a KafkaProducer[String, 
> Array[Byte]] with a header that has a key and a value.
> I can read the record with a console consumer as well as with a 
> KafkaConsumer (in which case I can see the content of the header of the 
> record I sent previously), so no problem there.
> I only made two changes to the Kafka configuration:
>      - I used the StringConverter for the key and the ByteArrayConverter for 
> the value.
>      - I also changed the topic that the sink connects to.
> If I forgot something, please tell me, as this is the first time I am 
> creating an issue on Jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6728) Kafka Connect Header Null Pointer Exception

2018-04-03 Thread Philippe Hong (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423740#comment-16423740
 ] 

Philippe Hong edited comment on KAFKA-6728 at 4/3/18 9:35 AM:
--

Thank you, adding 
\{noformat}header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter\{noformat} solved my issue as you mentioned. Good job on 
implementing a more convenient and lasting fix.


was (Author: hwki):
Thank you, adding {}header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter{} solved my issue as you mentioned. Good job on implementing 
a more convenient and lasting fix.



[jira] [Comment Edited] (KAFKA-6728) Kafka Connect Header Null Pointer Exception

2018-04-03 Thread Philippe Hong (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423740#comment-16423740
 ] 

Philippe Hong edited comment on KAFKA-6728 at 4/3/18 9:35 AM:
--

Thank you, adding header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter solved my issue as you mentioned. Good job on implementing a 
more convenient and lasting fix.


was (Author: hwki):
Thank you, adding 
\{noformat}header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter\{noformat} solved my issue as you mentioned. Good job on 
implementing a more convenient and lasting fix.



[jira] [Comment Edited] (KAFKA-6728) Kafka Connect Header Null Pointer Exception

2018-04-03 Thread Philippe Hong (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423740#comment-16423740
 ] 

Philippe Hong edited comment on KAFKA-6728 at 4/3/18 9:35 AM:
--

Thank you, adding {}header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter{} solved my issue as you mentioned. Good job on implementing 
a more convenient and lasting fix.


was (Author: hwki):
Thank you, adding
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter
solved my issue as you mentioned. Good job on implementing a more convenient 
and lasting fix.



[jira] [Comment Edited] (KAFKA-6728) Kafka Connect Header Null Pointer Exception

2018-04-03 Thread Philippe Hong (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423740#comment-16423740
 ] 

Philippe Hong edited comment on KAFKA-6728 at 4/3/18 9:36 AM:
--

Thank you, adding 
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter solved 
my issue as you mentioned. 
Good job on implementing a more convenient and lasting fix.


was (Author: hwki):
Thank you, adding 
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter solved 
my issue as you mentioned. Good job on implementing a more convenient and 
lasting fix.



[jira] [Comment Edited] (KAFKA-6728) Kafka Connect Header Null Pointer Exception

2018-04-03 Thread Philippe Hong (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423740#comment-16423740
 ] 

Philippe Hong edited comment on KAFKA-6728 at 4/3/18 9:36 AM:
--

Thank you, adding 
header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter solved 
my issue as you mentioned. Good job on implementing a more convenient and 
lasting fix.


was (Author: hwki):
Thank you, adding header.converter=org.apache.kafka.connect.storage.SimpleHeaderConverter solved my issue as you mentioned. Good job on implementing a 
more convenient and lasting fix.



[jira] [Commented] (KAFKA-6683) ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"

2018-04-03 Thread Chema Sanchez (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423791#comment-16423791
 ] 

Chema Sanchez commented on KAFKA-6683:
--

[~hachikuji] I have been unable to reproduce the problem with the fixed code; 
it seems to work properly.
Thanks a lot.

> ReplicaFetcher crashes with "Attempted to complete a transaction which was 
> not started" 
> 
>
> Key: KAFKA-6683
> URL: https://issues.apache.org/jira/browse/KAFKA-6683
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0
> Environment: os: GNU/Linux 
> arch: x86_64
> Kernel: 4.9.77
> jvm: OpenJDK 1.8.0
>Reporter: Chema Sanchez
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 1.1.0
>
> Attachments: server.properties
>
>
> We have been experiencing this issue lately when restarting or replacing 
> brokers of our Kafka clusters during maintenance operations.
> Having restarted or replaced a broker, after a few minutes of performing 
> normally, it may suddenly throw the following exception and stop replicating 
> some partitions:
> {code:none}
> 2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempted to complete a transaction which 
> was not started
>     at 
> kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
>     at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
>     at 
> kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
>     at scala.collection.immutable.List.foreach(List.scala:389)
>     at 
> scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
>     at 
> scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
>     at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
>     at kafka.log.Log.loadProducersFromLog(Log.scala:540)
>     at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
>     at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.log.Log.loadProducerState(Log.scala:514)
>     at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
>     at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>     at kafka.log.Log.truncateTo(Log.scala:1467)
>     at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
>     at 
> kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
>     at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>     at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
>     at kafka.log.LogManager.truncateTo(LogManager.scala:445)
>     at 
> kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at 
> kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
>     at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
>     at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>     at 
> kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
>     at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, 
> fetcherId=0] Stopped (kafka.server.ReplicaFet

[jira] [Created] (KAFKA-6741) Transient test failure: SslTransportLayerTest.testNetworkThreadTimeRecorded

2018-04-03 Thread Manikumar (JIRA)
Manikumar created KAFKA-6741:


 Summary: Transient test failure: 
SslTransportLayerTest.testNetworkThreadTimeRecorded
 Key: KAFKA-6741
 URL: https://issues.apache.org/jira/browse/KAFKA-6741
 Project: Kafka
  Issue Type: Bug
Reporter: Manikumar


debug logs:

{code}
 [2018-04-03 14:51:33,365] DEBUG Added sensor with name connections-closed: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,368] DEBUG Added sensor with name connections-created: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,368] DEBUG Added sensor with name 
successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,368] DEBUG Added sensor with name failed-authentication: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,368] DEBUG Added sensor with name bytes-sent-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,369] DEBUG Added sensor with name bytes-sent: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,370] DEBUG Added sensor with name bytes-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,370] DEBUG Added sensor with name select-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,371] DEBUG Added sensor with name io-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,379] DEBUG Removed sensor with name connections-closed: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,379] DEBUG Removed sensor with name connections-created: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,379] DEBUG Removed sensor with name 
successful-authentication: (org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,380] DEBUG Removed sensor with name 
failed-authentication: (org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-sent-received: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-sent: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-received: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,382] DEBUG Removed sensor with name select-time: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,382] DEBUG Removed sensor with name io-time: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,382] DEBUG Added sensor with name connections-closed: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,382] DEBUG Added sensor with name connections-created: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,383] DEBUG Added sensor with name 
successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,383] DEBUG Added sensor with name failed-authentication: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,383] DEBUG Added sensor with name bytes-sent-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,384] DEBUG Added sensor with name bytes-sent: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,384] DEBUG Added sensor with name bytes-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,384] DEBUG Added sensor with name select-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,384] DEBUG Added sensor with name io-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,444] DEBUG Added sensor with name connections-closed: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,445] DEBUG Added sensor with name connections-created: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,445] DEBUG Added sensor with name 
successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,445] DEBUG Added sensor with name failed-authentication: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-sent-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-sent: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,447] DEBUG Added sensor with name select-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,447] DEBUG Added sensor with name io-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,890] DEBUG Created socket with SO_RCVBUF = 326640, 
SO_SNDBUF = 65328, SO_TIMEOUT = 0 to node 0 
(org.apache.kafka.common.network.Selector:474)
 [2018-04-03 14:51:33,892] DEBUG Added sensor with name 
node-127.0.0.1:53543-127.0.

[jira] [Updated] (KAFKA-6741) Transient test failure: SslTransportLayerTest.testNetworkThreadTimeRecorded

2018-04-03 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-6741:
-
Description: 
debug logs:

{code}
 [2018-04-03 14:51:33,365] DEBUG Added sensor with name connections-closed: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,368] DEBUG Added sensor with name connections-created: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,368] DEBUG Added sensor with name 
successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,368] DEBUG Added sensor with name failed-authentication: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,368] DEBUG Added sensor with name bytes-sent-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,369] DEBUG Added sensor with name bytes-sent: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,370] DEBUG Added sensor with name bytes-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,370] DEBUG Added sensor with name select-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,371] DEBUG Added sensor with name io-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,379] DEBUG Removed sensor with name connections-closed: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,379] DEBUG Removed sensor with name connections-created: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,379] DEBUG Removed sensor with name 
successful-authentication: (org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,380] DEBUG Removed sensor with name 
failed-authentication: (org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-sent-received: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-sent: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-received: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,382] DEBUG Removed sensor with name select-time: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,382] DEBUG Removed sensor with name io-time: 
(org.apache.kafka.common.metrics.Metrics:447)
 [2018-04-03 14:51:33,382] DEBUG Added sensor with name connections-closed: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,382] DEBUG Added sensor with name connections-created: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,383] DEBUG Added sensor with name 
successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,383] DEBUG Added sensor with name failed-authentication: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,383] DEBUG Added sensor with name bytes-sent-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,384] DEBUG Added sensor with name bytes-sent: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,384] DEBUG Added sensor with name bytes-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,384] DEBUG Added sensor with name select-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,384] DEBUG Added sensor with name io-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,444] DEBUG Added sensor with name connections-closed: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,445] DEBUG Added sensor with name connections-created: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,445] DEBUG Added sensor with name 
successful-authentication: (org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,445] DEBUG Added sensor with name failed-authentication: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-sent-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-sent: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-received: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,447] DEBUG Added sensor with name select-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,447] DEBUG Added sensor with name io-time: 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,890] DEBUG Created socket with SO_RCVBUF = 326640, 
SO_SNDBUF = 65328, SO_TIMEOUT = 0 to node 0 
(org.apache.kafka.common.network.Selector:474)
 [2018-04-03 14:51:33,892] DEBUG Added sensor with name 
node-127.0.0.1:53543-127.0.0.1:53544.bytes-sent 
(org.apache.kafka.common.metrics.Metrics:414)
 [2018-04-03 14:51:33,893] DEBUG Added sensor with name 
node-127.0.0.1:53543-127

[jira] [Created] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable

2018-04-03 Thread Valentino Proietti (JIRA)
Valentino Proietti created KAFKA-6742:
-

 Summary: TopologyTestDriver error when dealing with stores from 
GlobalKTable
 Key: KAFKA-6742
 URL: https://issues.apache.org/jira/browse/KAFKA-6742
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Valentino Proietti


This JUnit test simply fails:

@Test
public void globalTable() {

    StreamsBuilder builder = new StreamsBuilder();

    @SuppressWarnings("unused")
    final KTable<String, String> localTable = builder
        .table("local",
               Consumed.with(Serdes.String(), Serdes.String()),
               Materialized.as("localStore"));

    @SuppressWarnings("unused")
    final GlobalKTable<String, String> globalTable = builder
        .globalTable("global",
                     Consumed.with(Serdes.String(), Serdes.String()),
                     Materialized.as("globalStore"));

    Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
    TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);

    final KeyValueStore<String, String> localStore = testDriver.getKeyValueStore("localStore");
    Assert.assertNotNull(localStore);
    Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));

    final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
    Assert.assertNotNull(globalStore);
    Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));

    final ConsumerRecordFactory<String, String> crf =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
    testDriver.pipeInput(crf.create("local", "one", "TheOne"));
    testDriver.pipeInput(crf.create("global", "one", "TheOne"));

    Assert.assertEquals("TheOne", localStore.get("one"));
    Assert.assertEquals("TheOne", globalStore.get("one"));

To make it work I had to modify the TopologyTestDriver class as follows:

...
    public Map<String, StateStore> getAllStateStores() {
//        final Map<String, StateStore> allStores = new HashMap<>();
//        for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
//            allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
//        }
//        return allStores;
        // FIXME
        final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
        final Map<String, StateStore> allStores = new HashMap<>();
        for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
            StateStore res = psm.getStore(storeName);
            if (res == null)
                res = psm.getGlobalStore(storeName);
            allStores.put(storeName, res);
        }
        return allStores;
    }
...
    public StateStore getStateStore(final String name) {
//        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
        // FIXME
        final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
        StateStore res = psm.getStore(name);
        if (res == null)
            res = psm.getGlobalStore(name);
        return res;
    }

Moreover, I think it would be very useful to make the internal MockProducer 
public for testing cases where a producer is used alongside the "normal" stream 
processing, by adding the method:

    /**
     * @return records sent with this producer are automatically streamed to the topology.
     */
    public final Producer<byte[], byte[]> getProducer() {
        return producer;
    }

Unfortunately this introduces another problem, which can be verified by adding 
the following lines to the previous JUnit test:

...
    ConsumerRecord<byte[], byte[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values
    testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value()));
    testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value()));
    testDriver.advanceWallClockTime(0);

    Assert.assertEquals("TheOne", localStore.get("one"));
    Assert.assertEquals("Second", localStore.get("two"));
    Assert.assertEquals("TheOne", globalStore.get("one"));
    Assert.assertEquals("Second", globalStore.get("two"));
}

That could be fixed with:

    private void captureOutputRecords() {
        // Capture all the records sent to the producer ...
        final List<ProducerRecord<byte[], byte[]>> output = producer.history();
        producer.clear();
        for (final ProducerRecord<byte[], byte[]> record : output) {
            Queue<ProducerRecord<byte[], byte[]>> outputRecords =
                outputRecordsByTopic.get(record.topic());
            if (outputRecords == null) {
                outputRecords = new LinkedList<>();
                outputRecordsByTopic.put(record.topic(), outputRecords);
            }

[jira] [Updated] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable

2018-04-03 Thread Valentino Proietti (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentino Proietti updated KAFKA-6742:
--

[jira] [Assigned] (KAFKA-6740) Plugins class' newConverter and newHeaderConverter methods are unclear

2018-04-03 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-6740:


Assignee: Randall Hauch

> Plugins class' newConverter and newHeaderConverter methods are unclear
> --
>
> Key: KAFKA-6740
> URL: https://issues.apache.org/jira/browse/KAFKA-6740
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>
> The ClassLoaderUsage enum is a bit unclear as to what the code is doing. The 
> `CURRENT_CLASSLOADER` is really about using the connector's configuration, 
> whereas the `PLUGINS` literal is more about using the worker's configuration. 
> We should clean this up, and we should also look at moving the if-block into 
> the `CURRENT_CLASSLOADER` case in {{newConverter}}, just as we did in the 
> {{newHeaderConverter}} method in KAFKA-6728.





[jira] [Created] (KAFKA-6743) ConsumerPerformance fails to consume all messages on topics with large number of partitions

2018-04-03 Thread Alex Dunayevsky (JIRA)
Alex Dunayevsky created KAFKA-6743:
--

 Summary: ConsumerPerformance fails to consume all messages on 
topics with large number of partitions
 Key: KAFKA-6743
 URL: https://issues.apache.org/jira/browse/KAFKA-6743
 Project: Kafka
  Issue Type: Bug
  Components: core, tools
Affects Versions: 0.11.0.2
Reporter: Alex Dunayevsky


ConsumerPerformance fails to consume all messages on topics with a large number 
of partitions due to a relatively short default polling loop timeout (1000 ms) 
that is hardcoded and can be neither reached nor modified by the end user.

Demo: Create a topic of 10 000 partitions, send 50 000 000 100-byte records 
using kafka-producer-perf-test and consume them using kafka-consumer-perf-test 
(ConsumerPerformance). You will likely notice that the number of records 
returned by kafka-consumer-perf-test is many times smaller than the expected 
50 000 000. This happens because of the specific ConsumerPerformance 
implementation: in some rough cases it may take long enough to process/iterate 
through the records polled in batches that the time exceeds the default 
hardcoded polling loop timeout, and this is probably not what we want from this 
utility.

We have two options: 
1) Increase the polling loop timeout in the ConsumerPerformance implementation. 
It defaults to 1000 ms and is hardcoded, so it cannot be changed today, but we 
could export it as an OPTIONAL kafka-consumer-perf-test parameter so that it is 
configurable at the script level and available to the end user (see the sketch 
below).
2) Decrease max.poll.records at the consumer config level. This is not a fine 
option, though, since we do not want to touch the default settings.
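
As a rough illustration of option 1, here is a minimal sketch (written against 
the newer Java consumer API, not the actual Scala ConsumerPerformance code) of 
a consume loop whose idle timeout comes from an optional, user-supplied setting 
instead of a hardcoded 1000 ms; the property names and topic below are made up 
for the example:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PerfConsumeLoopSketch {

    public static void main(String[] args) {
        // Hypothetical knobs: the target record count and the idle timeout
        // that ConsumerPerformance currently hardcodes to 1000 ms.
        final long targetRecords = Long.parseLong(System.getProperty("perf.messages", "50000000"));
        final long idleTimeoutMs = Long.parseLong(System.getProperty("perf.poll.timeout.ms", "1000"));

        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "perf-consumer-sketch");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        long consumed = 0;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("perf-topic"));
            long lastConsumedAt = System.currentTimeMillis();
            // Stop once the target is reached, or once nothing has arrived for
            // longer than the (now configurable) idle timeout.
            while (consumed < targetRecords
                    && System.currentTimeMillis() - lastConsumedAt <= idleTimeoutMs) {
                final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    lastConsumedAt = System.currentTimeMillis();
                    consumed += records.count();
                }
            }
        }
        System.out.println("Records consumed: " + consumed);
    }
}
{code}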





[jira] [Commented] (KAFKA-6724) ConsumerPerformance resets offsets on every startup

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424114#comment-16424114
 ] 

ASF GitHub Bot commented on KAFKA-6724:
---

rootex- opened a new pull request #4818: KAFKA-6724 ConsumerPerformance resets 
offsets on every startup
URL: https://github.com/apache/kafka/pull/4818
 
 
   ## Remove consumer offset reset on startup
   
   ### ConsumerPerformance fails to consume all messages 
   on topics with a large number of partitions due to a relatively short 
default polling loop timeout (1000 ms) that is hardcoded and can be neither 
reached nor modified by the end user. 
   
   ### Demo
   Create a topic of 10 000 partitions, send 50 000 000 100-byte records 
using kafka-producer-perf-test and consume them using kafka-consumer-perf-test 
(ConsumerPerformance). You will likely notice that the number of records 
returned by kafka-consumer-perf-test is many times smaller than the expected 
50 000 000. 
   
   This happens because of the specific ConsumerPerformance implementation: in 
some rough cases it may take long enough to process/iterate through the records 
polled in batches that the time exceeds the default hardcoded polling loop 
timeout, and this is probably not what we want from this utility. 
   
   ### Possible options
   1) Increase the polling loop timeout in the ConsumerPerformance 
implementation. It defaults to 1000 ms and is hardcoded, so it cannot be 
changed today, but we could export it as an OPTIONAL kafka-consumer-perf-test 
parameter so that it is configurable at the script level and available to the 
end user.
   2) Decrease max.poll.records at the consumer config level. This is not a 
fine option, though, since we do not want to touch the default settings.
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
>  Issue Type: Bug
>  Components: core, tools
>Affects Versions: 0.11.0.1
>Reporter: Alex Dunayevsky
>Priority: Minor
> Fix For: 1.2.0
>
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for 
> its group on every startup. 
>  





[jira] [Commented] (KAFKA-6743) ConsumerPerformance fails to consume all messages on topics with large number of partitions

2018-04-03 Thread Alex Dunayevsky (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424145#comment-16424145
 ] 

Alex Dunayevsky commented on KAFKA-6743:


https://github.com/apache/kafka/pull/4818



[jira] [Assigned] (KAFKA-6728) Kafka Connect Header Null Pointer Exception

2018-04-03 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava reassigned KAFKA-6728:


Assignee: Randall Hauch



[jira] [Resolved] (KAFKA-6728) Kafka Connect Header Null Pointer Exception

2018-04-03 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-6728.
--
   Resolution: Fixed
Fix Version/s: 1.1.1
   1.2.0

Issue resolved by pull request 4815
[https://github.com/apache/kafka/pull/4815]



[jira] [Commented] (KAFKA-6728) Kafka Connect Header Null Pointer Exception

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424197#comment-16424197
 ] 

ASF GitHub Bot commented on KAFKA-6728:
---

ewencp closed pull request #4815: KAFKA-6728: Corrected the worker’s 
instantiation of the HeaderConverter
URL: https://github.com/apache/kafka/pull/4815
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 0a895f67cf8..fd05af57a64 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -76,7 +76,9 @@
 public static final String HEADER_CONVERTER_CLASS_CONFIG = 
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
 public static final String HEADER_CONVERTER_CLASS_DOC = 
WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
 public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header 
converter class";
-public static final String HEADER_CONVERTER_CLASS_DEFAULT = 
WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT;
+// The Connector config should not have a default for the header 
converter, since the absence of a config property means that
+// the worker config settings should be used. Thus, we set the default to 
null here.
+public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;
 
 public static final String TASKS_MAX_CONFIG = "tasks.max";
 private static final String TASKS_MAX_DOC = "Maximum number of tasks to 
use for this connector.";
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e3d9cf45901..1c6465855ff 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -397,12 +397,21 @@ public boolean startTask(
 );
 if (keyConverter == null) {
 keyConverter = plugins.newConverter(config, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+log.info("Set up the key converter {} for task {} using the 
worker config", keyConverter.getClass(), id);
+} else {
+log.info("Set up the key converter {} for task {} using the 
connector config", keyConverter.getClass(), id);
 }
 if (valueConverter == null) {
 valueConverter = plugins.newConverter(config, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+log.info("Set up the value converter {} for task {} using the 
worker config", valueConverter.getClass(), id);
+} else {
+log.info("Set up the value converter {} for task {} using the 
connector config", valueConverter.getClass(), id);
 }
 if (headerConverter == null) {
 headerConverter = plugins.newHeaderConverter(config, 
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+log.info("Set up the header converter {} for task {} using the 
worker config", headerConverter.getClass(), id);
+} else {
+log.info("Set up the header converter {} for task {} using the 
connector config", headerConverter.getClass(), id);
 }
 
 workerTask = buildWorkerTask(connConfig, id, task, statusListener, 
initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 94f27717080..f4cd2ba14b0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -234,6 +234,8 @@ public Converter newConverter(AbstractConfig config, String 
classPropertyName, C
 // Configure the Converter using only the old configuration mechanism 
...
 String configPrefix = classPropertyName + ".";
 Map converterConfig = 
config.originalsWithPrefix(configPrefix);
+log.debug("Configuring the {} converter with configuration:{}{}",
+  isKeyConverter ? "key" : "value", System.lineSeparator(), 
converterConfig);
 plugin.configure(converterConfig, isKeyConverter);
 return plugin;
 }
@@ -24

[jira] [Commented] (KAFKA-6684) Support casting values with bytes schema to string

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424234#comment-16424234
 ] 

ASF GitHub Bot commented on KAFKA-6684:
---

amitsela opened a new pull request #4820: KAFKA-6684 [WIP]: Cast transform bytes
URL: https://github.com/apache/kafka/pull/4820
 
 
   Allow casting a LogicalType to string by calling the serialized (Java) 
object's toString().   
   
   Added tests for `BigDecimal` and `Date` as whole record and as fields. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support casting values with bytes schema to string 
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is 
> not supported.
> This proposes to allow casting anything to a string, kind of like Java's 
> {{toString()}}, such that if the object is actually a LogicalType it can be 
> "serialized" as string instead of bytes+schema.
>  
> {noformat}
> Examples:
> BigDecimal will cast to the string representation of the number.
> Timestamp will cast to the string representation of the timestamp, or maybe 
> UTC mmddTHH:MM:SS.f format?
> {noformat}
>  
> Worst case, bytes are "casted" to whatever the {{toString()}} returns - its 
> up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.
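
A minimal, hedged sketch of the behaviour proposed above: logical types rendered with 
their Java toString() when cast to string. This only illustrates the expected string 
output; it is not the Connect Cast transform itself, and the exact timestamp format is 
still open per the description.

{code:java}
import java.math.BigDecimal;
import java.util.Date;

public class CastToStringSketch {
    public static void main(String[] args) {
        BigDecimal amount = new BigDecimal("123.45");
        Date timestamp = new Date(0L);
        // The proposal is to fall back to toString() for values with a bytes/logical schema:
        System.out.println(amount.toString());    // 123.45
        System.out.println(timestamp.toString()); // e.g. "Thu Jan 01 00:00:00 UTC 1970", format TBD
    }
}
{code}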



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable

2018-04-03 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424242#comment-16424242
 ] 

Guozhang Wang commented on KAFKA-6742:
--

Hello [~vale68], thanks for reporting the issue. Is it possible for you to 
submit a PR that illustrates your problem? If it is indeed an issue we can go 
ahead and review / merge your PR: 
https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest

It is a bit difficult to fully understand your point just by looking at the code 
above.

> TopologyTestDriver error when dealing with stores from GlobalKTable
> ---
>
> Key: KAFKA-6742
> URL: https://issues.apache.org/jira/browse/KAFKA-6742
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Valentino Proietti
>Priority: Minor
>
> {color:#ff}This junit test simply fails:{color}
> @Test
> *public* *void* globalTable() {
> StreamsBuilder builder = *new* StreamsBuilder();
> @SuppressWarnings("unused")
> *final* KTable localTable = builder
> .table("local", 
> Consumed._with_(Serdes._String_(), Serdes._String_()),
> Materialized._as_("localStore"))
> ;
> @SuppressWarnings("unused")
> *final* GlobalKTable globalTable = builder
> .globalTable("global", 
> Consumed._with_(Serdes._String_(), Serdes._String_()),
>         Materialized._as_("globalStore"))
> ;
> //
> Properties props = *new* Properties();
> props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test");
> props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost");
> TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), 
> props);
> //
> *final* KeyValueStore localStore = 
> testDriver.getKeyValueStore("localStore");
> Assert._assertNotNull_(localStore);
> Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore"));
> //
> *final* KeyValueStore globalStore = 
> testDriver.getKeyValueStore("globalStore");
> Assert._assertNotNull_(globalStore);
> Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore"));
> //
>     *final* ConsumerRecordFactory crf = *new* 
> ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer());
> testDriver.pipeInput(crf.create("local", "one", "TheOne"));
> testDriver.pipeInput(crf.create("global", "one", "TheOne"));
> //
> Assert._assertEquals_("TheOne", localStore.get("one"));
> Assert._assertEquals_("TheOne", globalStore.get("one"));
>  
>  
> {color:#ff}to make it work I had to modify the TopologyTestDriver class 
> as follow:{color}
> ...
>     *public* Map getAllStateStores() {
> //        final Map allStores = new HashMap<>();
> //        for (final String storeName : 
> internalTopologyBuilder.allStateStoreName())
> { //            allStores.put(storeName, ((ProcessorContextImpl) 
> task.context()).getStateMgr().getStore(storeName)); //        }
> //        return allStores;
>     {color:#ff}// *FIXME*{color}
>     *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
> task.context()).getStateMgr();
>         *final* Map allStores = *new* HashMap<>();
>         *for* (*final* String storeName : 
> internalTopologyBuilder.allStateStoreName()) {            
> StateStore res = psm.getStore(storeName);            
> if (res == null)            
>   res = psm.getGlobalStore(storeName);            
> allStores.put(storeName, res);        
> }
>         *return* allStores;
>     }
> ...
>     *public* StateStore getStateStore(*final* String name) {
> //        return ((ProcessorContextImpl) 
> task.context()).getStateMgr().getStore(name);
>         {color:#ff}// *FIXME*{color}
>     *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
> task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         *if* (res == *null*)
>         res = psm.getGlobalStore(name);
>         *return* res;
>     }
>  
> {color:#ff}moreover I think it would be very useful to make the internal 
> MockProducer public for testing cases where a producer is used along side 
> with the "normal" stream processing by adding the method:{color}
>     /**
>      * *@return* records sent with this producer are automatically streamed 
> to the topology.
>      */
>     *public* *final* Producer<*byte*[], *byte*[]> getProducer() {     
> return producer;    
> }
>  
> {color:#ff}unfortunately this introduces another problem that could be 
> verified by adding the following lines to the previous junit test:{color}
> ...
> **
> //
> ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); 
> // just to serialize keys and values
> testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.getProducer(

[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE

2018-04-03 Thread Khaireddine Rezgui (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424263#comment-16424263
 ] 

Khaireddine Rezgui commented on KAFKA-6535:
---

Thank you [~vvcephei], i will take a look in the links

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set its default retention to infinity to allow any records be 
> pushed to it with old timestamps (think: bootstrapping, re-processing) and 
> just rely on the purging API to keeping its storage small.
> More specifically, in {{RepartitionTopicConfig}} we have a few default 
> overrides for repartition topic configs, we should just add the override for 
> {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still 
> allows users to override themselves if they want via 
> {{StreamsConfig.TOPIC_PREFIX}}. We need to add unit test to verify this 
> update takes effect.
> In addition to the code change, we also need to have doc changes in 
> streams/upgrade_guide.html specifying this default value change.
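
For context, a hedged sketch of the user-side override mentioned above (via 
{{StreamsConfig.TOPIC_PREFIX}}): even with an infinite default, a user can still set 
their own retention for the internal topics Streams creates. The application id and 
bootstrap servers below are placeholders.

{code:java}
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// Per-topic override applied to the internal (repartition/changelog) topics created by Streams:
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), Long.toString(Long.MAX_VALUE));
{code}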



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6739) Broker receives error when handling request with java.lang.IllegalArgumentException: Magic v0 does not support record headers

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424336#comment-16424336
 ] 

ASF GitHub Bot commented on KAFKA-6739:
---

hachikuji closed pull request #4813: KAFKA-6739: Ignore the presence of headers 
when down-converting from V2 to V1/V0
URL: https://github.com/apache/kafka/pull/4813
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 2452798d485..89a5413e00c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -130,8 +130,13 @@ private MemoryRecordsBuilder convertRecordBatch(byte 
magic, ByteBuffer buffer, R
 
 MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, 
batch.compressionType(),
 timestampType, recordBatchAndRecords.baseOffset, 
logAppendTime);
-for (Record record : recordBatchAndRecords.records)
-builder.append(record);
+for (Record record : recordBatchAndRecords.records) {
+// Down-convert this record. Ignore headers when down-converting 
to V0 and V1 since they are not supported
+if (magic > RecordBatch.MAGIC_VALUE_V1)
+builder.append(record);
+else
+builder.appendWithOffset(record.offset(), record.timestamp(), 
record.key(), record.value());
+}
 
 builder.close();
 return builder;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 53ac2003586..fdd3ede16cc 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -40,6 +42,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertArrayEquals;
 
 public class FileRecordsTest {
 
@@ -358,6 +361,11 @@ public void testConversion() throws IOException {
 
 private void doTestConversion(CompressionType compressionType, byte 
toMagic) throws IOException {
 List offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 
24L);
+
+Header[] headers = {new RecordHeader("headerKey1", 
"headerValue1".getBytes()),
+new RecordHeader("headerKey2", 
"headerValue2".getBytes()),
+new RecordHeader("headerKey3", 
"headerValue3".getBytes())};
+
 List records = asList(
 new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
 new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
@@ -366,9 +374,10 @@ private void doTestConversion(CompressionType 
compressionType, byte toMagic) thr
 new SimpleRecord(5L, "k5".getBytes(), "hello 
again".getBytes()),
 new SimpleRecord(6L, "k6".getBytes(), "I sense 
indecision".getBytes()),
 new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
-new SimpleRecord(8L, "k8".getBytes(), "running 
out".getBytes()),
+new SimpleRecord(8L, "k8".getBytes(), "running 
out".getBytes(), headers),
 new SimpleRecord(9L, "k9".getBytes(), "ok, almost 
done".getBytes()),
-new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes()));
+new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), 
headers));
+assertEquals("incorrect test setup", offsets.size(), records.size());
 
 ByteBuffer buffer = ByteBuffer.allocate(1024);
 MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.MAGIC_VALUE_V0, compressionType,
@@ -452,6 +461,7 @@ private void verifyConvertedRecords(List 
initialRecords,
 assertEquals("Timestamp should not change", 
initialRecords.get(i).timestamp(), record.timestamp());
 
assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
 
assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAM

[jira] [Resolved] (KAFKA-6739) Broker receives error when handling request with java.lang.IllegalArgumentException: Magic v0 does not support record headers

2018-04-03 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6739.

   Resolution: Fixed
Fix Version/s: 1.0.2

> Broker receives error when handling request with 
> java.lang.IllegalArgumentException: Magic v0 does not support record headers
> -
>
> Key: KAFKA-6739
> URL: https://issues.apache.org/jira/browse/KAFKA-6739
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Koelli Mungee
>Assignee: Dhruvil Shah
>Priority: Critical
> Fix For: 1.0.2, 1.2.0, 1.1.1
>
>
> A broker running at 1.0.0 with the following properties 
>  
> {code:java}
> log.message.format.version=1.0
> inter.broker.protocol.version=1.0
> {code}
> receives this ERROR while handling fetch request for a message with a header
> {code:java}
> [2018-03-23 01:48:03,093] ERROR [KafkaApi-1] Error when handling request 
> {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=test=[{partition=11,fetch_offset=20645,max_bytes=1048576}]}]}
>  (kafka.server.KafkaApis) java.lang.IllegalArgumentException: Magic v0 does 
> not support record headers 
> at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
>  
> at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
>  
> at 
> org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
>  
> at 
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
>  
> at 
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253) 
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
>  
> at scala.Option.map(Option.scala:146) 
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
>  
> at scala.Option.flatMap(Option.scala:171) 
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
>  
> at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>  
> at 
> kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
>  
> at 
> kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
>  
> at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033) 
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
>  
> at 
> kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
>  
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
>  
> at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>  
> at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>  
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820) 
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596) 
> at kafka.server.KafkaApis.handle(KafkaApis.scala:100) 
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65) 
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6739) Down-conversion fails for records with headers

2018-04-03 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-6739:
---
Summary: Down-conversion fails for records with headers  (was: Broker 
receives error when handling request with java.lang.IllegalArgumentException: 
Magic v0 does not support record headers)

> Down-conversion fails for records with headers
> --
>
> Key: KAFKA-6739
> URL: https://issues.apache.org/jira/browse/KAFKA-6739
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Koelli Mungee
>Assignee: Dhruvil Shah
>Priority: Critical
> Fix For: 1.0.2, 1.2.0, 1.1.1
>
>
> A broker running at 1.0.0 with the following properties 
>  
> {code:java}
> log.message.format.version=1.0
> inter.broker.protocol.version=1.0
> {code}
> receives this ERROR while handling fetch request for a message with a header
> {code:java}
> [2018-03-23 01:48:03,093] ERROR [KafkaApi-1] Error when handling request 
> {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=test=[{partition=11,fetch_offset=20645,max_bytes=1048576}]}]}
>  (kafka.server.KafkaApis) java.lang.IllegalArgumentException: Magic v0 does 
> not support record headers 
> at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
>  
> at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
>  
> at 
> org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
>  
> at 
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
>  
> at 
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253) 
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
>  
> at scala.Option.map(Option.scala:146) 
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
>  
> at scala.Option.flatMap(Option.scala:171) 
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
>  
> at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>  
> at 
> kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
>  
> at 
> kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
>  
> at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033) 
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
>  
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
>  
> at 
> kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
>  
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
>  
> at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>  
> at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>  
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820) 
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596) 
> at kafka.server.KafkaApis.handle(KafkaApis.scala:100) 
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65) 
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6744) MockProducer with transaction enabled doesn't fail on commit if a record was failed

2018-04-03 Thread JIRA
Pascal Gélinas created KAFKA-6744:
-

 Summary: MockProducer with transaction enabled doesn't fail on 
commit if a record was failed
 Key: KAFKA-6744
 URL: https://issues.apache.org/jira/browse/KAFKA-6744
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 1.0.0
Reporter: Pascal Gélinas


The KafkaProducer#send documentation states the following:

When used as part of a transaction, it is not necessary to define a callback or 
check the result of the future in order to detect errors from send. If any of 
the send calls failed with an irrecoverable error, the final 
commitTransaction() call will fail and throw the exception from the last failed 
send.

So I was expecting the following to throw an exception:

{code:java}
MockProducer<String, byte[]> producer = new MockProducer<>(false,
    new StringSerializer(), new ByteArraySerializer());
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("foo", new byte[]{}));
producer.errorNext(new RuntimeException());
producer.commitTransaction(); // Expecting this to throw
{code}

Unfortunately, the commitTransaction() call returns successfully.
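
In other words, the expected behaviour (hedged sketch, JUnit 4 style, reusing the 
{{producer}} from the snippet above) would be:

{code:java}
import static org.junit.Assert.fail;

try {
    producer.commitTransaction();
    fail("commitTransaction() should throw, because a send in this transaction failed via errorNext()");
} catch (RuntimeException expected) {
    // the exception passed to errorNext() should surface here, mirroring KafkaProducer's contract
}
{code}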



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3240) Replication issues

2018-04-03 Thread Ari Uka (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424420#comment-16424420
 ] 

Ari Uka commented on KAFKA-3240:


I'm running into the same issue. 
https://issues.apache.org/jira/browse/KAFKA-6679

Restarting all machines seemed to fix the problem temporarily, but the issue came 
up again a week later. 

We are running:
[https://github.com/Shopify/sarama] for Producer and Consumer.
FreeBSD 11.0-RELEASE-p8
Kafka 1.0.1 (we are going to attempt to upgrade to Kafka 1.1.0 and restart 
again.)
OpenJDK 1.8.0_121
ZFS 

This is also in Azure, so the instance is a VM. At this point, we're going to 
attempt to move to a Linux cluster instead of running a FreeBSD setup. We're 
going to introduce 3 new machines into the cluster that are running Linux and 
see if the problem appears on those instances.

May I ask what consumer/producer you are using? Is anyone using the standard 
Java consumer/producer?



 

 

> Replication issues
> --
>
> Key: KAFKA-3240
> URL: https://issues.apache.org/jira/browse/KAFKA-3240
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1
> Environment: FreeBSD 10.2-RELEASE-p9
>Reporter: Jan Omar
>Priority: Major
>  Labels: reliability
>
> Hi,
> We are trying to replace our 3-broker cluster running on 0.6 with a new 
> cluster on 0.9.0.1 (but tried 0.8.2.2 and 0.9.0.0 as well).
> - 3 kafka nodes with one zookeeper instance on each machine
> - FreeBSD 10.2 p9
> - Nagle off (sysctl net.inet.tcp.delayed_ack=0)
> - all kafka machines write a ZFS ZIL to a dedicated SSD
> - 3 producers on 3 machines, writing to 1 topics, partitioning 3, replication 
> factor 3
> - acks all
> - 10 Gigabit Ethernet, all machines on one switch, ping 0.05 ms worst case.
> While using the ProducerPerformance or rdkafka_performance we are seeing very 
> strange Replication errors. Any hint on what's going on would be highly 
> appreciated. Any suggestion on how to debug this properly would help as well.
> This is what our broker config looks like:
> {code}
> broker.id=5
> auto.create.topics.enable=false
> delete.topic.enable=true
> listeners=PLAINTEXT://:9092
> port=9092
> host.name=kafka-five.acc
> advertised.host.name=10.5.3.18
> zookeeper.connect=zookeeper-four.acc:2181,zookeeper-five.acc:2181,zookeeper-six.acc:2181
> zookeeper.connection.timeout.ms=6000
> num.replica.fetchers=1
> replica.fetch.max.bytes=1
> replica.fetch.wait.max.ms=500
> replica.high.watermark.checkpoint.interval.ms=5000
> replica.socket.timeout.ms=30
> replica.socket.receive.buffer.bytes=65536
> replica.lag.time.max.ms=1000
> min.insync.replicas=2
> controller.socket.timeout.ms=3
> controller.message.queue.size=100
> log.dirs=/var/db/kafka
> num.partitions=8
> message.max.bytes=1
> auto.create.topics.enable=false
> log.index.interval.bytes=4096
> log.index.size.max.bytes=10485760
> log.retention.hours=168
> log.flush.interval.ms=1
> log.flush.interval.messages=2
> log.flush.scheduler.interval.ms=2000
> log.roll.hours=168
> log.retention.check.interval.ms=30
> log.segment.bytes=536870912
> zookeeper.connection.timeout.ms=100
> zookeeper.sync.time.ms=5000
> num.io.threads=8
> num.network.threads=4
> socket.request.max.bytes=104857600
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> queued.max.requests=10
> fetch.purgatory.purge.interval.requests=100
> producer.purgatory.purge.interval.requests=100
> replica.lag.max.messages=1000
> {code}
> These are the errors we're seeing:
> {code:borderStyle=solid}
> ERROR [Replica Manager on Broker 5]: Error processing fetch operation on 
> partition [test,0] offset 50727 (kafka.server.ReplicaManager)
> java.lang.IllegalStateException: Invalid message size: 0
>   at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:141)
>   at kafka.log.LogSegment.translateOffset(LogSegment.scala:105)
>   at kafka.log.LogSegment.read(LogSegment.scala:126)
>   at kafka.log.Log.read(Log.scala:506)
>   at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:536)
>   at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:507)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>   at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> ka

[jira] [Created] (KAFKA-6745) kafka consumer rebalancing takes long time (from 3 secs to 5 minutes)

2018-04-03 Thread Ramkumar (JIRA)
Ramkumar created KAFKA-6745:
---

 Summary: kafka consumer rebalancing takes long time (from 3 secs 
to 5 minutes)
 Key: KAFKA-6745
 URL: https://issues.apache.org/jira/browse/KAFKA-6745
 Project: Kafka
  Issue Type: Improvement
  Components: clients, core
Affects Versions: 0.11.0.0
Reporter: Ramkumar


Hi. We had a 3-node HTTP service in front of Kafka 0.8. This HTTP service acts as 
a REST API so that publishers and consumers use the middleware instead of the Kafka 
client API directly. With that setup, consumer rebalancing was not a major issue.

We wanted to upgrade to Kafka 0.11 and updated our HTTP services (3-node cluster) to 
use the new Kafka consumer API, but rebalancing of consumers (multiple consumers under 
the same group) now takes anywhere from a few seconds up to 5 minutes 
(max.poll.interval.ms). Because of this delay our HTTP clients time out and fail over, 
so the rebalancing time is a major issue. It is not clear from the documentation 
whether the rebalance activity for the group takes place only after 
max.poll.interval.ms, or whether it starts after about 3 seconds and can complete any 
time within 5 minutes. We tried reducing max.poll.interval.ms to 15 seconds, but this 
also triggers rebalances internally.

Below are the other parameters we have set in our service:
max.poll.interval.ms = 30 seconds
heartbeat.interval.ms = 1 minute
session.timeout.ms = 4 minutes
consumer.cache.timeout = 2 min

Below is the log:
""2018-03-26 12:53:23,009 [qtp1404928347-11556] INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining 
group firstnetportal_001

""2018-03-26 12:57:52,793 [qtp1404928347-11556] INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully 
joined group firstnetportal_001 with generation 7475

Please let me know if any other application/client uses an HTTP interface in front of 
3 nodes without having this issue.
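
For reference, a hedged sketch of how the rebalance-related settings listed above map 
onto the new consumer API. The values mirror the settings above and the bootstrap 
servers are a placeholder; this is an illustration, not a recommendation.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "firstnetportal_001");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000");       // 30 seconds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "60000");      // 1 minute
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "240000");        // 4 minutes
KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
{code}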
 
 
 
 
 
 
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6729) KTable should use user source topics if possible and not create changelog topic

2018-04-03 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-6729:
--

Assignee: Bill Bejeck

> KTable should use user source topics if possible and not create changelog 
> topic
> ---
>
> Key: KAFKA-6729
> URL: https://issues.apache.org/jira/browse/KAFKA-6729
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Critical
> Fix For: 1.2.0
>
>
> With KIP-182 we largely reworked the Streams API and introduced a regression into 
> the 1.0 code base: if a KTable is populated from a source topic, i.e. 
> StreamsBuilder.table(), the KTable now creates its own changelog topic. 
> However, in older releases (0.11 or older), we did not create a changelog topic 
> but used the user-specified source topic instead.
> We want to reintroduce this optimization to reduce the load (storage and 
> writes) on the broker side for this case.
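
The affected pattern, as a hedged sketch (topic and store names are placeholders): with 
the optimization restored, the table below would be backed directly by its source topic 
rather than by a separate changelog topic.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> table = builder.table(
        "users",                                          // placeholder source topic
        Consumed.with(Serdes.String(), Serdes.String()),
        Materialized.as("users-store"));                  // placeholder store name
{code}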



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6678) Upgrade dependencies with later release versions

2018-04-03 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6678:
--
Component/s: build

> Upgrade dependencies with later release versions
> 
>
> Key: KAFKA-6678
> URL: https://issues.apache.org/jira/browse/KAFKA-6678
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Ted Yu
>Priority: Major
> Attachments: k-update.txt
>
>
> {code}
> The following dependencies have later release versions:
>  - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
>  - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
>  - com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
>  - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
>  - org.ajoberstar:grgit [1.9.3 -> 2.1.1]
>  - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
>  - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.openjdk.jmh:jmh-core [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
>  - org.lz4:lz4-java [1.4 -> 1.4.1]
>  - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
>  - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
>  - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
>  - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
>  - org.scala-lang:scala-library [2.11.12 -> 2.12.4]
>  - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
>  - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
>  - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
> {code}
> Looks like we can consider upgrading scalatest, jmh-core and checkstyle



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6531) SocketServerTest#closingChannelException fails sometimes

2018-04-03 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6531:
--
Description: 
From 
https://builds.apache.org/job/kafka-trunk-jdk9/361/testReport/junit/kafka.network/SocketServerTest/closingChannelException/ :

{code}
java.lang.AssertionError: Channels not removed
at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
at 
kafka.network.SocketServerTest.assertProcessorHealthy(SocketServerTest.scala:914)
at 
kafka.network.SocketServerTest.$anonfun$closingChannelException$1(SocketServerTest.scala:763)
at 
kafka.network.SocketServerTest.$anonfun$closingChannelException$1$adapted(SocketServerTest.scala:747)
{code}
Among the test output, I saw:
{code}
[2018-02-04 18:51:15,995] ERROR Processor 0 closed connection from 
/127.0.0.1:48261 (kafka.network.SocketServerTest$$anon$5$$anon$1:73)
java.lang.IllegalStateException: There is already a connection for id 
127.0.0.1:1-127.0.0.1:2-0
at 
org.apache.kafka.common.network.Selector.ensureNotRegistered(Selector.java:260)
at org.apache.kafka.common.network.Selector.register(Selector.java:254)
at 
kafka.network.SocketServerTest$TestableSelector.super$register(SocketServerTest.scala:1043)
at 
kafka.network.SocketServerTest$TestableSelector.$anonfun$register$2(SocketServerTest.scala:1043)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at 
kafka.network.SocketServerTest$TestableSelector.runOp(SocketServerTest.scala:1037)
at 
kafka.network.SocketServerTest$TestableSelector.register(SocketServerTest.scala:1043)
at 
kafka.network.Processor.configureNewConnections(SocketServer.scala:723)
at kafka.network.Processor.run(SocketServer.scala:532)
{code}

  was:
From 
https://builds.apache.org/job/kafka-trunk-jdk9/361/testReport/junit/kafka.network/SocketServerTest/closingChannelException/ :
{code}
java.lang.AssertionError: Channels not removed
at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
at 
kafka.network.SocketServerTest.assertProcessorHealthy(SocketServerTest.scala:914)
at 
kafka.network.SocketServerTest.$anonfun$closingChannelException$1(SocketServerTest.scala:763)
at 
kafka.network.SocketServerTest.$anonfun$closingChannelException$1$adapted(SocketServerTest.scala:747)
{code}
Among the test output, I saw:
{code}
[2018-02-04 18:51:15,995] ERROR Processor 0 closed connection from 
/127.0.0.1:48261 (kafka.network.SocketServerTest$$anon$5$$anon$1:73)
java.lang.IllegalStateException: There is already a connection for id 
127.0.0.1:1-127.0.0.1:2-0
at 
org.apache.kafka.common.network.Selector.ensureNotRegistered(Selector.java:260)
at org.apache.kafka.common.network.Selector.register(Selector.java:254)
at 
kafka.network.SocketServerTest$TestableSelector.super$register(SocketServerTest.scala:1043)
at 
kafka.network.SocketServerTest$TestableSelector.$anonfun$register$2(SocketServerTest.scala:1043)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at 
kafka.network.SocketServerTest$TestableSelector.runOp(SocketServerTest.scala:1037)
at 
kafka.network.SocketServerTest$TestableSelector.register(SocketServerTest.scala:1043)
at 
kafka.network.Processor.configureNewConnections(SocketServer.scala:723)
at kafka.network.Processor.run(SocketServer.scala:532)
{code}


> SocketServerTest#closingChannelException fails sometimes
> 
>
> Key: KAFKA-6531
> URL: https://issues.apache.org/jira/browse/KAFKA-6531
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Reporter: Ted Yu
>Priority: Minor
>
> From 
> https://builds.apache.org/job/kafka-trunk-jdk9/361/testReport/junit/kafka.network/SocketServerTest/closingChannelException/
>  :
> {code}
> java.lang.AssertionError: Channels not removed
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.network.SocketServerTest.assertProcessorHealthy(SocketServerTest.scala:914)
>   at 
> kafka.network.SocketServerTest.$anonfun$closingChannelException$1(SocketServerTest.scala:763)
>   at 
> kafka.network.SocketServerTest.$anonfun$closingChannelException$1$adapted(SocketServerTest.scala:747)
> {code}
> Among the test output, I saw:
> {code}
> [2018-02-04 18:51:15,995] ERROR Processor 0 closed connection from 
> /127.0.0.1:48261 (kafka.network.SocketServerTest$$anon$5$$anon$1:73)
> java.lang.IllegalStateException: There is already a connection for id 
> 127.0.0.1:1-127.0.0.1:2-0
>   at 
> org.apache.k

[jira] [Commented] (KAFKA-6678) Upgrade dependencies with later release versions

2018-04-03 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424881#comment-16424881
 ] 

Ismael Juma commented on KAFKA-6678:


Are you sure you ran the tool with the latest trunk? Some of the newer dependency 
versions require Java 8, so we can't upgrade to those.

> Upgrade dependencies with later release versions
> 
>
> Key: KAFKA-6678
> URL: https://issues.apache.org/jira/browse/KAFKA-6678
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Ted Yu
>Priority: Major
> Attachments: k-update.txt
>
>
> {code}
> The following dependencies have later release versions:
>  - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
>  - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
>  - com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
>  - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
>  - org.ajoberstar:grgit [1.9.3 -> 2.1.1]
>  - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
>  - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.openjdk.jmh:jmh-core [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
>  - org.lz4:lz4-java [1.4 -> 1.4.1]
>  - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
>  - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
>  - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
>  - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
>  - org.scala-lang:scala-library [2.11.12 -> 2.12.4]
>  - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
>  - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
>  - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
> {code}
> Looks like we can consider upgrading scalatest, jmh-core and checkstyle



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6560) Use single-point queries than range queries for windowed aggregation operators

2018-04-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424885#comment-16424885
 ] 

ASF GitHub Bot commented on KAFKA-6560:
---

guozhangwang closed pull request #4814: KAFKA-6560: Use single query for 
getters as well
URL: https://github.com/apache/kafka/pull/4814
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 27f8408320f..6953f7c58ce 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -25,7 +25,6 @@
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.Map;
 
@@ -132,10 +131,7 @@ public T get(final Windowed windowedKey) {
 K key = windowedKey.key();
 W window = (W) windowedKey.window();
 
-// this iterator should contain at most one element
-try (WindowStoreIterator iter = windowStore.fetch(key, 
window.start(), window.start())) {
-return iter.hasNext() ? iter.next().value : null;
-}
+return windowStore.fetch(key, window.start());
 }
 }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index c3d95d81b86..1d8b32b5fa5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -24,7 +24,6 @@
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.Map;
 
@@ -128,10 +127,7 @@ public V get(final Windowed windowedKey) {
 K key = windowedKey.key();
 W window = (W) windowedKey.window();
 
-// this iterator should only contain one element
-try (WindowStoreIterator iter = windowStore.fetch(key, 
window.start(), window.start())) {
-return iter.hasNext() ? iter.next().value : null;
-}
+return windowStore.fetch(key, window.start());
 }
 }
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use single-point queries than range queries for windowed aggregation operators
> --
>
> Key: KAFKA-6560
> URL: https://issues.apache.org/jira/browse/KAFKA-6560
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Today for windowed aggregations in Streams DSL, the underlying implementation 
> is leveraging the fetch(key, from, to) API to get all the related windows for 
> a single record to update. However, this is a very inefficient operation with 
> significant amount of CPU time iterating over window stores. On the other 
> hand, since the operator implementation itself have full knowledge of the 
> window specs it can actually translate this operation into multiple 
> single-point queries with the accurate window start timestamp, which would 
> largely reduce the overhead.
> The proposed approach is to add a single fetch API to the WindowedStore and 
> use that in the KStreamWindowedAggregate / KStreamWindowedReduce operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6730) Simplify state store recovery

2018-04-03 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6730:
---
Description: 
In the current code base, we restore state stores in the main thread (in 
contrast to older code that did restore state stored in the rebalance call 
back). This has multiple advantages and allows us the further simplify restore 
code.

In the original code base, during a long restore phase, it was possible that a 
instance misses a rebalance and drops out of the consumer group. To detect this 
case, we apply a check during the restore phase, that the end-offset of the 
changelog topic does not change. A changed offset implies a missed rebalance as 
another thread started to write into the changelog topic (ie, the current 
thread does not own the task/store/changelog-topic anymore).

With the new code, that restores in the main-loop, it's ensured that `poll()` 
is called regularly and thus, a rebalance will be detected automatically. This 
make the check about an changing changelog-end-offset unnecessary.

We can simplify the restore logic, to just consuming until `poll()` does not 
return any data. For this case, we fetch the end-offset to see if we did fully 
restore. If yes, we resume processing, if not, we continue the restore.

  was:
In the current code base, we restore state stores in the main thread (in 
contrast to older code that did restore state stored in the rebalance call 
back). This has multiple advantages and allows us the further simplify restore 
code.

In the original code base, during a long restore phase, it was possible that a 
instance misses a rebalance and drops out of the consumer group. To detect this 
case, we apply a check during the restore phase, that the end-offset of the 
changelog topic does not change. A changed offset implies a missed rebalance as 
another thread started to write into the changelog topic (ie, the current 
thread does not own the task/store/changelog-topic anymore).

With the new code, that restores in the main-loop, it's ensured that poll() is 
called regularly and thus, a rebalance will be detected automatically. This 
make the check about an changing changleog-end-offset unnecessary.

We can simplify the restore logic, to just consuming until `pol()` does not 
return any data. For this case, we fetch the end-offset to see if we did fully 
restore. If yes, we resume processing, if not, we continue the restore.


> Simplify state store recovery
> -
>
> Key: KAFKA-6730
> URL: https://issues.apache.org/jira/browse/KAFKA-6730
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Fix For: 1.2.0
>
>
> In the current code base, we restore state stores in the main thread (in 
> contrast to older code that did restore state stored in the rebalance call 
> back). This has multiple advantages and allows us the further simplify 
> restore code.
> In the original code base, during a long restore phase, it was possible that 
> a instance misses a rebalance and drops out of the consumer group. To detect 
> this case, we apply a check during the restore phase, that the end-offset of 
> the changelog topic does not change. A changed offset implies a missed 
> rebalance as another thread started to write into the changelog topic (ie, 
> the current thread does not own the task/store/changelog-topic anymore).
> With the new code, that restores in the main-loop, it's ensured that `poll()` 
> is called regularly and thus, a rebalance will be detected automatically. 
> This make the check about an changing changelog-end-offset unnecessary.
> We can simplify the restore logic, to just consuming until `poll()` does not 
> return any data. For this case, we fetch the end-offset to see if we did 
> fully restore. If yes, we resume processing, if not, we continue the restore.
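
A hedged sketch of the simplified restore loop described above (not the actual Streams 
code): keep polling the changelog, and once poll() returns nothing, compare the current 
positions with the end offsets to decide whether restoration is complete. The 
restoreConsumer's assignment to the changelog partitions and the restore callback are 
assumptions for illustration.

{code:java}
import java.util.Map;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

static void restoreUntilCaughtUp(KafkaConsumer<byte[], byte[]> restoreConsumer,
                                 Consumer<ConsumerRecord<byte[], byte[]>> restoreCallback) {
    while (true) {
        ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(100L);
        if (records.isEmpty()) {
            // Nothing buffered: check whether we have reached the changelog end offsets.
            Map<TopicPartition, Long> endOffsets = restoreConsumer.endOffsets(restoreConsumer.assignment());
            boolean caughtUp = endOffsets.entrySet().stream()
                    .allMatch(e -> restoreConsumer.position(e.getKey()) >= e.getValue());
            if (caughtUp) {
                return; // fully restored; the task can resume normal processing
            }
        } else {
            for (ConsumerRecord<byte[], byte[]> record : records) {
                restoreCallback.accept(record); // apply the changelog record to the state store
            }
        }
    }
}
{code}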



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable

2018-04-03 Thread Valentino Proietti (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425091#comment-16425091
 ] 

Valentino Proietti commented on KAFKA-6742:
---

Hello [~guozhang], I will create the PR asap. Thank you

> TopologyTestDriver error when dealing with stores from GlobalKTable
> ---
>
> Key: KAFKA-6742
> URL: https://issues.apache.org/jira/browse/KAFKA-6742
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Valentino Proietti
>Priority: Minor
>
> This JUnit test simply fails:
> @Test
> public void globalTable() {
>     StreamsBuilder builder = new StreamsBuilder();
>     @SuppressWarnings("unused")
>     final KTable<String, String> localTable = builder
>         .table("local",
>                Consumed.with(Serdes.String(), Serdes.String()),
>                Materialized.as("localStore"));
>     @SuppressWarnings("unused")
>     final GlobalKTable<String, String> globalTable = builder
>         .globalTable("global",
>                Consumed.with(Serdes.String(), Serdes.String()),
>                Materialized.as("globalStore"));
>     //
>     Properties props = new Properties();
>     props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>     props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
>     TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
>     //
>     final KeyValueStore<String, String> localStore = testDriver.getKeyValueStore("localStore");
>     Assert.assertNotNull(localStore);
>     Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));
>     //
>     final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
>     Assert.assertNotNull(globalStore);
>     Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
>     //
>     final ConsumerRecordFactory<String, String> crf =
>         new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
>     testDriver.pipeInput(crf.create("local", "one", "TheOne"));
>     testDriver.pipeInput(crf.create("global", "one", "TheOne"));
>     //
>     Assert.assertEquals("TheOne", localStore.get("one"));
>     Assert.assertEquals("TheOne", globalStore.get("one"));
> }
> 
> To make it work I had to modify the TopologyTestDriver class as follows:
> ...
>     public Map<String, StateStore> getAllStateStores() {
> //        final Map<String, StateStore> allStores = new HashMap<>();
> //        for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
> //            allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
> //        }
> //        return allStores;
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         final Map<String, StateStore> allStores = new HashMap<>();
>         for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
>             StateStore res = psm.getStore(storeName);
>             if (res == null)
>                 res = psm.getGlobalStore(storeName);
>             allStores.put(storeName, res);
>         }
>         return allStores;
>     }
> ...
>     public StateStore getStateStore(final String name) {
> //        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         if (res == null)
>             res = psm.getGlobalStore(name);
>         return res;
>     }
> ...
>  
> Moreover, I think it would be very useful to make the internal MockProducer
> public for testing cases where a producer is used alongside the "normal"
> stream processing, by adding the method:
>     /**
>      * @return records sent with this producer are automatically streamed to the topology.
>      */
>     public final Producer<byte[], byte[]> getProducer() {
>         return producer;
>     }
>  
> Unfortunately this introduces another problem that could be verified by
> adding the following lines to the previous JUnit test:
> ...
> //
> ConsumerRecord<byte[], byte[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values
> testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value()));
> testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value()));
> testDriver.advanceWallClockTime(0);
> Assert.assertEquals("TheOne", localStore.get("one"));
> Assert.assertEquals("Second", localStore.get("two"));
> Assert.assertEquals("TheOne", globalStore.get("one"));
> Assert.assertE