[jira] [Commented] (KAFKA-4972) Kafka 0.10.0 Found a corrupted index file during Kafka broker startup
[ https://issues.apache.org/jira/browse/KAFKA-4972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169681#comment-16169681 ] Julius Žaromskis commented on KAFKA-4972: - There's a bunch of warning msgs in my log file, kafka is slow to restart {{[2017-09-18 06:53:19,349] WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/var/kafka/dispatch.task-ack-6/00021796.index) has non-zero size but the last offset is 21796 which is no larger than the base offset 21796.}. deleting /var/kafka/dispatch.task-ack-6/00021796.timeindex, /var/kafka/dispatch.task-ack-6/00021796.index, and /var/kafka/dispatch.task-ack-6/00021796.txnindex and rebuilding index... (kafka.log.Log) [2017-09-18 06:56:10,533] WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/var/kafka/dispatch.task-ack-10/00027244.index) has non-zero size but the last offset is 27244 which is no larger than the base offset 27244.}. deleting /var/kafka/dispatch.task-ack-10/00027244.timeindex, /var/kafka/dispatch.task-ack-10/00027244.index, and /var/kafka/dispatch.task-ack-10/00027244.txnindex and rebuilding index... (kafka.log.Log) [2017-09-18 07:09:17,710] WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/var/kafka/dispatch.status-3/49362755.index) has non-zero size but the last offset is 49362755 which is no larger than the base offset 49362755.}. deleting /var/kafka/dispatch.status-3/49362755.timeindex, /var/kafka/dispatch.status-3/49362755.index, and /var/kafka/dispatch.status-3/49362755.txnindex and rebuilding index... (kafka.log.Log)}} > Kafka 0.10.0 Found a corrupted index file during Kafka broker startup > -- > > Key: KAFKA-4972 > URL: https://issues.apache.org/jira/browse/KAFKA-4972 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.0.0 > Environment: JDK: HotSpot x64 1.7.0_80 > Tag: 0.10.0 >Reporter: fangjinuo >Priority: Critical > Fix For: 0.11.0.2 > > Attachments: Snap3.png > > > After force shutdown all kafka brokers one by one, restart them one by one, > but a broker startup failure. > The following WARN leval log was found in the log file: > found a corrutped index file, .index , delet it ... > you can view details by following attachment. > I look up some codes in core module, found out : > the nonthreadsafe method LogSegment.append(offset, messages) has tow caller: > 1) Log.append(messages) // here has a synchronized > lock > 2) LogCleaner.cleanInto(topicAndPartition, source, dest, map, retainDeletes, > messageFormatVersion) // here has not > So I guess this may be the reason for the repeated offset in 0xx.log file > (logsegment's .log) > Although this is just my inference, but I hope that this problem can be > quickly repaired -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-4972) Kafka 0.10.0 Found a corrupted index file during Kafka broker startup
[ https://issues.apache.org/jira/browse/KAFKA-4972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169681#comment-16169681 ] Julius Žaromskis edited comment on KAFKA-4972 at 9/18/17 7:22 AM: -- There's a bunch of warning msgs in my log file, kafka is slow to restart. Upgrading 0.10.2 to 0.11.0.1 {{[2017-09-18 06:53:19,349] WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/var/kafka/dispatch.task-ack-6/00021796.index) has non-zero size but the last offset is 21796 which is no larger than the base offset 21796.}. deleting /var/kafka/dispatch.task-ack-6/00021796.timeindex, /var/kafka/dispatch.task-ack-6/00021796.index, and /var/kafka/dispatch.task-ack-6/00021796.txnindex and rebuilding index... (kafka.log.Log) [2017-09-18 06:56:10,533] WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/var/kafka/dispatch.task-ack-10/00027244.index) has non-zero size but the last offset is 27244 which is no larger than the base offset 27244.}. deleting /var/kafka/dispatch.task-ack-10/00027244.timeindex, /var/kafka/dispatch.task-ack-10/00027244.index, and /var/kafka/dispatch.task-ack-10/00027244.txnindex and rebuilding index... (kafka.log.Log) [2017-09-18 07:09:17,710] WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/var/kafka/dispatch.status-3/49362755.index) has non-zero size but the last offset is 49362755 which is no larger than the base offset 49362755.}. deleting /var/kafka/dispatch.status-3/49362755.timeindex, /var/kafka/dispatch.status-3/49362755.index, and /var/kafka/dispatch.status-3/49362755.txnindex and rebuilding index... (kafka.log.Log)}} was (Author: juliuszaromskis): There's a bunch of warning msgs in my log file, kafka is slow to restart {{[2017-09-18 06:53:19,349] WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/var/kafka/dispatch.task-ack-6/00021796.index) has non-zero size but the last offset is 21796 which is no larger than the base offset 21796.}. deleting /var/kafka/dispatch.task-ack-6/00021796.timeindex, /var/kafka/dispatch.task-ack-6/00021796.index, and /var/kafka/dispatch.task-ack-6/00021796.txnindex and rebuilding index... (kafka.log.Log) [2017-09-18 06:56:10,533] WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/var/kafka/dispatch.task-ack-10/00027244.index) has non-zero size but the last offset is 27244 which is no larger than the base offset 27244.}. deleting /var/kafka/dispatch.task-ack-10/00027244.timeindex, /var/kafka/dispatch.task-ack-10/00027244.index, and /var/kafka/dispatch.task-ack-10/00027244.txnindex and rebuilding index... (kafka.log.Log) [2017-09-18 07:09:17,710] WARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/var/kafka/dispatch.status-3/49362755.index) has non-zero size but the last offset is 49362755 which is no larger than the base offset 49362755.}. deleting /var/kafka/dispatch.status-3/49362755.timeindex, /var/kafka/dispatch.status-3/49362755.index, and /var/kafka/dispatch.status-3/49362755.txnindex and rebuilding index... (kafka.log.Log)}} > Kafka 0.10.0 Found a corrupted index file during Kafka broker startup > -- > > Key: KAFKA-4972 > URL: https://issues.apache.org/jira/browse/KAFKA-4972 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.0.0 > Environment: JDK: HotSpot x64 1.7.0_80 > Tag: 0.10.0 >Reporter: fangjinuo >Priority: Critical > Fix For: 0.11.0.2 > > Attachments: Snap3.png > > > After force shutdown all kafka brokers one by one, restart them one by one, > but a broker startup failure. > The following WARN leval log was found in the log file: > found a corrutped index file, .index , delet it ... > you can view details by following attachment. > I look up some codes in core module, found out : > the nonthreadsafe method LogSegment.append(offset, messages) has tow caller: > 1) Log.append(messages) // here has a synchronized > lock > 2) LogCleaner.cleanInto(topicAndPartition, source, dest, map, retainDeletes, > messageFormatVersion) // here has not > So I guess this may be the reason for the repeated offset in 0xx.log file > (logsegment's .log) > Although this is just my inference, but I hope that this proble
[jira] [Commented] (KAFKA-5917) Kafka not starting
[ https://issues.apache.org/jira/browse/KAFKA-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169693#comment-16169693 ] huxihx commented on KAFKA-5917: --- Could check the version of znode `/config/changes/config_change_*` by issuing `get /config/changes/config_change_* in Zookeeper to see the version is 1. > Kafka not starting > -- > > Key: KAFKA-5917 > URL: https://issues.apache.org/jira/browse/KAFKA-5917 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 0.10.0.0 >Reporter: Balu > > Getting this error in kafka,zookeeper,schema repository cluster. > FATAL [Kafka Server 3], Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed > at scala.Predef$.require(Predef.scala:212) > at > kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processNotification(DynamicConfigManager.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at scala.Option.map(Option.scala:146) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:90) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener.processAllNotifications(ZkNodeChangeNotificationListener.scala:79) > at > kafka.common.ZkNodeChangeNotificationListener.init(ZkNodeChangeNotificationListener.scala:67) > at > kafka.server.DynamicConfigManager.startup(DynamicConfigManager.scala:122) > at kafka.server.KafkaServer.startup(KafkaServer.scala:233) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) > at kafka.Kafka$.main(Kafka.scala:67) > at kafka.Kafka.main(Kafka.scala) > Please help -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5917) Kafka not starting
[ https://issues.apache.org/jira/browse/KAFKA-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169714#comment-16169714 ] Manikumar commented on KAFKA-5917: -- This happens when you run Kafka-configs.sh script with new versions Kafka libs on an older version of Kafka server. In my guess, you are mostly running newer versions of KafkaManager against older versions of Kafka Cluster. you need to manually remove /config/changes/config_change_ zk path to restore kafka. > Kafka not starting > -- > > Key: KAFKA-5917 > URL: https://issues.apache.org/jira/browse/KAFKA-5917 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 0.10.0.0 >Reporter: Balu > > Getting this error in kafka,zookeeper,schema repository cluster. > FATAL [Kafka Server 3], Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed > at scala.Predef$.require(Predef.scala:212) > at > kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processNotification(DynamicConfigManager.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at scala.Option.map(Option.scala:146) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:90) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener.processAllNotifications(ZkNodeChangeNotificationListener.scala:79) > at > kafka.common.ZkNodeChangeNotificationListener.init(ZkNodeChangeNotificationListener.scala:67) > at > kafka.server.DynamicConfigManager.startup(DynamicConfigManager.scala:122) > at kafka.server.KafkaServer.startup(KafkaServer.scala:233) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) > at kafka.Kafka$.main(Kafka.scala:67) > at kafka.Kafka.main(Kafka.scala) > Please help -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5917) Kafka not starting
[ https://issues.apache.org/jira/browse/KAFKA-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169714#comment-16169714 ] Manikumar edited comment on KAFKA-5917 at 9/18/17 7:56 AM: --- This happens when you run Kafka-configs.sh script with new versions Kafka libs against an older version of Kafka server. In my guess, you are mostly running newer versions of KafkaManager against older versions of Kafka Cluster. you need to manually remove /config/changes/config_change_ zk path to restore kafka. was (Author: omkreddy): This happens when you run Kafka-configs.sh script with new versions Kafka libs on an older version of Kafka server. In my guess, you are mostly running newer versions of KafkaManager against older versions of Kafka Cluster. you need to manually remove /config/changes/config_change_ zk path to restore kafka. > Kafka not starting > -- > > Key: KAFKA-5917 > URL: https://issues.apache.org/jira/browse/KAFKA-5917 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 0.10.0.0 >Reporter: Balu > > Getting this error in kafka,zookeeper,schema repository cluster. > FATAL [Kafka Server 3], Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed > at scala.Predef$.require(Predef.scala:212) > at > kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processNotification(DynamicConfigManager.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at scala.Option.map(Option.scala:146) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:90) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener.processAllNotifications(ZkNodeChangeNotificationListener.scala:79) > at > kafka.common.ZkNodeChangeNotificationListener.init(ZkNodeChangeNotificationListener.scala:67) > at > kafka.server.DynamicConfigManager.startup(DynamicConfigManager.scala:122) > at kafka.server.KafkaServer.startup(KafkaServer.scala:233) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) > at kafka.Kafka$.main(Kafka.scala:67) > at kafka.Kafka.main(Kafka.scala) > Please help -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169748#comment-16169748 ] Stelios BOURAZANIS commented on KAFKA-4107: --- This is an extremely usefull tool, as Kafka connect implementations usually store a unique ID on a specif topic, and selected partition (based on a calculated data key) , to check for new records. If this ID is reset from the source, then no way to reset the offset of the corresponding partition of that offset topic for this connector. One could delete the offset partition and restart the workers, but that has implications on certain type of source connectors. The tool should have the ability to read a certain offset per topic partition and print the value and the key of the data in that offset. The tool should have the ability to write a new message in a specific topic/partition and use a user specific value ( 0 to reset) as a value. > Support offset reset capability in Kafka Connect > > > Key: KAFKA-4107 > URL: https://issues.apache.org/jira/browse/KAFKA-4107 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jason Gustafson > > It would be useful in some cases to be able to reset connector offsets. For > example, if a topic in Kafka corresponding to a source database is > accidentally deleted (or deleted because of corrupt data), an administrator > may want to reset offsets and reproduce the log from the beginning. It may > also be useful to have support for overriding offsets, but that seems like a > less likely use case. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-4107) Support offset reset capability in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169748#comment-16169748 ] Stelios BOURAZANIS edited comment on KAFKA-4107 at 9/18/17 8:38 AM: This is an extremely usefull tool, as Kafka connect implementations usually store a unique ID( based on source data) on a specif topic, and selected partition (based on a calculated data key) , to check for new records. If this ID is reset from the source, then no way to reset the offset of the corresponding partition of that offset topic for this connector. One could delete the offset partition and restart the workers, but that has implications on certain type of source connectors. The tool should have the ability to read a certain offset per topic partition and print the value and the key of the data in that offset. The tool should have the ability to write a new message in a specific topic/partition and use a user specific value ( 0 to reset) as a value. was (Author: slionb): This is an extremely usefull tool, as Kafka connect implementations usually store a unique ID on a specif topic, and selected partition (based on a calculated data key) , to check for new records. If this ID is reset from the source, then no way to reset the offset of the corresponding partition of that offset topic for this connector. One could delete the offset partition and restart the workers, but that has implications on certain type of source connectors. The tool should have the ability to read a certain offset per topic partition and print the value and the key of the data in that offset. The tool should have the ability to write a new message in a specific topic/partition and use a user specific value ( 0 to reset) as a value. > Support offset reset capability in Kafka Connect > > > Key: KAFKA-4107 > URL: https://issues.apache.org/jira/browse/KAFKA-4107 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jason Gustafson > > It would be useful in some cases to be able to reset connector offsets. For > example, if a topic in Kafka corresponding to a source database is > accidentally deleted (or deleted because of corrupt data), an administrator > may want to reset offsets and reproduce the log from the beginning. It may > also be useful to have support for overriding offsets, but that seems like a > less likely use case. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5754) Refactor Streams to use LogContext
[ https://issues.apache.org/jira/browse/KAFKA-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5754. --- Resolution: Fixed Fix Version/s: 1.0.0 Issue resolved by pull request 3727 [https://github.com/apache/kafka/pull/3727] > Refactor Streams to use LogContext > -- > > Key: KAFKA-5754 > URL: https://issues.apache.org/jira/browse/KAFKA-5754 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jason Gustafson >Assignee: Umesh Chaudhary > Labels: newbie > Fix For: 1.0.0 > > > We added a {{LogContext}} object which automatically adds a log prefix to > every message written by loggers constructed from it (much like the Logging > mixin available in the server code). We use this in the consumer to ensure > that messages always contain the consumer group and client ids, which is very > helpful when multiple consumers are run on the same instance. Kafka Streams > requires similar contextual logging by including the prefix manually in each > log message. It would be better to take advantage of the new {{LogContext}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5754) Refactor Streams to use LogContext
[ https://issues.apache.org/jira/browse/KAFKA-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169769#comment-16169769 ] ASF GitHub Bot commented on KAFKA-5754: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3727 > Refactor Streams to use LogContext > -- > > Key: KAFKA-5754 > URL: https://issues.apache.org/jira/browse/KAFKA-5754 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jason Gustafson >Assignee: Umesh Chaudhary > Labels: newbie > Fix For: 1.0.0 > > > We added a {{LogContext}} object which automatically adds a log prefix to > every message written by loggers constructed from it (much like the Logging > mixin available in the server code). We use this in the consumer to ensure > that messages always contain the consumer group and client ids, which is very > helpful when multiple consumers are run on the same instance. Kafka Streams > requires similar contextual logging by including the prefix manually in each > log message. It would be better to take advantage of the new {{LogContext}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Issue Comment Deleted] (KAFKA-5881) Consuming from added partitions without restarting the consumer
[ https://issues.apache.org/jira/browse/KAFKA-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viliam Durina updated KAFKA-5881: - Comment: was deleted (was: Thanks for the reply, i'll give it a test on Monday. If it works then I'll close the bug.) > Consuming from added partitions without restarting the consumer > --- > > Key: KAFKA-5881 > URL: https://issues.apache.org/jira/browse/KAFKA-5881 > Project: Kafka > Issue Type: Improvement > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Viliam Durina > > Currently the {{KafkaConsumer}} is not able to return events from newly added > partitions, neither in automatic nor in manual assignment. I have to create a > new consumer. This was a surprise to me and [other > users|https://stackoverflow.com/q/46175275/952135]. > With manual assignment, the {{consumer.partitionsFor("topic")}} should > eventually return new partitions. > With automatic assignment, one of the consumers should start consuming from > new partitions. > If this is technically not possible, it should at least be documented. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5881) Consuming from added partitions without restarting the consumer
[ https://issues.apache.org/jira/browse/KAFKA-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viliam Durina resolved KAFKA-5881. -- Resolution: Not A Problem I can confirm the behaviour, after setting "metadata.max.age.ms" to zero I get fresh copy of metadata each time, including new partitions. > Consuming from added partitions without restarting the consumer > --- > > Key: KAFKA-5881 > URL: https://issues.apache.org/jira/browse/KAFKA-5881 > Project: Kafka > Issue Type: Improvement > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Viliam Durina > > Currently the {{KafkaConsumer}} is not able to return events from newly added > partitions, neither in automatic nor in manual assignment. I have to create a > new consumer. This was a surprise to me and [other > users|https://stackoverflow.com/q/46175275/952135]. > With manual assignment, the {{consumer.partitionsFor("topic")}} should > eventually return new partitions. > With automatic assignment, one of the consumers should start consuming from > new partitions. > If this is technically not possible, it should at least be documented. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5881) Consuming from added partitions without restarting the consumer
[ https://issues.apache.org/jira/browse/KAFKA-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169779#comment-16169779 ] Viliam Durina commented on KAFKA-5881: -- Maybe it could be added to javadoc of {{partitionsFor}} (and possibly others), which currently says just this: bq. This method will issue a remote call to the server if it does not already have any metadata about the given topic. > Consuming from added partitions without restarting the consumer > --- > > Key: KAFKA-5881 > URL: https://issues.apache.org/jira/browse/KAFKA-5881 > Project: Kafka > Issue Type: Improvement > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: Viliam Durina > > Currently the {{KafkaConsumer}} is not able to return events from newly added > partitions, neither in automatic nor in manual assignment. I have to create a > new consumer. This was a surprise to me and [other > users|https://stackoverflow.com/q/46175275/952135]. > With manual assignment, the {{consumer.partitionsFor("topic")}} should > eventually return new partitions. > With automatic assignment, one of the consumers should start consuming from > new partitions. > If this is technically not possible, it should at least be documented. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5919) Delete records command "version" parameter ignored
Paolo Patierno created KAFKA-5919: - Summary: Delete records command "version" parameter ignored Key: KAFKA-5919 URL: https://issues.apache.org/jira/browse/KAFKA-5919 Project: Kafka Issue Type: Bug Components: tools Reporter: Paolo Patierno Assignee: Paolo Patierno Priority: Minor Hi, the kafka-delete-records script allows user to pass information about records to delete through a JSON file. Such file, as described in the command help, is made by a "partitions" array and a "version" field. Reading [KIP-107|https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient] and the DeleteRecords API (Key: 21) description it's not clear what such field is and even it's not used at all (in the current implementation). I'm going to remove it from tool help description and it should not need a KIP because today it's just ignored and even using a JSON file without "version" the tool just works. [~lindong] you implemented such delete command, are my considerations right ? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5920) Handle SSL authentication failures as non-retriable exceptions in clients
Rajini Sivaram created KAFKA-5920: - Summary: Handle SSL authentication failures as non-retriable exceptions in clients Key: KAFKA-5920 URL: https://issues.apache.org/jira/browse/KAFKA-5920 Project: Kafka Issue Type: Improvement Components: security Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.0.0 KIP-152 improves diagnostics for SASL authentication failures and propagates SASL authentication failures to producers and consumers. For SSL authentication, we can't have protocol changes, but we should try and adopt the same behaviour if possible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5919) Delete records command "version" parameter ignored
[ https://issues.apache.org/jira/browse/KAFKA-5919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169820#comment-16169820 ] ASF GitHub Bot commented on KAFKA-5919: --- GitHub user ppatierno opened a pull request: https://github.com/apache/kafka/pull/3887 KAFKA-5919: Delete records command "version" parameter ignored Removed ignored "version" field in JSON file for deleting records You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppatierno/kafka kafka-5919 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3887.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3887 commit ea9bd5e66ae746f1708ea961a049e6656d7eb72c Author: Paolo Patierno Date: 2017-09-18T09:49:08Z Removed ignored "version" field in JSON file for deleting records > Delete records command "version" parameter ignored > -- > > Key: KAFKA-5919 > URL: https://issues.apache.org/jira/browse/KAFKA-5919 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Minor > > Hi, > the kafka-delete-records script allows user to pass information about records > to delete through a JSON file. Such file, as described in the command help, > is made by a "partitions" array and a "version" field. Reading > [KIP-107|https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient] > and the DeleteRecords API (Key: 21) description it's not clear what such > field is and even it's not used at all (in the current implementation). > I'm going to remove it from tool help description and it should not need a > KIP because today it's just ignored and even using a JSON file without > "version" the tool just works. > [~lindong] you implemented such delete command, are my considerations right ? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5757) Scheduled or Delayed connector start
[ https://issues.apache.org/jira/browse/KAFKA-5757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169868#comment-16169868 ] Dhananjay Patkar commented on KAFKA-5757: - Thanks [~rhauch] , for the suggestion. I have taken this approach and overriding connector. I am closing this JIRA. > Scheduled or Delayed connector start > > > Key: KAFKA-5757 > URL: https://issues.apache.org/jira/browse/KAFKA-5757 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Dhananjay Patkar > Labels: feature > > Currently connectors / workers start immediately on creation / updating. > We plan to use kafka connect in distributed mode as a batch data sync and > need connectors to run at scheduled time. > Ex: Someone submits connector configuration at 14:00 , but we don not want > connector to push data until 23:00. > Can we extend connector configuration to support "sync.start.time" property, > which takes "hh:mm" ? > Let me know, if there are alternate ways to achieve this functionality in > existing framework. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5654) Add new API methods to KGroupedStream
[ https://issues.apache.org/jira/browse/KAFKA-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169870#comment-16169870 ] ASF GitHub Bot commented on KAFKA-5654: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3827 > Add new API methods to KGroupedStream > - > > Key: KAFKA-5654 > URL: https://issues.apache.org/jira/browse/KAFKA-5654 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > Placeholder until API finalized -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5757) Scheduled or Delayed connector start
[ https://issues.apache.org/jira/browse/KAFKA-5757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhananjay Patkar resolved KAFKA-5757. - Resolution: Not A Problem > Scheduled or Delayed connector start > > > Key: KAFKA-5757 > URL: https://issues.apache.org/jira/browse/KAFKA-5757 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Dhananjay Patkar > Labels: feature > > Currently connectors / workers start immediately on creation / updating. > We plan to use kafka connect in distributed mode as a batch data sync and > need connectors to run at scheduled time. > Ex: Someone submits connector configuration at 14:00 , but we don not want > connector to push data until 23:00. > Can we extend connector configuration to support "sync.start.time" property, > which takes "hh:mm" ? > Let me know, if there are alternate ways to achieve this functionality in > existing framework. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5515) Consider removing date formatting from Segments class
[ https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169888#comment-16169888 ] ASF GitHub Bot commented on KAFKA-5515: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3783 > Consider removing date formatting from Segments class > - > > Key: KAFKA-5515 > URL: https://issues.apache.org/jira/browse/KAFKA-5515 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Damian Guy > Labels: performance > Fix For: 1.0.0 > > > Currently the {{Segments}} class uses a date when calculating the segment id > and uses {{SimpleDateFormat}} for formatting the segment id. However this is > a high volume code path and creating a new {{SimpleDateFormat}} and > formatting each segment id is expensive. We should look into removing the > date from the segment id or at a minimum use a faster alternative to > {{SimpleDateFormat}}. We should also consider keeping a lookup of existing > segments to avoid as many string operations as possible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5921) Add Materialized overloads to WindowedKStream
Damian Guy created KAFKA-5921: - Summary: Add Materialized overloads to WindowedKStream Key: KAFKA-5921 URL: https://issues.apache.org/jira/browse/KAFKA-5921 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 Add the {{Materialized}} overloads to {{WindowedKStream} - KIP-182 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5921) Add Materialized overloads to WindowedKStream
[ https://issues.apache.org/jira/browse/KAFKA-5921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5921: -- Description: Add the {{Materialized}} overloads to {{WindowedKStream}} - KIP-182 (was: Add the {{Materialized}} overloads to {{WindowedKStream} - KIP-182) > Add Materialized overloads to WindowedKStream > - > > Key: KAFKA-5921 > URL: https://issues.apache.org/jira/browse/KAFKA-5921 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > Add the {{Materialized}} overloads to {{WindowedKStream}} - KIP-182 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5921) Add Materialized overloads to WindowedKStream
[ https://issues.apache.org/jira/browse/KAFKA-5921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169902#comment-16169902 ] ASF GitHub Bot commented on KAFKA-5921: --- GitHub user dguy opened a pull request: https://github.com/apache/kafka/pull/3889 KAFKA-5921: add Materialized overloads to windowed kstream Add `Materialized` overloads to `WindowedKStream`. Deprecate existing methods on `KGroupedStream` You can merge this pull request into a Git repository by running: $ git pull https://github.com/dguy/kafka kafka-5921 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3889.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3889 commit 0c4d78f696c36ba632895b16cc83e3517b3637be Author: Damian Guy Date: 2017-09-18T11:25:35Z add materialized to windowed kstream > Add Materialized overloads to WindowedKStream > - > > Key: KAFKA-5921 > URL: https://issues.apache.org/jira/browse/KAFKA-5921 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > Add the {{Materialized}} overloads to {{WindowedKStream}} - KIP-182 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4714) Implement remaining KIP-66 SMTs
[ https://issues.apache.org/jira/browse/KAFKA-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169907#comment-16169907 ] Dhananjay Patkar commented on KAFKA-4714: - Currently on the sink side, transformations are applied on the consumed record. ??For sink connectors, transformations are applied on the collection of SinkRecord before being provided to SinkTask.put().?? Is it possible to get original ConnectRecord object in consumer task? If this is not available then ,can we preserve original message or is it possible to add support for post "SinkTask.put()" transformation? > Implement remaining KIP-66 SMTs > --- > > Key: KAFKA-4714 > URL: https://issues.apache.org/jira/browse/KAFKA-4714 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava > Fix For: 0.11.0.0 > > > Three didn't make it for the 0.10.2.0 release: Flatten, Cast, and > TimestampConverter. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-4714) Implement remaining KIP-66 SMTs
[ https://issues.apache.org/jira/browse/KAFKA-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169907#comment-16169907 ] Dhananjay Patkar edited comment on KAFKA-4714 at 9/18/17 11:53 AM: --- Currently on the sink side, transformations are applied before consuming record. ??For sink connectors, transformations are applied on the collection of SinkRecord before being provided to SinkTask.put().?? Is it possible to get original ConnectRecord object in consumer task? If this is not available then ,can we preserve original message or is it possible to add support for post "SinkTask.put()" transformation? was (Author: dhananjaydp): Currently on the sink side, transformations are applied on the consumed record. ??For sink connectors, transformations are applied on the collection of SinkRecord before being provided to SinkTask.put().?? Is it possible to get original ConnectRecord object in consumer task? If this is not available then ,can we preserve original message or is it possible to add support for post "SinkTask.put()" transformation? > Implement remaining KIP-66 SMTs > --- > > Key: KAFKA-4714 > URL: https://issues.apache.org/jira/browse/KAFKA-4714 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava > Fix For: 0.11.0.0 > > > Three didn't make it for the 0.10.2.0 release: Flatten, Cast, and > TimestampConverter. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5922) Add SessionWindowedKStream
Damian Guy created KAFKA-5922: - Summary: Add SessionWindowedKStream Key: KAFKA-5922 URL: https://issues.apache.org/jira/browse/KAFKA-5922 Project: Kafka Issue Type: Sub-task Components: streams Reporter: Damian Guy Assignee: Damian Guy Fix For: 1.0.0 Add SessionWindowedKStream interface and implementation -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5893) ResetIntegrationTest fails
[ https://issues.apache.org/jira/browse/KAFKA-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169940#comment-16169940 ] ASF GitHub Bot commented on KAFKA-5893: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3859 > ResetIntegrationTest fails > -- > > Key: KAFKA-5893 > URL: https://issues.apache.org/jira/browse/KAFKA-5893 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 0.11.0.0, 0.11.0.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > > {noformat} > java.lang.AssertionError: expected:<0> but was:<1> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} > One issue with debugging is, that we catch exceptions and print the exception > message that is {{null}}: > {noformat} > Standard Error > ERROR: null > ERROR: null > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5884) Enable PowerMock tests when running on Java 9
[ https://issues.apache.org/jira/browse/KAFKA-5884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169961#comment-16169961 ] Ismael Juma commented on KAFKA-5884: Resolved via https://github.com/apache/kafka/commit/a3f068e22da6915cbc473be8da0cfbb229817436. > Enable PowerMock tests when running on Java 9 > - > > Key: KAFKA-5884 > URL: https://issues.apache.org/jira/browse/KAFKA-5884 > Project: Kafka > Issue Type: Task >Reporter: Ismael Juma >Assignee: Ismael Juma > Fix For: 1.0.0 > > > PowerMock 2.0.0 will support Java 9. Once that is released, we should upgrade > to it and remove the following code from build.gradle: > {code} > String[] testsToExclude = [] > if (JavaVersion.current().isJava9Compatible()) { > testsToExclude = [ > "**/KafkaProducerTest.*", "**/BufferPoolTest.*", > "**/SourceTaskOffsetCommitterTest.*", "**/WorkerSinkTaskTest.*", > "**/WorkerSinkTaskThreadedTest.*", > "**/WorkerSourceTaskTest.*", "**/WorkerTest.*", > "**/DistributedHerderTest.*", "**/WorkerCoordinatorTest.*", > "**/RestServerTest.*", "**/ConnectorPluginsResourceTest.*", > "**/ConnectorsResourceTest.*", > "**/StandaloneHerderTest.*", "**/FileOffsetBakingStoreTest.*", > "**/KafkaConfigBackingStoreTest.*", > "**/KafkaOffsetBackingStoreTest.*", "**/OffsetStorageWriterTest.*", > "**/KafkaBasedLogTest.*" > ] > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169964#comment-16169964 ] Seweryn Habdank-Wojewodzki commented on KAFKA-5882: --- Scenario for Kafka (0.11.0.0): 1. Kafka broker with 2 nodes (active) + 3 Zookeeper nodes are working. 2. One instance of the streaming app works. 3. Start second instance of the streaming app on another mashine. 4. Settings: 1 stream thread. 5. Kafka start up: {code} 2017-09-18 14:13:16 INFO AppInfoParser:83 - Kafka version : 0.11.0.0 2017-09-18 14:13:16 INFO AppInfoParser:84 - Kafka commitId : cb8625948210849f 2017-09-18 14:13:16 DEBUG KafkaProducer:410 - Kafka producer started 2017-09-18 14:13:16 DEBUG StateDirectory:135 - stream-thread [streamer-923cc4d5-22aa-40f2-95ff-0fbda292b346-StreamThread-1] Acquired state dir lock for task 13_5 2017-09-18 14:13:16 DEBUG Sender:157 - Starting Kafka producer I/O thread. 2017-09-18 14:13:16 INFO ProcessorStateManager:122 - task [13_5] Created state store manager for task 13_5 with the acquired state dir lock 2017-09-18 14:13:17 DEBUG Metrics:403 - Added sensor with name commit 2017-09-18 14:13:17 DEBUG Metrics:403 - Added sensor with name 13_5-commit 2017-09-18 14:13:17 INFO StreamThread:1248 - stream-thread [streamer-923cc4d5-22aa-40f2-95ff-0fbda292b346-StreamThread-1] Created active task 13_5 with assigned partitions [a0291_topic-5] 2017-09-18 14:13:17 INFO StreamThread:193 - stream-thread [streamer-923cc4d5-22aa-40f2-95ff-0fbda292b346-StreamThread-1] partition assignment took 68 ms. current active tasks: [] current standby tasks: [] 2017-09-18 14:13:17 ERROR ConsumerCoordinator:269 - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group streamer failed on partition assignment java.lang.NullPointerException: null at org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183) ~[myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) [myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) [myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) [myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) [myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) [myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) [myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) [myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) [myapp-streamer.jar:?] 2017-09-18 14:13:17 DEBUG ConsumerCoordinator:826 - Group streamer fetching committed offsets for partitions: [gai34_topic-5, gai34_topic-9, int06_topic-8, gai34_topic-7, int77_topic-8, a0291_topic-6, a0291_topic-8, gai10_topic-9, int06_topic-3, gai10_topic-7, int06_topic-5, gai10_topic- 5, int62_topic-8, gai10_topic-3, gai34_topic-4, gai34_topic-8, int06_topic-7, c0737_topic-9, gai34_topic-6, int06_topic-9, int77_topic-7, a0291_topic-9, a0291_topic-5, gai10_topic-8, a0291_topic-7, int06_topic-4, gai10_topic-6, int06_topic-6, gai10_topic-4, int62_topic-9, int62_topic-7, in t77_topic-9] {code} This week I will intensively test Kafka 0.11.0.1, perhaps it will be better. > NullPointerException in StreamTask > -- > > Key: KAFKA-5882 > URL: https://issues.apache.org/jira/browse/KAFKA-5882 > Project: Kafka >
[jira] [Comment Edited] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169964#comment-16169964 ] Seweryn Habdank-Wojewodzki edited comment on KAFKA-5882 at 9/18/17 12:51 PM: - Scenario for Kafka (0.11.0.0): 1. Kafka broker with 2 nodes (active) + 3 Zookeeper nodes are working. 2. One instance of the streaming app works. 3. Start the second instance of the streaming app on another mashine. 4. Settings: 1 stream thread. 5. Kafka start up: {code} 2017-09-18 14:13:16 INFO AppInfoParser:83 - Kafka version : 0.11.0.0 2017-09-18 14:13:16 INFO AppInfoParser:84 - Kafka commitId : cb8625948210849f 2017-09-18 14:13:16 DEBUG KafkaProducer:410 - Kafka producer started 2017-09-18 14:13:16 DEBUG StateDirectory:135 - stream-thread [streamer-923cc4d5-22aa-40f2-95ff-0fbda292b346-StreamThread-1] Acquired state dir lock for task 13_5 2017-09-18 14:13:16 DEBUG Sender:157 - Starting Kafka producer I/O thread. 2017-09-18 14:13:16 INFO ProcessorStateManager:122 - task [13_5] Created state store manager for task 13_5 with the acquired state dir lock 2017-09-18 14:13:17 DEBUG Metrics:403 - Added sensor with name commit 2017-09-18 14:13:17 DEBUG Metrics:403 - Added sensor with name 13_5-commit 2017-09-18 14:13:17 INFO StreamThread:1248 - stream-thread [streamer-923cc4d5-22aa-40f2-95ff-0fbda292b346-StreamThread-1] Created active task 13_5 with assigned partitions [a0291_topic-5] 2017-09-18 14:13:17 INFO StreamThread:193 - stream-thread [streamer-923cc4d5-22aa-40f2-95ff-0fbda292b346-StreamThread-1] partition assignment took 68 ms. current active tasks: [] current standby tasks: [] 2017-09-18 14:13:17 ERROR ConsumerCoordinator:269 - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group streamer failed on partition assignment java.lang.NullPointerException: null at org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73) ~[myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183) ~[myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) [myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) [myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) [myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) [myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) [myapp-streamer.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) [myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) [myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [myapp-streamer.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) [myapp-streamer.jar:?] 2017-09-18 14:13:17 DEBUG ConsumerCoordinator:826 - Group streamer fetching committed offsets for partitions: [gai34_topic-5, gai34_topic-9, int06_topic-8, gai34_topic-7, int77_topic-8, a0291_topic-6, a0291_topic-8, gai10_topic-9, int06_topic-3, gai10_topic-7, int06_topic-5, gai10_topic- 5, int62_topic-8, gai10_topic-3, gai34_topic-4, gai34_topic-8, int06_topic-7, c0737_topic-9, gai34_topic-6, int06_topic-9, int77_topic-7, a0291_topic-9, a0291_topic-5, gai10_topic-8, a0291_topic-7, int06_topic-4, gai10_topic-6, int06_topic-6, gai10_topic-4, int62_topic-9, int62_topic-7, in t77_topic-9] {code} This week I will intensively test Kafka 0.11.0.1, perhaps it will be better. was (Author: habdank): Scenario for Kafka (0.11.0.0): 1. Kafka broker with 2 nodes (active) + 3 Zookeeper nodes are working. 2. One instance of the streamin
[jira] [Created] (KAFKA-5923) Output generated by kafka_acls, kafka_topics. kafka_topics should be easily usable in a pipe
Holger Rauch created KAFKA-5923: --- Summary: Output generated by kafka_acls, kafka_topics. kafka_topics should be easily usable in a pipe Key: KAFKA-5923 URL: https://issues.apache.org/jira/browse/KAFKA-5923 Project: Kafka Issue Type: New Feature Components: tools Affects Versions: 0.11.0.0 Environment: Linux Reporter: Holger Rauch Priority: Minor The output produced by {{kafka_topics}}, {{kafka_acls}}, and {{kafka_configs}} (or rather, their corresponding, underlying classes) should be suitable for use in a pipe (e.g. to be piped into {{grep}} when these commands are used in an Ansible playbook/role). AFAIK, the current implementations produce free form text. Using that inside a grep can be error prone IMHO (especially in case the output should change from one release to the next). A more reliable approach would be to provide a cmd line switch to enable machine parseable output suitable for use in pipes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5924) Add the compose method to the Kafka Stream API
Laurent T created KAFKA-5924: Summary: Add the compose method to the Kafka Stream API Key: KAFKA-5924 URL: https://issues.apache.org/jira/browse/KAFKA-5924 Project: Kafka Issue Type: Wish Components: streams Reporter: Laurent T Priority: Minor Hi, I'm referencing RxJava for it's [compose method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators] which is very handy. It would be great if the Streams API would give us something similar. It's pretty easy to implement and allows to have much more clarity to the code (it avoids breaking the linearity of the code when you want to reuse parts of the stream topology). e.g. Without compose: {code:java} TopologyUtils .myUtil(topology .map(...) .flatMap(...) .through(...)) .map(...) .to(...); {code} With compose: {code:java} topology .map(...) .flatMap(...) .through(...) .compose(TopologyUtils::myUtil) .map(...) .to(...); {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5873) Add Materialized overloads to StreamBuilder
[ https://issues.apache.org/jira/browse/KAFKA-5873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-5873. --- Issue resolved by pull request 3837 [https://github.com/apache/kafka/pull/3837] > Add Materialized overloads to StreamBuilder > --- > > Key: KAFKA-5873 > URL: https://issues.apache.org/jira/browse/KAFKA-5873 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > Add the overloads from KIP-182 that use {{Materialized}} to {{StreamsBuilder}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5873) Add Materialized overloads to StreamBuilder
[ https://issues.apache.org/jira/browse/KAFKA-5873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170112#comment-16170112 ] ASF GitHub Bot commented on KAFKA-5873: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3837 > Add Materialized overloads to StreamBuilder > --- > > Key: KAFKA-5873 > URL: https://issues.apache.org/jira/browse/KAFKA-5873 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Assignee: Damian Guy > Fix For: 1.0.0 > > > Add the overloads from KIP-182 that use {{Materialized}} to {{StreamsBuilder}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5917) Kafka not starting
[ https://issues.apache.org/jira/browse/KAFKA-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170134#comment-16170134 ] Balu commented on KAFKA-5917: - Thanks mani & huxihx. Iam new to kafka can you help me where exactly i can find these /config/changes/config_change folder to remove manually. is it in Kafka nodes or zookeeper nodes. Sorry if i ask basic q. > Kafka not starting > -- > > Key: KAFKA-5917 > URL: https://issues.apache.org/jira/browse/KAFKA-5917 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 0.10.0.0 >Reporter: Balu > > Getting this error in kafka,zookeeper,schema repository cluster. > FATAL [Kafka Server 3], Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed > at scala.Predef$.require(Predef.scala:212) > at > kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processNotification(DynamicConfigManager.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at scala.Option.map(Option.scala:146) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:90) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener.processAllNotifications(ZkNodeChangeNotificationListener.scala:79) > at > kafka.common.ZkNodeChangeNotificationListener.init(ZkNodeChangeNotificationListener.scala:67) > at > kafka.server.DynamicConfigManager.startup(DynamicConfigManager.scala:122) > at kafka.server.KafkaServer.startup(KafkaServer.scala:233) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) > at kafka.Kafka$.main(Kafka.scala:67) > at kafka.Kafka.main(Kafka.scala) > Please help -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5910) Kafka 0.11.0.1 Kafka consumer/producers retries in infinite loop when wrong SASL creds are passed
[ https://issues.apache.org/jira/browse/KAFKA-5910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170159#comment-16170159 ] Edoardo Comar commented on KAFKA-5910: -- No Kafka 1.0 isn't released yet. Please check the kafka-dev mailing list for progress. > Kafka 0.11.0.1 Kafka consumer/producers retries in infinite loop when wrong > SASL creds are passed > - > > Key: KAFKA-5910 > URL: https://issues.apache.org/jira/browse/KAFKA-5910 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.11.0.0 >Reporter: Ramkumar > > Similar to https://issues.apache.org/jira/browse/KAFKA-4764 , but the status > shows patch available but the client wont disconnects after getting the > warning. > Issue 1: > Publisher flow: > Kafka publisher goes into infinite loop if the AAF credentials are wrong when > authenticating in Kaka broker. > Detail: > If the correct user name and password are used at the kafka publisher client > side to connect to kafka broker, then it authenticates and authorizes fine. > If incorrect username or password is used at the kafka publisher client > side, then broker logs shows a continuous (infinite loop) log showing client > is trying to reconnect the broker as it doesn’t get authentication failure > exception from broker. > JIRA defect in apache: > https://issues.apache.org/jira/browse/KAFKA-4764 > Can you pls let me know if this issue is resolved in kafka 0.11.0.1 version > or still an open issue? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5917) Kafka not starting
[ https://issues.apache.org/jira/browse/KAFKA-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170164#comment-16170164 ] Balu commented on KAFKA-5917: - can you provide syntax to delete config changes as i see few changes with version:2 {"version":2,"entity_path":"topics/* > Kafka not starting > -- > > Key: KAFKA-5917 > URL: https://issues.apache.org/jira/browse/KAFKA-5917 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 0.10.0.0 >Reporter: Balu > > Getting this error in kafka,zookeeper,schema repository cluster. > FATAL [Kafka Server 3], Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed > at scala.Predef$.require(Predef.scala:212) > at > kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processNotification(DynamicConfigManager.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at scala.Option.map(Option.scala:146) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:90) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener.processAllNotifications(ZkNodeChangeNotificationListener.scala:79) > at > kafka.common.ZkNodeChangeNotificationListener.init(ZkNodeChangeNotificationListener.scala:67) > at > kafka.server.DynamicConfigManager.startup(DynamicConfigManager.scala:122) > at kafka.server.KafkaServer.startup(KafkaServer.scala:233) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) > at kafka.Kafka$.main(Kafka.scala:67) > at kafka.Kafka.main(Kafka.scala) > Please help -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5925) Adding records deletion operation to the new Admin Client API
Paolo Patierno created KAFKA-5925: - Summary: Adding records deletion operation to the new Admin Client API Key: KAFKA-5925 URL: https://issues.apache.org/jira/browse/KAFKA-5925 Project: Kafka Issue Type: Improvement Components: admin Reporter: Paolo Patierno Assignee: Paolo Patierno Priority: Minor Hi, The [KIP-107|https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient] provides a way to delete messages starting from a specified offset inside a topic partition which we don’t want to take anymore so without relying on time-based and size-based log retention policies. The already implemented protocol request and response messages (DeleteRecords API, key 21) are used only by the “legacy” Admin Client in Scala and aren’t provided by the new Admin Client API in Java. The [KIP-204|https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+adding+records+deletion+operation+to+the+new+Admin+Client+API] is about addressing this JIRA. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4714) Implement remaining KIP-66 SMTs
[ https://issues.apache.org/jira/browse/KAFKA-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170244#comment-16170244 ] Ewen Cheslack-Postava commented on KAFKA-4714: -- [~dhananjaydp] The record is maintained internally, but it is not exposed to the connector. It's somewhat counter to the point of SMTs since in some cases you may be trying to do things like remove PII, in which case you definitely don't want to give the connector task a chance to see that data. Regarding support for a transformation after SinkTask.put(), that doesn't really make sense since the whole point of put() is that it passes the data to the task to be written to the external system -- not only is the message in a different system, it's likely been converted to some native format for that system and is no longer in Connect's data API format. Is there a reason you want the original value in the task? Can you describe your use case? > Implement remaining KIP-66 SMTs > --- > > Key: KAFKA-4714 > URL: https://issues.apache.org/jira/browse/KAFKA-4714 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava > Fix For: 0.11.0.0 > > > Three didn't make it for the 0.10.2.0 release: Flatten, Cast, and > TimestampConverter. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5825) Streams not processing when exactly once is set
[ https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170267#comment-16170267 ] Ryan Worsley commented on KAFKA-5825: - Hi [~guozhang] and [~mjsax] - I'd really appreciate any further suggestions on how to take this forward. The organisation I represent is really keen to prove out Kafka transactions. Really grateful for any help you can give. > Streams not processing when exactly once is set > --- > > Key: KAFKA-5825 > URL: https://issues.apache.org/jira/browse/KAFKA-5825 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 > Environment: EmbeddedKafka running on Windows. Relevant files > attached. >Reporter: Ryan Worsley > Attachments: build.sbt, log4j.properties, log-output.txt, Tests.scala > > > +Set-up+ > I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] > for ScalaTest. > This spins up a single broker internally on a random port. > I've written two tests - the first without transactions, the second with. > They're nearly identical apart from the config and the transactional > semantics. I've written the transactional version based on Neha's > [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/] > which is the closest thing I could find to instructions. > The tests wait until a single message is processed by the streams topology, > they use this message to complete a promise that the test is waiting on. > Once the promise completes the test verifies the value of the promise as > being the expected value of the message. > +Observed behaviour+ > The first test passes fine, the second test times out, the stream processor > never seems to read the transactional message. > +Notes+ > I've attached my build.sbt, log4j.properties and my Tests.scala file in order > to make it as easy as possible for someone to re-create. I'm running on > Windows and using Scala as this reflects my workplace. I completely expect > there to be some configuration issue that's causing this, but am unable to > proceed at this time. > Related information: > https://github.com/manub/scalatest-embedded-kafka/issues/82 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170270#comment-16170270 ] Ewen Cheslack-Postava commented on KAFKA-5716: -- The basic analysis sounds right. That javadoc is from way back from the initial PR. I can't remember if we even had the separate thread for offset commits at that point, so it may have just grown outdated (and that may have been the case just based on my development of the initial version and may have just been missed in the initial review, which given it was an [11k line patch|https://github.com/apache/kafka/pull/99], I'm not surprised some things were missed in the review... There's actually another problem that reviewing this revealed that, at best is important but not clearly documented and at worst is another bug: WorkerSourceTask doesn't have synchronization around at least one call to commitSourceTask (which invokes the SourceTask.commit()) method so you may call this while the main thread is invoking some other method on the connector. I think we document elsewhere that stop() has this behavior (to interrupt polling) and so perhaps people would have handled synchronization properly, but more likely there would just be bugs. Correcting the code would be tough. We could update the javadoc, but I'd propose a different solution: let's just remove the method. It's bad that it's not actually doing what it claims and correcting it would require committing offsets synchronously which we don't want to do either. Given its current state, it seems unlikely anyone is actually using this functionality anyway. The intent was to allow you to, e.g., delete or ack data once you know it has been committed, but it clearly isn't serving that purpose and commitRecord() was added to give finer grained feedback. If someone did want this functionality, we likely should just add a new commit(Map) variant that can actually express to the user what we actually want... It'd be interesting to know if the method is overridden in any connectors that we know about in the wild. If we just wanted a short term fix, we could definitely update the javadoc to make it clear what's actually happening and that this probably isn't what you want. > Connect: When SourceTask.commit it is possible not everthing from > SourceTask.poll has been sent > --- > > Key: KAFKA-5716 > URL: https://issues.apache.org/jira/browse/KAFKA-5716 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Per Steffensen >Priority: Minor > Attachments: KAFKA-5716.patch > > > Not looking at the very latest code, so the "problem" may have been corrected > recently. If so, I apologize. I found the "problem" by code-inspection alone, > so I may be wrong. Have not had the time to write tests to confirm. > According to java-doc on SourceTask.commit > {quote} > Commit the offsets, up to the offsets that have been returned by \{@link > #poll()}. This > method should block until the commit is complete. > SourceTasks are not required to implement this functionality; Kafka Connect > will record offsets > automatically. This hook is provided for systems that also need to store > offsets internally > in their own system. > {quote} > As I read this, when commit-method is called, the SourceTask-developer is > "told" that everything returned from poll up until "now" has been sent/stored > - both the outgoing messages and the associated connect-offsets. Looking at > the implementation it also seems that this is what it tries to > "guarantee/achieve". > But as I see read the code, it is not necessarily true > The following threads are involved > * Task-thread: WorkerSourceTask has its own thread running > WorkerSourceTask.execute. > * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled > to call WorkerSourceTask.commitOffsets (from a different thread) > The two thread synchronize (on the WorkerSourceTask-object) in sendRecord and > commitOffsets respectively, hindering the task-thread to add to > outstandingMessages and offsetWriter while committer-thread is marking what > has to be flushed in the offsetWriter and waiting for outstandingMessages to > be empty. This means that the offsets committed will be consistent with what > has been sent out, but not necessarily what has been polled. At least I do > not see why the following is not possible: > * Task-thread polls something from the task.poll > * Before task-thread gets to add (all) the polled records to > outstandingMessages and offsetWriter in sendRecords, committer-thread kicks > in and does its commiting, while hindering the task-thread adding the polled > records to outstandingMessages and o
[jira] [Updated] (KAFKA-5924) Add the compose method to the Kafka Stream API
[ https://issues.apache.org/jira/browse/KAFKA-5924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5924: --- Labels: needs-kip (was: ) > Add the compose method to the Kafka Stream API > -- > > Key: KAFKA-5924 > URL: https://issues.apache.org/jira/browse/KAFKA-5924 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: Laurent T >Priority: Minor > Labels: needs-kip > > Hi, > I'm referencing RxJava for it's [compose > method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators] > which is very handy. It would be great if the Streams API would give us > something similar. It's pretty easy to implement and allows to have much more > clarity to the code (it avoids breaking the linearity of the code when you > want to reuse parts of the stream topology). e.g. > Without compose: > {code:java} > TopologyUtils > .myUtil(topology > .map(...) > .flatMap(...) > .through(...)) > .map(...) > .to(...); > {code} > With compose: > {code:java} > topology > .map(...) > .flatMap(...) > .through(...) > .compose(TopologyUtils::myUtil) > .map(...) > .to(...); > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5893) ResetIntegrationTest fails
[ https://issues.apache.org/jira/browse/KAFKA-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5893: --- Description: {noformat} java.lang.AssertionError: expected:<0> but was:<1> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:645) at org.junit.Assert.assertEquals(Assert.java:631) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} One issue with debugging is, that we catch exceptions and print the exception message that is {{null}}: {noformat} Standard Error ERROR: null ERROR: null {noformat} After print the stack trace in case of failure, we got: {noformat} ERROR: java.lang.NullPointerException java.lang.NullPointerException at kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194) at kafka.tools.StreamsResetter.run(StreamsResetter.java:121) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} was: {noformat} java.lang.AssertionError: expected:<0> but was:<1> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:645) at org.junit.Assert.assertEquals(Assert.java:631) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} -One issue with debugging is, that we catch exceptions and print the exception message that is {{null}}: {noformat} Standard Error ERROR: null ERROR: null {noformat}- After print the stack trace in case of failure, we got: {noformat} ERROR: java.lang.NullPointerException java.lang.NullPointerException at kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194) at kafka.tools.StreamsResetter.run(StreamsResetter.java:121) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} > ResetIntegrationTest fails > -- > > Key: KAFKA-5893 > URL: https://issues.apache.org/jira/browse/KAFKA-5893 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 0.11.0.0, 0.11.0.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > > {noformat} > java.lang.AssertionError: expected:<0> but was:<1> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} > One issue with debugging is, that we catch exceptions and print the exception > message that is {{null}}: > {noformat} > Standard Error > ERROR: null > ERROR: null > {noformat} > After print the stack trace in case of failure, we got: > {noformat} > ERROR: java.lang.NullPointerException > java.lang.NullPointerException > at > kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194) > at kafka.tools.StreamsResetter.run(StreamsResetter.java:121) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5893) ResetIntegrationTest fails
[ https://issues.apache.org/jira/browse/KAFKA-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5893: --- Description: {noformat} java.lang.AssertionError: expected:<0> but was:<1> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:645) at org.junit.Assert.assertEquals(Assert.java:631) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} One issue with debugging is, that we catch exceptions and print the exception message that is {{null}}: {noformat} Standard Error ERROR: null ERROR: null {noformat} After printing the stack trace in case of failure, we got: {noformat} ERROR: java.lang.NullPointerException java.lang.NullPointerException at kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194) at kafka.tools.StreamsResetter.run(StreamsResetter.java:121) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} was: {noformat} java.lang.AssertionError: expected:<0> but was:<1> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:645) at org.junit.Assert.assertEquals(Assert.java:631) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} One issue with debugging is, that we catch exceptions and print the exception message that is {{null}}: {noformat} Standard Error ERROR: null ERROR: null {noformat} After print the stack trace in case of failure, we got: {noformat} ERROR: java.lang.NullPointerException java.lang.NullPointerException at kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194) at kafka.tools.StreamsResetter.run(StreamsResetter.java:121) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} > ResetIntegrationTest fails > -- > > Key: KAFKA-5893 > URL: https://issues.apache.org/jira/browse/KAFKA-5893 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 0.11.0.0, 0.11.0.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > > {noformat} > java.lang.AssertionError: expected:<0> but was:<1> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} > One issue with debugging is, that we catch exceptions and print the exception > message that is {{null}}: > {noformat} > Standard Error > ERROR: null > ERROR: null > {noformat} > After printing the stack trace in case of failure, we got: > {noformat} > ERROR: java.lang.NullPointerException > java.lang.NullPointerException > at > kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194) > at kafka.tools.StreamsResetter.run(StreamsResetter.java:121) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5893) ResetIntegrationTest fails
[ https://issues.apache.org/jira/browse/KAFKA-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5893: --- Description: {noformat} java.lang.AssertionError: expected:<0> but was:<1> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:645) at org.junit.Assert.assertEquals(Assert.java:631) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} -One issue with debugging is, that we catch exceptions and print the exception message that is {{null}}: {noformat} Standard Error ERROR: null ERROR: null {noformat}- After print the stack trace in case of failure, we got: {noformat} ERROR: java.lang.NullPointerException java.lang.NullPointerException at kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194) at kafka.tools.StreamsResetter.run(StreamsResetter.java:121) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} was: {noformat} java.lang.AssertionError: expected:<0> but was:<1> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:645) at org.junit.Assert.assertEquals(Assert.java:631) at org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) {noformat} One issue with debugging is, that we catch exceptions and print the exception message that is {{null}}: {noformat} Standard Error ERROR: null ERROR: null {noformat} > ResetIntegrationTest fails > -- > > Key: KAFKA-5893 > URL: https://issues.apache.org/jira/browse/KAFKA-5893 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 0.11.0.0, 0.11.0.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > > {noformat} > java.lang.AssertionError: expected:<0> but was:<1> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} > -One issue with debugging is, that we catch exceptions and print the > exception message that is {{null}}: > {noformat} > Standard Error > ERROR: null > ERROR: null > {noformat}- > After print the stack trace in case of failure, we got: > {noformat} > ERROR: java.lang.NullPointerException > java.lang.NullPointerException > at > kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194) > at kafka.tools.StreamsResetter.run(StreamsResetter.java:121) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5917) Kafka not starting
[ https://issues.apache.org/jira/browse/KAFKA-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170443#comment-16170443 ] Balu commented on KAFKA-5917: - You made my day.. Kafka started successfully after deleting > Kafka not starting > -- > > Key: KAFKA-5917 > URL: https://issues.apache.org/jira/browse/KAFKA-5917 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 0.10.0.0 >Reporter: Balu > > Getting this error in kafka,zookeeper,schema repository cluster. > FATAL [Kafka Server 3], Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed > at scala.Predef$.require(Predef.scala:212) > at > kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processNotification(DynamicConfigManager.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at scala.Option.map(Option.scala:146) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:90) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener.processAllNotifications(ZkNodeChangeNotificationListener.scala:79) > at > kafka.common.ZkNodeChangeNotificationListener.init(ZkNodeChangeNotificationListener.scala:67) > at > kafka.server.DynamicConfigManager.startup(DynamicConfigManager.scala:122) > at kafka.server.KafkaServer.startup(KafkaServer.scala:233) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) > at kafka.Kafka$.main(Kafka.scala:67) > at kafka.Kafka.main(Kafka.scala) > Please help -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5825) Streams not processing when exactly once is set
[ https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170462#comment-16170462 ] Matthias J. Sax commented on KAFKA-5825: [~apurva] Maybe you can provide some input here? > Streams not processing when exactly once is set > --- > > Key: KAFKA-5825 > URL: https://issues.apache.org/jira/browse/KAFKA-5825 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 > Environment: EmbeddedKafka running on Windows. Relevant files > attached. >Reporter: Ryan Worsley > Attachments: build.sbt, log4j.properties, log-output.txt, Tests.scala > > > +Set-up+ > I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] > for ScalaTest. > This spins up a single broker internally on a random port. > I've written two tests - the first without transactions, the second with. > They're nearly identical apart from the config and the transactional > semantics. I've written the transactional version based on Neha's > [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/] > which is the closest thing I could find to instructions. > The tests wait until a single message is processed by the streams topology, > they use this message to complete a promise that the test is waiting on. > Once the promise completes the test verifies the value of the promise as > being the expected value of the message. > +Observed behaviour+ > The first test passes fine, the second test times out, the stream processor > never seems to read the transactional message. > +Notes+ > I've attached my build.sbt, log4j.properties and my Tests.scala file in order > to make it as easy as possible for someone to re-create. I'm running on > Windows and using Scala as this reflects my workplace. I completely expect > there to be some configuration issue that's causing this, but am unable to > proceed at this time. > Related information: > https://github.com/manub/scalatest-embedded-kafka/issues/82 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5825) Streams not processing when exactly once is set
[ https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170495#comment-16170495 ] Apurva Mehta commented on KAFKA-5825: - I looked at the test log that was shared. It is conclusive that a transactional record was written and committed. It seems that the streams app was reading in {{READ_UNCOMMITTED}} mode when exactly once was set. It also fetched 2 records from the topic, per the log, since the position of the consumer at the end was 2. It would help to share the actual data in my-topic-0 to validate that it actually has a message with the desired key and value, and with the transactional bit set. It should also have the commit record. It would also help to run a regular (non-streams) read-committed consumer on this log, to see if the message is actually returned. If the stock producer/consumer is working as expected, then we can isolate the problem to the streams code, and it would help debug. > Streams not processing when exactly once is set > --- > > Key: KAFKA-5825 > URL: https://issues.apache.org/jira/browse/KAFKA-5825 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 > Environment: EmbeddedKafka running on Windows. Relevant files > attached. >Reporter: Ryan Worsley > Attachments: build.sbt, log4j.properties, log-output.txt, Tests.scala > > > +Set-up+ > I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] > for ScalaTest. > This spins up a single broker internally on a random port. > I've written two tests - the first without transactions, the second with. > They're nearly identical apart from the config and the transactional > semantics. I've written the transactional version based on Neha's > [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/] > which is the closest thing I could find to instructions. > The tests wait until a single message is processed by the streams topology, > they use this message to complete a promise that the test is waiting on. > Once the promise completes the test verifies the value of the promise as > being the expected value of the message. > +Observed behaviour+ > The first test passes fine, the second test times out, the stream processor > never seems to read the transactional message. > +Notes+ > I've attached my build.sbt, log4j.properties and my Tests.scala file in order > to make it as easy as possible for someone to re-create. I'm running on > Windows and using Scala as this reflects my workplace. I completely expect > there to be some configuration issue that's causing this, but am unable to > proceed at this time. > Related information: > https://github.com/manub/scalatest-embedded-kafka/issues/82 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5825) Streams not processing when exactly once is set
[ https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170517#comment-16170517 ] Ryan Worsley commented on KAFKA-5825: - Hi [~apurva], My understanding was that a streams app should only be reading in READ_COMMITTED mode when exactly-once is set (see the code I've attached). Indeed if I set the consumer config explicitly I get the following error; {{Unexpected user-specified consumer config isolation.level; because processing.guarantee is set to 'exactly_once' consumers will always read committed data only.}} Am I misunderstanding your comments above? I'd like to share the actual data in my-topic-0 - how exactly can I go about getting that for you? I'm hopefully able to take some time tomorrow to run a regular READ_COMMITTED consumer to check the behaviour for you - would appreciate your thoughts on the above though. > Streams not processing when exactly once is set > --- > > Key: KAFKA-5825 > URL: https://issues.apache.org/jira/browse/KAFKA-5825 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 > Environment: EmbeddedKafka running on Windows. Relevant files > attached. >Reporter: Ryan Worsley > Attachments: build.sbt, log4j.properties, log-output.txt, Tests.scala > > > +Set-up+ > I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] > for ScalaTest. > This spins up a single broker internally on a random port. > I've written two tests - the first without transactions, the second with. > They're nearly identical apart from the config and the transactional > semantics. I've written the transactional version based on Neha's > [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/] > which is the closest thing I could find to instructions. > The tests wait until a single message is processed by the streams topology, > they use this message to complete a promise that the test is waiting on. > Once the promise completes the test verifies the value of the promise as > being the expected value of the message. > +Observed behaviour+ > The first test passes fine, the second test times out, the stream processor > never seems to read the transactional message. > +Notes+ > I've attached my build.sbt, log4j.properties and my Tests.scala file in order > to make it as easy as possible for someone to re-create. I'm running on > Windows and using Scala as this reflects my workplace. I completely expect > there to be some configuration issue that's causing this, but am unable to > proceed at this time. > Related information: > https://github.com/manub/scalatest-embedded-kafka/issues/82 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5825) Streams not processing when exactly once is set
[ https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170564#comment-16170564 ] Matthias J. Sax commented on KAFKA-5825: I just had one more look into your code. Maybe it's just test setup issue. (1) it seems, you never close your {{KafkaStreams}} instance and (2) you use the same {{application.id}} for both runs. Thus, if you run both tests in parallel, both instances would form a consumer group and this would mess up the test. > Streams not processing when exactly once is set > --- > > Key: KAFKA-5825 > URL: https://issues.apache.org/jira/browse/KAFKA-5825 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 > Environment: EmbeddedKafka running on Windows. Relevant files > attached. >Reporter: Ryan Worsley > Attachments: build.sbt, log4j.properties, log-output.txt, Tests.scala > > > +Set-up+ > I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] > for ScalaTest. > This spins up a single broker internally on a random port. > I've written two tests - the first without transactions, the second with. > They're nearly identical apart from the config and the transactional > semantics. I've written the transactional version based on Neha's > [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/] > which is the closest thing I could find to instructions. > The tests wait until a single message is processed by the streams topology, > they use this message to complete a promise that the test is waiting on. > Once the promise completes the test verifies the value of the promise as > being the expected value of the message. > +Observed behaviour+ > The first test passes fine, the second test times out, the stream processor > never seems to read the transactional message. > +Notes+ > I've attached my build.sbt, log4j.properties and my Tests.scala file in order > to make it as easy as possible for someone to re-create. I'm running on > Windows and using Scala as this reflects my workplace. I completely expect > there to be some configuration issue that's causing this, but am unable to > proceed at this time. > Related information: > https://github.com/manub/scalatest-embedded-kafka/issues/82 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5825) Streams not processing when exactly once is set
[ https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170594#comment-16170594 ] Ryan Worsley commented on KAFKA-5825: - You're exactly right [~mjsax]! No wonder they pay you the big bucks... I just changed the code to insert a random UUID into the application ID _et voila_ it works as expected - well spotted! Really grateful for this as I can now progress our evaluation to the next phase. Much obliged. > Streams not processing when exactly once is set > --- > > Key: KAFKA-5825 > URL: https://issues.apache.org/jira/browse/KAFKA-5825 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 > Environment: EmbeddedKafka running on Windows. Relevant files > attached. >Reporter: Ryan Worsley > Attachments: build.sbt, log4j.properties, log-output.txt, Tests.scala > > > +Set-up+ > I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] > for ScalaTest. > This spins up a single broker internally on a random port. > I've written two tests - the first without transactions, the second with. > They're nearly identical apart from the config and the transactional > semantics. I've written the transactional version based on Neha's > [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/] > which is the closest thing I could find to instructions. > The tests wait until a single message is processed by the streams topology, > they use this message to complete a promise that the test is waiting on. > Once the promise completes the test verifies the value of the promise as > being the expected value of the message. > +Observed behaviour+ > The first test passes fine, the second test times out, the stream processor > never seems to read the transactional message. > +Notes+ > I've attached my build.sbt, log4j.properties and my Tests.scala file in order > to make it as easy as possible for someone to re-create. I'm running on > Windows and using Scala as this reflects my workplace. I completely expect > there to be some configuration issue that's causing this, but am unable to > proceed at this time. > Related information: > https://github.com/manub/scalatest-embedded-kafka/issues/82 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5825) Streams not processing when exactly once is set
[ https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Worsley resolved KAFKA-5825. - Resolution: Done > Streams not processing when exactly once is set > --- > > Key: KAFKA-5825 > URL: https://issues.apache.org/jira/browse/KAFKA-5825 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 > Environment: EmbeddedKafka running on Windows. Relevant files > attached. >Reporter: Ryan Worsley > Attachments: build.sbt, log4j.properties, log-output.txt, Tests.scala > > > +Set-up+ > I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] > for ScalaTest. > This spins up a single broker internally on a random port. > I've written two tests - the first without transactions, the second with. > They're nearly identical apart from the config and the transactional > semantics. I've written the transactional version based on Neha's > [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/] > which is the closest thing I could find to instructions. > The tests wait until a single message is processed by the streams topology, > they use this message to complete a promise that the test is waiting on. > Once the promise completes the test verifies the value of the promise as > being the expected value of the message. > +Observed behaviour+ > The first test passes fine, the second test times out, the stream processor > never seems to read the transactional message. > +Notes+ > I've attached my build.sbt, log4j.properties and my Tests.scala file in order > to make it as easy as possible for someone to re-create. I'm running on > Windows and using Scala as this reflects my workplace. I completely expect > there to be some configuration issue that's causing this, but am unable to > proceed at this time. > Related information: > https://github.com/manub/scalatest-embedded-kafka/issues/82 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5926) --force option is ginored by kafka-configs and kafka-topics tools
Mickael Maison created KAFKA-5926: - Summary: --force option is ginored by kafka-configs and kafka-topics tools Key: KAFKA-5926 URL: https://issues.apache.org/jira/browse/KAFKA-5926 Project: Kafka Issue Type: Bug Components: tools Reporter: Mickael Maison Assignee: Mickael Maison Both ConfigCommand and TopicCommand list a --force option in their help but it is not used. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5893) ResetIntegrationTest fails
[ https://issues.apache.org/jira/browse/KAFKA-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170726#comment-16170726 ] ASF GitHub Bot commented on KAFKA-5893: --- GitHub user mjsax opened a pull request: https://github.com/apache/kafka/pull/3893 KAFKA-5893: Preserve original System.out in PrintedTest You can merge this pull request into a Git repository by running: $ git pull https://github.com/mjsax/kafka kafka-5893-reset-integration-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3893.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3893 > ResetIntegrationTest fails > -- > > Key: KAFKA-5893 > URL: https://issues.apache.org/jira/browse/KAFKA-5893 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 0.11.0.0, 0.11.0.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > > {noformat} > java.lang.AssertionError: expected:<0> but was:<1> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} > One issue with debugging is, that we catch exceptions and print the exception > message that is {{null}}: > {noformat} > Standard Error > ERROR: null > ERROR: null > {noformat} > After printing the stack trace in case of failure, we got: > {noformat} > ERROR: java.lang.NullPointerException > java.lang.NullPointerException > at > kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194) > at kafka.tools.StreamsResetter.run(StreamsResetter.java:121) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4159) Allow overriding producer & consumer properties at the connector level
[ https://issues.apache.org/jira/browse/KAFKA-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170809#comment-16170809 ] Shrijeet Paliwal commented on KAFKA-4159: - Few other scenarios where this would be useful * Being able to control `max.poll.records` etc. per connector. I have a custom connector which is slow to process a record compared to others. I don't want to decrease `max.poll.records` for all connectors, just mine. * Being able to set the desired `auto.offset.reset` behavior per connector > Allow overriding producer & consumer properties at the connector level > -- > > Key: KAFKA-4159 > URL: https://issues.apache.org/jira/browse/KAFKA-4159 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Stephen Durfey > > As an example use cases, overriding a sink connector's consumer's partition > assignment strategy. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5195) Endless NotLeaderForPartitionException for ReplicaFetcherThread
[ https://issues.apache.org/jira/browse/KAFKA-5195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170824#comment-16170824 ] Alexey Pervushin commented on KAFKA-5195: - We have the same issue. It happened once. In server.log I have enormous number of messages: {noformat} [2017-09-02 14:02:19,074] ERROR {ReplicaFetcherThread-0-123} [ReplicaFetcherThread-0-123], Error for partition [partition_name, 0] to broker 123:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread) {noformat} Slightly prior that I noticed pretty significant GC collection time spike and a lot of messages about network issues to/from this broker(probably because of GC freeze), like: {noformat} [2017-09-02 14:04:53,550] WARN {main-SendThread(10.69.102.249:2181)} Client session timed out, have not h eard from server in 4371ms for sessionid 0x35de55b9852d48b (org.apache.zookeeper.ClientCnxn) ... [2017-09-02 14:05:28,739] WARN {main-SendThread(10.69.145.152:2181)} Unable to reconnect to ZooKeeper ser vice, session 0x35de55b9852d48b has expired (org.apache.zookeeper.ClientCnxn) {noformat} and {noformat} [2017-09-02 14:02:18,655] WARN {ReplicaFetcherThread-0-124} [ReplicaFetcherThread-0-124], Err or in fetch kafka.server.ReplicaFetcherThread$FetchRequest@33884d9f (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to 124 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$appl y$1.apply(NetworkClientBlockingOps.scala:114) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$appl y$1.apply(NetworkClientBlockingOps.scala:112) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(Network ClientBlockingOps.scala:112) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(Network ClientBlockingOps.scala:108) at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$e xtension(NetworkClientBlockingOps.scala:142) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOp s.scala:108) at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) {noformat} We use Kafka = 0.10.2.1 + backported patch from https://issues.apache.org/jira/browse/KAFKA-5413 java-8-oracle-1.8.0.92 with G1 as GC. Broker configuration: {noformat} broker.id= log.dirs= zookeeper.connect=... auto.create.topics.enable=true connections.max.idle.ms=360 default.replication.factor=3 delete.topic.enable=true group.max.session.timeout.ms=30 inter.broker.protocol.version=0.10.2.0 log.cleaner.dedupe.buffer.size=536870912 log.cleaner.enable=true log.message.format.version=0.9.0.1 log.retention.hours=72 log.segment.bytes=268435456 message.max.bytes=100 min.insync.replicas=2 num.io.threads=5 offsets.retention.minutes=4320 offsets.topic.segment.bytes=104857600 replica.fetch.max.bytes=10485760 request.timeout.ms=31 reserved.broker.max.id=2113929216 unclean.leader.election.enable=false {noformat} > Endless NotLeaderForPartitionException for ReplicaFetcherThread > --- > > Key: KAFKA-5195 > URL: https://issues.apache.org/jira/browse/KAFKA-5195 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1 > Environment: 3 Kafka brokers on top of Kubernetes, using Docker image > wurstmeister/kafka:0.10.1.1. > Environment variables: > KAFKA_ADVERTISED_HOST_NAME: kafka-ypimp-2 > KAFKA_ADVERTISED_PORT: 9092 > KAFKA_ZOOKEEPER_CONNECT: > zookeeper-ypimp-0:2181,zookeeper-ypimp-1:2181,zookeeper-ypimp-2:2181 > KAFKA_DELETE_TOPIC_ENABLE: true > KAFKA_BROKER_ID:2 > JMX_PORT: 1099 > KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote > -Dcom.sun.management.jmxremote.authenticate=false > -Dcom.sun.management.jmxremote.ssl=false > -Djava.rmi.server.hostname=kafka-ypimp-2.default.svc.cluster.l
[jira] [Comment Edited] (KAFKA-5195) Endless NotLeaderForPartitionException for ReplicaFetcherThread
[ https://issues.apache.org/jira/browse/KAFKA-5195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170824#comment-16170824 ] Alexey Pervushin edited comment on KAFKA-5195 at 9/18/17 10:23 PM: --- We have the same issue. It happened once. In server.log I have enormous number of messages: {noformat} [2017-09-02 14:02:19,074] ERROR {ReplicaFetcherThread-0-123} [ReplicaFetcherThread-0-123], Error for partition [partition_name, 0] to broker 123:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread) {noformat} Slightly prior that I noticed pretty significant GC collection time spike and a lot of messages about network issues to/from this broker(probably because of GC freeze), like: {noformat} [2017-09-02 14:04:53,550] WARN {main-SendThread(:2181)} Client session timed out, have not h eard from server in 4371ms for sessionid 0x35de55b9852d48b (org.apache.zookeeper.ClientCnxn) ... [2017-09-02 14:05:28,739] WARN {main-SendThread(:2181)} Unable to reconnect to ZooKeeper ser vice, session 0x35de55b9852d48b has expired (org.apache.zookeeper.ClientCnxn) {noformat} and {noformat} [2017-09-02 14:02:18,655] WARN {ReplicaFetcherThread-0-124} [ReplicaFetcherThread-0-124], Err or in fetch kafka.server.ReplicaFetcherThread$FetchRequest@33884d9f (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to 124 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$appl y$1.apply(NetworkClientBlockingOps.scala:114) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$appl y$1.apply(NetworkClientBlockingOps.scala:112) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(Network ClientBlockingOps.scala:112) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(Network ClientBlockingOps.scala:108) at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$e xtension(NetworkClientBlockingOps.scala:142) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOp s.scala:108) at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) {noformat} We use Kafka = 0.10.2.1 + backported patch from https://issues.apache.org/jira/browse/KAFKA-5413 java-8-oracle-1.8.0.92 with G1 as GC. Broker configuration: {noformat} broker.id= log.dirs= zookeeper.connect=... auto.create.topics.enable=true connections.max.idle.ms=360 default.replication.factor=3 delete.topic.enable=true group.max.session.timeout.ms=30 inter.broker.protocol.version=0.10.2.0 log.cleaner.dedupe.buffer.size=536870912 log.cleaner.enable=true log.message.format.version=0.9.0.1 log.retention.hours=72 log.segment.bytes=268435456 message.max.bytes=100 min.insync.replicas=2 num.io.threads=5 offsets.retention.minutes=4320 offsets.topic.segment.bytes=104857600 replica.fetch.max.bytes=10485760 request.timeout.ms=31 reserved.broker.max.id=2113929216 unclean.leader.election.enable=false {noformat} was (Author: billyevans): We have the same issue. It happened once. In server.log I have enormous number of messages: {noformat} [2017-09-02 14:02:19,074] ERROR {ReplicaFetcherThread-0-123} [ReplicaFetcherThread-0-123], Error for partition [partition_name, 0] to broker 123:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread) {noformat} Slightly prior that I noticed pretty significant GC collection time spike and a lot of messages about network issues to/from this broker(probably because of GC freeze), like: {noformat} [2017-09-02 14:04:53,550] WARN {main-SendThread(10.69.102.249:2181)} Client session timed out, have not h eard from server in 4371ms for sessionid 0x35de55b9852d48b (org.apache.zookeeper.ClientCnxn) ... [2017-09-02 14:05:28,739] WARN {main-SendThread(10.69.145.152:2181)} Unable to reconnect to ZooKeeper ser vice, session 0x35de55b9852d48b has expired (org.apache.zookeeper.ClientCnxn) {noformat} and {n
[jira] [Commented] (KAFKA-5833) Reset thread interrupt state in case of InterruptedException
[ https://issues.apache.org/jira/browse/KAFKA-5833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170849#comment-16170849 ] ASF GitHub Bot commented on KAFKA-5833: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3841 > Reset thread interrupt state in case of InterruptedException > > > Key: KAFKA-5833 > URL: https://issues.apache.org/jira/browse/KAFKA-5833 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Ted Yu >Assignee: Matthias J. Sax > Labels: newbie++ > Fix For: 1.0.0 > > > There are some places where InterruptedException is caught but thread > interrupt state is not reset. > e.g. from WorkerSourceTask#execute() : > {code} > } catch (InterruptedException e) { > // Ignore and allow to exit. > {code} > Proper way of handling InterruptedException is to reset thread interrupt > state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5893) ResetIntegrationTest fails
[ https://issues.apache.org/jira/browse/KAFKA-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170896#comment-16170896 ] ASF GitHub Bot commented on KAFKA-5893: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3893 > ResetIntegrationTest fails > -- > > Key: KAFKA-5893 > URL: https://issues.apache.org/jira/browse/KAFKA-5893 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 0.11.0.0, 0.11.0.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > > {noformat} > java.lang.AssertionError: expected:<0> but was:<1> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} > One issue with debugging is, that we catch exceptions and print the exception > message that is {{null}}: > {noformat} > Standard Error > ERROR: null > ERROR: null > {noformat} > After printing the stack trace in case of failure, we got: > {noformat} > ERROR: java.lang.NullPointerException > java.lang.NullPointerException > at > kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194) > at kafka.tools.StreamsResetter.run(StreamsResetter.java:121) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5927) Capitalise topicPurgatory Name
[ https://issues.apache.org/jira/browse/KAFKA-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chetna Chaudhari updated KAFKA-5927: Attachment: topicPurgatory.png > Capitalise topicPurgatory Name > -- > > Key: KAFKA-5927 > URL: https://issues.apache.org/jira/browse/KAFKA-5927 > Project: Kafka > Issue Type: Task >Reporter: Chetna Chaudhari >Priority: Minor > Attachments: topicPurgatory.png > > > This is a minor change, to capitalise topicPurgatory name to have consistent > logging. Please find attached snapshot > !topicPurgatory.png|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5927) Capitalise topicPurgatory Name
Chetna Chaudhari created KAFKA-5927: --- Summary: Capitalise topicPurgatory Name Key: KAFKA-5927 URL: https://issues.apache.org/jira/browse/KAFKA-5927 Project: Kafka Issue Type: Task Reporter: Chetna Chaudhari Priority: Minor This is a minor change, to capitalise topicPurgatory name to have consistent logging. Please find attached snapshot !topicPurgatory.png|thumbnail! -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5927) Capitalise topicPurgatory Name
[ https://issues.apache.org/jira/browse/KAFKA-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chetna Chaudhari updated KAFKA-5927: Description: This is a minor change, to capitalise topicPurgatory name to have consistent logging. Please find attached snapshot !topicPurgatory.png|thumbnail! Have worked on it, [https://github.com/apache/kafka/pull/3853] . was: This is a minor change, to capitalise topicPurgatory name to have consistent logging. Please find attached snapshot !topicPurgatory.png|thumbnail! > Capitalise topicPurgatory Name > -- > > Key: KAFKA-5927 > URL: https://issues.apache.org/jira/browse/KAFKA-5927 > Project: Kafka > Issue Type: Task >Reporter: Chetna Chaudhari >Priority: Minor > Attachments: topicPurgatory.png > > > This is a minor change, to capitalise topicPurgatory name to have consistent > logging. Please find attached snapshot > !topicPurgatory.png|thumbnail! > Have worked on it, [https://github.com/apache/kafka/pull/3853] . -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5927) Capitalise topicPurgatory Name
[ https://issues.apache.org/jira/browse/KAFKA-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chetna Chaudhari updated KAFKA-5927: Labels: needs-kip (was: ) > Capitalise topicPurgatory Name > -- > > Key: KAFKA-5927 > URL: https://issues.apache.org/jira/browse/KAFKA-5927 > Project: Kafka > Issue Type: Task >Reporter: Chetna Chaudhari >Priority: Minor > Labels: needs-kip > Attachments: topicPurgatory.png > > > This is a minor change, to capitalise topicPurgatory name to have consistent > logging. Please find attached snapshot > !topicPurgatory.png|thumbnail! > Have worked on it, [https://github.com/apache/kafka/pull/3853] . -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5918) Fix minor typos and errors in the Kafka Streams turotial
[ https://issues.apache.org/jira/browse/KAFKA-5918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-5918. -- Resolution: Fixed Issue resolved by pull request 3883 [https://github.com/apache/kafka/pull/3883] > Fix minor typos and errors in the Kafka Streams turotial > > > Key: KAFKA-5918 > URL: https://issues.apache.org/jira/browse/KAFKA-5918 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Jakub Scholz >Priority: Minor > Fix For: 1.0.0 > > > I found several minor issues with the Kafka Streams tutorial: > * Some typos > ** "As shown above, it illustrate that the constructed ..." instead of "As > shown above, it illustrate_s_ that the constructed ..." > ** "same as Pipe.java below" instead of "same as Pipe.java _above_" > ** Wrong class name in the {{LineSplit}} example > * Incorrect imports for the code examples > ** Missing {{import org.apache.kafka.streams.kstream.KStream;}} in > {{LineSplit}} and {{WordCount}} example > * Unnecessary (and potentially confusing) split by whitespaces in the > {{WorkCount}} class (the split into words happened already in {{LineSplit}}) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5927) Capitalise topicPurgatory Name
[ https://issues.apache.org/jira/browse/KAFKA-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chetna Chaudhari updated KAFKA-5927: Description: This is a minor change, to capitalise topicPurgatory name to have consistent logging. Please find attached snapshot !topicPurgatory.png|thumbnail! Have worked on it. was: This is a minor change, to capitalise topicPurgatory name to have consistent logging. Please find attached snapshot !topicPurgatory.png|thumbnail! Have worked on it, [https://github.com/apache/kafka/pull/3853] . > Capitalise topicPurgatory Name > -- > > Key: KAFKA-5927 > URL: https://issues.apache.org/jira/browse/KAFKA-5927 > Project: Kafka > Issue Type: Task >Reporter: Chetna Chaudhari >Priority: Minor > Labels: needs-kip > Attachments: topicPurgatory.png > > > This is a minor change, to capitalise topicPurgatory name to have consistent > logging. Please find attached snapshot > !topicPurgatory.png|thumbnail! > Have worked on it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5918) Fix minor typos and errors in the Kafka Streams turotial
[ https://issues.apache.org/jira/browse/KAFKA-5918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170920#comment-16170920 ] ASF GitHub Bot commented on KAFKA-5918: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3883 > Fix minor typos and errors in the Kafka Streams turotial > > > Key: KAFKA-5918 > URL: https://issues.apache.org/jira/browse/KAFKA-5918 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Jakub Scholz >Priority: Minor > Fix For: 1.0.0 > > > I found several minor issues with the Kafka Streams tutorial: > * Some typos > ** "As shown above, it illustrate that the constructed ..." instead of "As > shown above, it illustrate_s_ that the constructed ..." > ** "same as Pipe.java below" instead of "same as Pipe.java _above_" > ** Wrong class name in the {{LineSplit}} example > * Incorrect imports for the code examples > ** Missing {{import org.apache.kafka.streams.kstream.KStream;}} in > {{LineSplit}} and {{WordCount}} example > * Unnecessary (and potentially confusing) split by whitespaces in the > {{WorkCount}} class (the split into words happened already in {{LineSplit}}) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-5918) Fix minor typos and errors in the Kafka Streams turotial
[ https://issues.apache.org/jira/browse/KAFKA-5918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-5918: Assignee: Jakub Scholz > Fix minor typos and errors in the Kafka Streams turotial > > > Key: KAFKA-5918 > URL: https://issues.apache.org/jira/browse/KAFKA-5918 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Jakub Scholz >Assignee: Jakub Scholz >Priority: Minor > Fix For: 1.0.0 > > > I found several minor issues with the Kafka Streams tutorial: > * Some typos > ** "As shown above, it illustrate that the constructed ..." instead of "As > shown above, it illustrate_s_ that the constructed ..." > ** "same as Pipe.java below" instead of "same as Pipe.java _above_" > ** Wrong class name in the {{LineSplit}} example > * Incorrect imports for the code examples > ** Missing {{import org.apache.kafka.streams.kstream.KStream;}} in > {{LineSplit}} and {{WordCount}} example > * Unnecessary (and potentially confusing) split by whitespaces in the > {{WorkCount}} class (the split into words happened already in {{LineSplit}}) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5928) Avoid redundant requests to zookeeper when reassign topic partition
Genmao Yu created KAFKA-5928: Summary: Avoid redundant requests to zookeeper when reassign topic partition Key: KAFKA-5928 URL: https://issues.apache.org/jira/browse/KAFKA-5928 Project: Kafka Issue Type: Improvement Components: admin Affects Versions: 0.11.0.0, 0.10.2.1 Reporter: Genmao Yu We mistakenly request topic level information according to partitions config in the assignment json file. For example https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L550: {code} val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) } {code} If reassign 1000 partitions (in 10 topics), we need to request zookeeper 1000 times here. But actually we only need to request just 10 (topics) times. We test a large-scale assignment, about 10K partitions. It takes tens of minutes. After optimization, it will reduce to less than 1minute. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5928) Avoid redundant requests to zookeeper when reassign topic partition
[ https://issues.apache.org/jira/browse/KAFKA-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171042#comment-16171042 ] Genmao Yu commented on KAFKA-5928: -- I am working on this > Avoid redundant requests to zookeeper when reassign topic partition > --- > > Key: KAFKA-5928 > URL: https://issues.apache.org/jira/browse/KAFKA-5928 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Genmao Yu > > We mistakenly request topic level information according to partitions config > in the assignment json file. For example > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L550: > {code} > val validPartitions = proposedPartitionAssignment.filter { case (p, _) => > validatePartition(zkUtils, p.topic, p.partition) } > {code} > If reassign 1000 partitions (in 10 topics), we need to request zookeeper 1000 > times here. But actually we only need to request just 10 (topics) times. We > test a large-scale assignment, about 10K partitions. It takes tens of > minutes. After optimization, it will reduce to less than 1minute. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5928) Avoid redundant requests to zookeeper when reassign topic partition
[ https://issues.apache.org/jira/browse/KAFKA-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171046#comment-16171046 ] ASF GitHub Bot commented on KAFKA-5928: --- GitHub user uncleGen opened a pull request: https://github.com/apache/kafka/pull/3894 KAFKA-5928: Avoid redundant requests to zookeeper when reassign topic partition We mistakenly request topic level information according to partitions config in the assignment json file. For example https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L550: ``` val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) } ``` If reassign 1000 partitions (in 10 topics), we need to request zookeeper 1000 times here. But actually we only need to request just 10 (topics) times. We test a large-scale assignment, about 10K partitions. It takes tens of minutes. After optimization, it will reduce to less than 1minute. You can merge this pull request into a Git repository by running: $ git pull https://github.com/uncleGen/kafka KAFKA-5928 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3894.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3894 commit f6c30e81c7110f72e254bb9dfa81a25f951b70a1 Author: 木艮 Date: 2017-09-19T03:01:20Z Avoid redundant requests to zookeeper when reassign topic partition > Avoid redundant requests to zookeeper when reassign topic partition > --- > > Key: KAFKA-5928 > URL: https://issues.apache.org/jira/browse/KAFKA-5928 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Genmao Yu > > We mistakenly request topic level information according to partitions config > in the assignment json file. For example > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L550: > {code} > val validPartitions = proposedPartitionAssignment.filter { case (p, _) => > validatePartition(zkUtils, p.topic, p.partition) } > {code} > If reassign 1000 partitions (in 10 topics), we need to request zookeeper 1000 > times here. But actually we only need to request just 10 (topics) times. We > test a large-scale assignment, about 10K partitions. It takes tens of > minutes. After optimization, it will reduce to less than 1minute. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5926) --force option is ginored by kafka-configs and kafka-topics tools
[ https://issues.apache.org/jira/browse/KAFKA-5926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171081#comment-16171081 ] huxihx commented on KAFKA-5926: --- Seems it's a duplicate of [KAFKA-5707|https://issues.apache.org/jira/browse/KAFKA-5707] which had been closed due to compatibility maintenance. > --force option is ginored by kafka-configs and kafka-topics tools > - > > Key: KAFKA-5926 > URL: https://issues.apache.org/jira/browse/KAFKA-5926 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: Mickael Maison >Assignee: Mickael Maison > > Both ConfigCommand and TopicCommand list a --force option in their help but > it is not used. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5917) Kafka not starting
[ https://issues.apache.org/jira/browse/KAFKA-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-5917. -- Resolution: Won't Fix These kinds of issues can be avoided once we completely move Kafka tools to Java Admin API. > Kafka not starting > -- > > Key: KAFKA-5917 > URL: https://issues.apache.org/jira/browse/KAFKA-5917 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 0.10.0.0 >Reporter: Balu > > Getting this error in kafka,zookeeper,schema repository cluster. > FATAL [Kafka Server 3], Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed > at scala.Predef$.require(Predef.scala:212) > at > kafka.server.DynamicConfigManager$ConfigChangedNotificationHandler$.processNotification(DynamicConfigManager.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2$$anonfun$apply$2.apply(ZkNodeChangeNotificationListener.scala:95) > at scala.Option.map(Option.scala:146) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:95) > at > kafka.common.ZkNodeChangeNotificationListener$$anonfun$kafka$common$ZkNodeChangeNotificationListener$$processNotifications$2.apply(ZkNodeChangeNotificationListener.scala:90) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.common.ZkNodeChangeNotificationListener.kafka$common$ZkNodeChangeNotificationListener$$processNotifications(ZkNodeChangeNotificationListener.scala:90) > at > kafka.common.ZkNodeChangeNotificationListener.processAllNotifications(ZkNodeChangeNotificationListener.scala:79) > at > kafka.common.ZkNodeChangeNotificationListener.init(ZkNodeChangeNotificationListener.scala:67) > at > kafka.server.DynamicConfigManager.startup(DynamicConfigManager.scala:122) > at kafka.server.KafkaServer.startup(KafkaServer.scala:233) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) > at kafka.Kafka$.main(Kafka.scala:67) > at kafka.Kafka.main(Kafka.scala) > Please help -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5914) Return MessageFormatVersion and MessageMaxBytes in MetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-5914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171163#comment-16171163 ] ASF GitHub Bot commented on KAFKA-5914: --- GitHub user apurvam opened a pull request: https://github.com/apache/kafka/pull/3896 KAFKA-5914 add message format version and message max bytes to metadata response Updated the `TopicResponse` part of the `MetadataResponse` to include the message format version and the message max bytes. One problem here is that we use the `TopicConfigHandler` to listen for topic changes. However this is not invoked for topic _creates_ since the change notification path is not updated during creates. I am not sure what the right solution is here. Intuitively, we should update the the change notification path for topic creates, but not sure if that has compatibility (or other) implications. TODO: 1. Add a more complete integration test where the client sends a real `MetadataRequest` and receives the proper `MetadataResponse`. 2. Rebase to incorporate Jason's changes. You can merge this pull request into a Git repository by running: $ git pull https://github.com/apurvam/kafka KAFKA-5914-add-message-format-version-and-message-max-bytes-to-metadata-response Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3896.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3896 commit 7cc943c30be8bef4646580f19e5191ef7e476b98 Author: Apurva Mehta Date: 2017-09-19T04:29:00Z Initial commit with a few tests commit 5099d5163b071020cc627b6b0a7c4f388de99eaa Author: Apurva Mehta Date: 2017-09-19T06:05:43Z Added one more test > Return MessageFormatVersion and MessageMaxBytes in MetadataResponse > --- > > Key: KAFKA-5914 > URL: https://issues.apache.org/jira/browse/KAFKA-5914 > Project: Kafka > Issue Type: Sub-task >Reporter: Apurva Mehta >Assignee: Apurva Mehta > Fix For: 1.0.0 > > > As part of KIP-192, we want to send two additional fields in the > {{TopicMetadata}} which is part of the {{MetadataResponse}}. These fields are > the {{MessageFormatVersion}} and the {{MessageMaxBytes}}. > The {{MessageFormatVersion}} is required to implement > https://issues.apache.org/jira/browse/KAFKA-5794 . The latter will be > implemented in a future release, but with the changes proposed here, the said > future release will be backward compatible with 1.0.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4714) Implement remaining KIP-66 SMTs
[ https://issues.apache.org/jira/browse/KAFKA-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171189#comment-16171189 ] Dhananjay Patkar commented on KAFKA-4714: - Thanks [~ewencp] for quick reply. I have a use case, wherein I need to map incoming raw message to standard format and persist, as well as persist raw message as is. Converting from raw message to standard format message is not always guaranteed, but I still need to persist raw message. I understand I can write multiple consumer to same topic, 1 consumer will persist raw messages as is and other will transform raw message into standard format and persist. Just wanted to know, is there way I can do it as a single consumer through SMT. ??The record is maintained internally, but it is not exposed to the connector?? Is there a possibility to expose raw message based on configuration during SMT? > Implement remaining KIP-66 SMTs > --- > > Key: KAFKA-4714 > URL: https://issues.apache.org/jira/browse/KAFKA-4714 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava > Fix For: 0.11.0.0 > > > Three didn't make it for the 0.10.2.0 release: Flatten, Cast, and > TimestampConverter. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-4714) Implement remaining KIP-66 SMTs
[ https://issues.apache.org/jira/browse/KAFKA-4714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171189#comment-16171189 ] Dhananjay Patkar edited comment on KAFKA-4714 at 9/19/17 6:43 AM: -- Thanks [~ewencp] for quick reply. I have a use case, wherein I need to map incoming raw message to standard format and persist, as well as persist raw message as is. Converting from raw message to standard format message is not always guaranteed, but I still need to persist raw message. I understand I can write multiple consumer to same topic, 1 consumer will persist raw messages as is and other will transform raw message into standard format and persist. Just wanted to know, is there way I can do it as a single consumer through SMT. ??The record is maintained internally, but it is not exposed to the connector?? Is there a possibility to expose raw message based on configuration of SMT? was (Author: dhananjaydp): Thanks [~ewencp] for quick reply. I have a use case, wherein I need to map incoming raw message to standard format and persist, as well as persist raw message as is. Converting from raw message to standard format message is not always guaranteed, but I still need to persist raw message. I understand I can write multiple consumer to same topic, 1 consumer will persist raw messages as is and other will transform raw message into standard format and persist. Just wanted to know, is there way I can do it as a single consumer through SMT. ??The record is maintained internally, but it is not exposed to the connector?? Is there a possibility to expose raw message based on configuration during SMT? > Implement remaining KIP-66 SMTs > --- > > Key: KAFKA-4714 > URL: https://issues.apache.org/jira/browse/KAFKA-4714 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava > Fix For: 0.11.0.0 > > > Three didn't make it for the 0.10.2.0 release: Flatten, Cast, and > TimestampConverter. -- This message was sent by Atlassian JIRA (v6.4.14#64029)