[jira] [Commented] (KAFKA-2170) 10 LogTest cases failed for file.renameTo failed under windows
[ https://issues.apache.org/jira/browse/KAFKA-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463462#comment-16463462 ] Christoph Schmidt commented on KAFKA-2170: -- Adding link to KAFKA-1194, which appears to be the ancient mother of this issue. There's also some PRs over there :/ I'd love to see this (and all related issues) upgraded to Blocker criticality for recurring crashes and no viable workaround apart from unhelpful tips like switching OSs > 10 LogTest cases failed for file.renameTo failed under windows > --- > > Key: KAFKA-2170 > URL: https://issues.apache.org/jira/browse/KAFKA-2170 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.1.0 > Environment: Windows >Reporter: Honghai Chen >Assignee: Jay Kreps >Priority: Major > Labels: windows > > get latest code from trunk, then run test > gradlew -i core:test --tests kafka.log.LogTest > Got 10 cases failed for same reason: > kafka.common.KafkaStorageException: Failed to change the log file suffix from > to .deleted for log segment 0 > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259) > at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756) > at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514) > at scala.collection.immutable.List.foreach(List.scala:318) > at kafka.log.Log.deleteOldSegments(Log.scala:514) > at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:601) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:41) > at org.junit.runners.ParentRunner$1.evaluate(ParentRunner.java:173) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31) > at org.junit.runners.ParentRunner.run(ParentRunner.java:220) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48) > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:601) > at > org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at $Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105) > at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Me
[jira] [Commented] (KAFKA-5965) Remove Deprecated AdminClient from Streams Resetter Tool
[ https://issues.apache.org/jira/browse/KAFKA-5965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463586#comment-16463586 ] ASF GitHub Bot commented on KAFKA-5965: --- fedosov-alexander opened a new pull request #4968: KAFKA-5965: Remove Deprecated AdminClient from Streams Resetter Tool URL: https://github.com/apache/kafka/pull/4968 Removed usage of deprecated AdminClient from StreamsResetter No additional tests are required. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove Deprecated AdminClient from Streams Resetter Tool > > > Key: KAFKA-5965 > URL: https://issues.apache.org/jira/browse/KAFKA-5965 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Bill Bejeck >Assignee: Alexander Fedosov >Priority: Major > Labels: newbie > > To break the dependency on using ZK, the {{StreamsResetter}} tool now uses > the {{KafkaAdminClient}} for deleting topics and the > {{kafka.admin.AdminClient}} for verfiying no consumer groups are active > before running. > Once the {{KafkaAdminClient}} has a describe group functionality, we should > remove the dependency on {{kafka.admin.AdminClient}} from the > {{StreamsResetter}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6832) Wrong start position in the log file on the leader, on fetch request.
[ https://issues.apache.org/jira/browse/KAFKA-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463689#comment-16463689 ] Ciprian Pascu commented on KAFKA-6832: -- Hi, For us, the problem is easily reproducible e.g by killing all brokers, at short time one after another. In this case, broker-0 was not able to follow broker-2 due to the below situation (decoded using kafka.tools.DumpLogSegments): broker-0: *- baseOffset: 10712 lastOffset: 10712* baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: 0 *partitionLeaderEpoch: 0* isTransactional: false position: 3476142 CreateTime: 1525424946243 isvalid: true size: 316 magic: 2 compresscodec: NONE crc: 1743233761 *- baseOffset: 10713 lastOffset: 10713* baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: 0 *partitionLeaderEpoch: 0* isTransactional: false position: 3476458 CreateTime: 1525424947245 isvalid: true size: 316 magic: 2 compresscodec: NONE crc: 634274826 *- baseOffset: 10714 lastOffset: 10714* baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: 0 *partitionLeaderEpoch: 0* isTransactional: false position: 3476774 CreateTime: 1525424948248 isvalid: true size: 316 magic: 2 compresscodec: NONE crc: 4225911256 broker-2: *- baseOffset: 10712 lastOffset: 10712* baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: 0 *partitionLeaderEpoch: 0* isTransactional: false position: 3476142 Create Time: 1525424946243 isvalid: true size: 316 magic: 2 compresscodec: NONE crc: 1743233761 *- baseOffset: 10713 lastOffset: 10720* baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: 0 *partitionLeaderEpoch: 1* isTransactional: false position: 3476458 CreateTime: 1525424957261 isvalid: true size: 2108 magic: 2 compresscodec: NONE crc: 3444932391 *- baseOffset: 10721 lastOffset: 10721* baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: 0 *partitionLeaderEpoch: 1* isTransactional: false position: 3478566 CreateTime: 1525424958263 isvalid: true size: 316 magic: 2 compresscodec: NONE crc: 1708198842 Will KAFKA-6361 address this issue as well? Because, as we see, here the logs have diverged by several batches; broker-0 should go 2 batches backwards, in order to start following again. Ciprian. > Wrong start position in the log file on the leader, on fetch request. > - > > Key: KAFKA-6832 > URL: https://issues.apache.org/jira/browse/KAFKA-6832 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.0 >Reporter: Ciprian Pascu >Priority: Major > > Hi, > We have an environment with 3 Kafka brokers; after hard reboot all brokers > (by hard rebooting the VMs on which they are located), we experience drop in > the ISR, for the topics that have replication factor greater than 1; it is > caused by the death of some of the replica threads with the following > exception: > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: > *kafka.common.KafkaException: Error processing data for partition > __consumer_offsets-39 offset 308060* > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$ > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$ > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > scala.Option.foreach(Option.scala:257) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Abs > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Abs > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThrea > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.Abstract
[jira] [Commented] (KAFKA-6832) Wrong start position in the log file on the leader, on fetch request.
[ https://issues.apache.org/jira/browse/KAFKA-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463725#comment-16463725 ] Ciprian Pascu commented on KAFKA-6832: -- Also, the content of leader-epoch-checkpoint file seems strange; for that same topic partition that had the problem described above, it is the same on all 3 brokers: 0 2 0 0 1 10056 On broker-2, shouldn't it contain the record '1 10713'? Kafka version we are using is 1.0.0. > Wrong start position in the log file on the leader, on fetch request. > - > > Key: KAFKA-6832 > URL: https://issues.apache.org/jira/browse/KAFKA-6832 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.0 >Reporter: Ciprian Pascu >Priority: Major > > Hi, > We have an environment with 3 Kafka brokers; after hard reboot all brokers > (by hard rebooting the VMs on which they are located), we experience drop in > the ISR, for the topics that have replication factor greater than 1; it is > caused by the death of some of the replica threads with the following > exception: > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: > *kafka.common.KafkaException: Error processing data for partition > __consumer_offsets-39 offset 308060* > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$ > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$ > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > scala.Option.foreach(Option.scala:257) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Abs > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Abs > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThrea > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: *Caused by: > java.lang.IllegalArgumentException: Out of order offsets found in > List(308059, 308060)* > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.log.Log$$anonfun$append$2.apply(Log.scala:683) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.log.Log$$anonfun$append$2.apply(Log.scala:624) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.log.Log.maybeHandleIOException(Log.scala:1679) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.log.Log.append(Log.scala:624) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.log.Log.appendAsFollower(Log.scala:607) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41) > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$ > Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: ... 13 more > > The replica requests for offset *308060, but it gets a message set containing > (**308059, 308060), which makes the replica thread crash, due to the above > exception. The reason why the leader sends a message set with a smal
[jira] [Created] (KAFKA-6862) test toolchain
ravi created KAFKA-6862: --- Summary: test toolchain Key: KAFKA-6862 URL: https://issues.apache.org/jira/browse/KAFKA-6862 Project: Kafka Issue Type: Test Components: build Reporter: ravi test toolchain -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6863) Kafka clients should try to use multiple DNS resolved IP addresses if the first one fails
Edoardo Comar created KAFKA-6863: Summary: Kafka clients should try to use multiple DNS resolved IP addresses if the first one fails Key: KAFKA-6863 URL: https://issues.apache.org/jira/browse/KAFKA-6863 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 1.1.0, 1.0.0 Reporter: Edoardo Comar Assignee: Edoardo Comar Fix For: 2.0.0 Currently Kafka clients resolve a symbolic hostname using {{new InetSocketAddress(String hostname, int port)}} which only picks one IP address even if the DNS has multiple records for the hostname, as it calls {{InetAddress.getAllByName(host)[0]}} For some environments where the hostnames are mapped by the DNS to multiple IPs, e.g. in clouds where the IPs point to the external load balancers, it would be preferable that the client, on failing to connect to one of the IPs, would try the other ones before giving up the connection. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6863) Kafka clients should try to use multiple DNS resolved IP addresses if the first one fails
[ https://issues.apache.org/jira/browse/KAFKA-6863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463883#comment-16463883 ] Ismael Juma commented on KAFKA-6863: Yeah, this has come up before [https://github.com/apache/kafka/pull/508,] but I didn't have time to follow up. > Kafka clients should try to use multiple DNS resolved IP addresses if the > first one fails > - > > Key: KAFKA-6863 > URL: https://issues.apache.org/jira/browse/KAFKA-6863 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 1.0.0, 1.1.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 2.0.0 > > > Currently Kafka clients resolve a symbolic hostname using > {{new InetSocketAddress(String hostname, int port)}} > which only picks one IP address even if the DNS has multiple records for the > hostname, as it calls > {{InetAddress.getAllByName(host)[0]}} > For some environments where the hostnames are mapped by the DNS to multiple > IPs, e.g. in clouds where the IPs point to the external load balancers, it > would be preferable that the client, on failing to connect to one of the IPs, > would try the other ones before giving up the connection. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6864) null is not passed to keyDeserialzer and valueDeserializer in Fetcher
Mathias Kub created KAFKA-6864: -- Summary: null is not passed to keyDeserialzer and valueDeserializer in Fetcher Key: KAFKA-6864 URL: https://issues.apache.org/jira/browse/KAFKA-6864 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 1.1.0 Reporter: Mathias Kub Hi, currently null values are not passed to the keyDeserializer and valueDeserializer in the Fetcher class. This prevents custom deserialization of null values. The Deserializer JavaDoc says {code:java} /** * Deserialize a record value from a byte array into a value or object. * @param topic topic associated with the data * @param data serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception. * @return deserialized typed data; may be null */ T deserialize(String topic, byte[] data);{code} so as for my understanding, null values should be passed to it. Thanks, Mathias -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6864) null is not passed to keyDeserialzer and valueDeserializer in Fetcher
[ https://issues.apache.org/jira/browse/KAFKA-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463952#comment-16463952 ] ASF GitHub Bot commented on KAFKA-6864: --- makubi opened a new pull request #4969: KAFKA-6864: Pass null key and value to deserializer URL: https://github.com/apache/kafka/pull/4969 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > null is not passed to keyDeserialzer and valueDeserializer in Fetcher > - > > Key: KAFKA-6864 > URL: https://issues.apache.org/jira/browse/KAFKA-6864 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.1.0 >Reporter: Mathias Kub >Priority: Major > > Hi, > currently null values are not passed to the keyDeserializer and > valueDeserializer in the Fetcher class. This prevents custom deserialization > of null values. > The Deserializer JavaDoc says > {code:java} > /** > * Deserialize a record value from a byte array into a value or object. > * @param topic topic associated with the data > * @param data serialized bytes; may be null; implementations are recommended > to handle null by returning a value or null rather than throwing an exception. > * @return deserialized typed data; may be null > */ > T deserialize(String topic, byte[] data);{code} > so as for my understanding, null values should be passed to it. > > Thanks, > Mathias -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6857) LeaderEpochFileCache.endOffsetFor() should check for UNDEFINED_EPOCH explicitly
[ https://issues.apache.org/jira/browse/KAFKA-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6857. Resolution: Fixed Fix Version/s: 1.1.1 1.0.2 2.0.0 > LeaderEpochFileCache.endOffsetFor() should check for UNDEFINED_EPOCH > explicitly > --- > > Key: KAFKA-6857 > URL: https://issues.apache.org/jira/browse/KAFKA-6857 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.0 >Reporter: Jun Rao >Assignee: Anna Povzner >Priority: Major > Fix For: 2.0.0, 1.0.2, 1.1.1 > > > In LeaderEpochFileCache.endOffsetFor() , we have the following code. > > > {code:java} > if (requestedEpoch == latestEpoch) { > leo().messageOffset > {code} > > In the case when the requestedEpoch is UNDEFINED_EPOCH and latestEpoch is > also UNDEFINED_EPOCH, we return leo. This will cause the follower to truncate > to a wrong offset. If requestedEpoch is UNDEFINED_EPOCH, we need to request > UNDEFINED_EPOCH_OFFSET. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint
[ https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464036#comment-16464036 ] ASF GitHub Bot commented on KAFKA-6761: --- guozhangwang closed pull request #4923: KAFKA-6761: Part 1 of 3; Graph nodes URL: https://github.com/apache/kafka/pull/4923 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java new file mode 100644 index 000..899ee718e28 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +/** + * Utility base class containing the common fields between + * a Stream-Stream join and a Table-Table join + */ +abstract class BaseJoinProcessorNode extends StreamsGraphNode { + +private final ProcessorSupplier joinThisProcessSupplier; +private final ProcessorSupplier joinOtherProcessSupplier; +private final ProcessorSupplier joinMergeProcessor; +private final ValueJoiner valueJoiner; +private final String joinThisProcessorName; +private final String joinOtherProcessorName; +private final String joinMergeProcessorName; +private final String thisJoinSide; +private final String otherJoinSide; + + +BaseJoinProcessorNode(final String parentProcessorNodeName, + final String processorNodeName, + final ValueJoiner valueJoiner, + final ProcessorParameters joinThisProcessorDetails, + final ProcessorParameters joinOtherProcessDetails, + final ProcessorParameters joinMergeProcessorDetails, + final String thisJoinSide, + final String otherJoinSide) { + +super(parentProcessorNodeName, + processorNodeName, + false); + +this.valueJoiner = valueJoiner; +this.joinThisProcessSupplier = joinThisProcessorDetails.processorSupplier(); +this.joinOtherProcessSupplier = joinOtherProcessDetails.processorSupplier(); +this.joinMergeProcessor = joinMergeProcessorDetails.processorSupplier(); +this.joinThisProcessorName = joinThisProcessorDetails.processorName(); +this.joinOtherProcessorName = joinOtherProcessDetails.processorName(); +this.joinMergeProcessorName = joinMergeProcessorDetails.processorName(); +this.thisJoinSide = thisJoinSide; +this.otherJoinSide = otherJoinSide; +} + +ProcessorSupplier joinThisProcessorSupplier() { +return joinThisProcessSupplier; +} + +ProcessorSupplier joinOtherProcessorSupplier() { +return joinOtherProcessSupplier; +} + +ProcessorSupplier joinMergeProcessor() { +return joinMergeProcessor; +} + +ValueJoiner valueJoiner() { +return valueJoiner; +} + +String joinThisProcessorName() { +return joinThisProcessorName; +} + +String joinOtherProcessorName() { +return joinOtherProcessorName; +} + +String joinMergeProcessorName() { +return joinMergeProcessorName; +} + +String thisJoinSide() { +return thisJoinSide; +} + +String otherJoinSide() { +return otherJoinSide; +} +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java new file mode 100644 index 000..f76aa0d64d2 --- /dev/null +++ b/stream
[jira] [Commented] (KAFKA-6863) Kafka clients should try to use multiple DNS resolved IP addresses if the first one fails
[ https://issues.apache.org/jira/browse/KAFKA-6863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464049#comment-16464049 ] Edoardo Comar commented on KAFKA-6863: -- Thanks for looking [~ijuma] we're coming up with a PR soon based on current trunk (which we're testing against some IPs whose authoritative DNS I can edit at will :) . We aren't changing any external interfaces or configurations, so this may not need a KIP. Though it may be useful to mention the Java dns ttl in Kafka somewhere https://issues.apache.org/jira/browse/KAFKA-6843?filter=12343790 cc [~mimaison] > Kafka clients should try to use multiple DNS resolved IP addresses if the > first one fails > - > > Key: KAFKA-6863 > URL: https://issues.apache.org/jira/browse/KAFKA-6863 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 1.0.0, 1.1.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 2.0.0 > > > Currently Kafka clients resolve a symbolic hostname using > {{new InetSocketAddress(String hostname, int port)}} > which only picks one IP address even if the DNS has multiple records for the > hostname, as it calls > {{InetAddress.getAllByName(host)[0]}} > For some environments where the hostnames are mapped by the DNS to multiple > IPs, e.g. in clouds where the IPs point to the external load balancers, it > would be preferable that the client, on failing to connect to one of the IPs, > would try the other ones before giving up the connection. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop
[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-5697. -- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request 4930 [https://github.com/apache/kafka/pull/4930] > StreamThread.shutdown() need to interrupt the stream threads to break the loop > -- > > Key: KAFKA-5697 > URL: https://issues.apache.org/jira/browse/KAFKA-5697 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: John Roesler >Priority: Major > Labels: newbie > Fix For: 2.0.0 > > > In {{StreamThread.shutdown()}} we currently do nothing but set the state, > hoping the stream thread may eventually check it and shutdown itself. > However, under certain scenarios the thread may get blocked within a single > loop and hence will never check on this state enum. For example, it's > {{consumer.poll}} call trigger {{ensureCoordinatorReady()}} which will block > until the coordinator can be found. If the coordinator broker is never up and > running then the Stream instance will be blocked forever. > A simple way to produce this issue is to start the work count demo without > starting the ZK / Kafka broker, and then it will get stuck in a single loop > and even `ctrl-C` will not stop it since its set state will never be read by > the thread: > {code:java} > [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop
[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464072#comment-16464072 ] ASF GitHub Bot commented on KAFKA-5697: --- guozhangwang closed pull request #4930: KAFKA-5697: issue Consumer#wakeup during Streams shutdown URL: https://github.com/apache/kafka/pull/4930 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 66a8934d283..2c409d1015d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -67,6 +67,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -382,7 +383,13 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState * @return Map of all metrics. */ public Map metrics() { -return Collections.unmodifiableMap(metrics.metrics()); +final Map result = new LinkedHashMap<>(); +for (final StreamThread thread : threads) { +result.putAll(thread.consumerMetrics()); +} +if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics()); +result.putAll(metrics.metrics()); +return Collections.unmodifiableMap(result); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java new file mode 100644 index 000..d404642793c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public class ShutdownException extends StreamsException { +public ShutdownException(final String message) { +super(message); +} + +public ShutdownException(final String message, final Throwable throwable) { +super(message, throwable); +} + +public ShutdownException(final Throwable throwable) { +super(throwable); +} +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java new file mode 100644 index 000..8b912579b9a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; + +import java.util.Collections; +import java.util.List; + +public final class Consume
[jira] [Commented] (KAFKA-6863) Kafka clients should try to use multiple DNS resolved IP addresses if the first one fails
[ https://issues.apache.org/jira/browse/KAFKA-6863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464080#comment-16464080 ] Manikumar commented on KAFKA-6863: -- Is this not related to KIP-235 / KAFKA-6195? https://cwiki.apache.org/confluence/display/KAFKA/KIP-235%3A+Add+DNS+alias+support+for+secured+connection > Kafka clients should try to use multiple DNS resolved IP addresses if the > first one fails > - > > Key: KAFKA-6863 > URL: https://issues.apache.org/jira/browse/KAFKA-6863 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 1.0.0, 1.1.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 2.0.0 > > > Currently Kafka clients resolve a symbolic hostname using > {{new InetSocketAddress(String hostname, int port)}} > which only picks one IP address even if the DNS has multiple records for the > hostname, as it calls > {{InetAddress.getAllByName(host)[0]}} > For some environments where the hostnames are mapped by the DNS to multiple > IPs, e.g. in clouds where the IPs point to the external load balancers, it > would be preferable that the client, on failing to connect to one of the IPs, > would try the other ones before giving up the connection. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6861) Missing ')' in Kafka Streams documentation
[ https://issues.apache.org/jira/browse/KAFKA-6861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464140#comment-16464140 ] ASF GitHub Bot commented on KAFKA-6861: --- guozhangwang closed pull request #139: [KAFKA-6861] Missing parentheses in Kafka Streams documentation URL: https://github.com/apache/kafka-site/pull/139 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/10/tutorial.html b/10/tutorial.html index 7d75ed6..e7b5ea4 100644 --- a/10/tutorial.html +++ b/10/tutorial.html @@ -530,7 +530,7 @@ Writing a th .groupBy((key, value) -> value) .count(Materialized.>as("counts-store")) .toStream() - .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()); + .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Missing ')' in Kafka Streams documentation > -- > > Key: KAFKA-6861 > URL: https://issues.apache.org/jira/browse/KAFKA-6861 > Project: Kafka > Issue Type: Bug >Reporter: Ravi Raj Singh >Priority: Trivial > > Page: [https://kafka.apache.org/11/documentation/streams/tutorial] > Code snippet: > {{KStream source = > builder.stream(}}{{"streams-plaintext-input"}}{{);}} > {{source.flatMapValues(value -> > Arrays.asList(value.toLowerCase(Locale.getDefault()).split(}}{{"\\W+"}}{{)))}} > {{ }}{{.groupBy((key, value) -> value)}} > {{ }}{{.count(Materialized. }}{{byte}}{{[]>>as(}}{{"counts-store"}}{{))}} > {{ }}{{.toStream()}} > {{ }}{{.to(}}{{"streams-wordcount-output"}}{{, > Produced.with(Serdes.String(), Serdes.Long{color:#d04437}());{color}}} > {{Should have an additional closing parentheses after Serdes.Long.}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6865) Devops Insights
ravi created KAFKA-6865: --- Summary: Devops Insights Key: KAFKA-6865 URL: https://issues.apache.org/jira/browse/KAFKA-6865 Project: Kafka Issue Type: New Feature Reporter: ravi Devops Insights -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6861) Missing ')' in Kafka Streams documentation
[ https://issues.apache.org/jira/browse/KAFKA-6861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6861. -- Resolution: Fixed > Missing ')' in Kafka Streams documentation > -- > > Key: KAFKA-6861 > URL: https://issues.apache.org/jira/browse/KAFKA-6861 > Project: Kafka > Issue Type: Bug >Reporter: Ravi Raj Singh >Priority: Trivial > Fix For: 1.0.0 > > > Page: [https://kafka.apache.org/11/documentation/streams/tutorial] > Code snippet: > {{KStream source = > builder.stream(}}{{"streams-plaintext-input"}}{{);}} > {{source.flatMapValues(value -> > Arrays.asList(value.toLowerCase(Locale.getDefault()).split(}}{{"\\W+"}}{{)))}} > {{ }}{{.groupBy((key, value) -> value)}} > {{ }}{{.count(Materialized. }}{{byte}}{{[]>>as(}}{{"counts-store"}}{{))}} > {{ }}{{.toStream()}} > {{ }}{{.to(}}{{"streams-wordcount-output"}}{{, > Produced.with(Serdes.String(), Serdes.Long{color:#d04437}());{color}}} > {{Should have an additional closing parentheses after Serdes.Long.}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6861) Missing ')' in Kafka Streams documentation
[ https://issues.apache.org/jira/browse/KAFKA-6861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6861: - Fix Version/s: 1.0.0 > Missing ')' in Kafka Streams documentation > -- > > Key: KAFKA-6861 > URL: https://issues.apache.org/jira/browse/KAFKA-6861 > Project: Kafka > Issue Type: Bug >Reporter: Ravi Raj Singh >Priority: Trivial > Fix For: 1.0.0 > > > Page: [https://kafka.apache.org/11/documentation/streams/tutorial] > Code snippet: > {{KStream source = > builder.stream(}}{{"streams-plaintext-input"}}{{);}} > {{source.flatMapValues(value -> > Arrays.asList(value.toLowerCase(Locale.getDefault()).split(}}{{"\\W+"}}{{)))}} > {{ }}{{.groupBy((key, value) -> value)}} > {{ }}{{.count(Materialized. }}{{byte}}{{[]>>as(}}{{"counts-store"}}{{))}} > {{ }}{{.toStream()}} > {{ }}{{.to(}}{{"streams-wordcount-output"}}{{, > Produced.with(Serdes.String(), Serdes.Long{color:#d04437}());{color}}} > {{Should have an additional closing parentheses after Serdes.Long.}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6866) Increase memory
ravi created KAFKA-6866: --- Summary: Increase memory Key: KAFKA-6866 URL: https://issues.apache.org/jira/browse/KAFKA-6866 Project: Kafka Issue Type: Improvement Reporter: ravi Increase memory -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6861) Missing ')' in Kafka Streams documentation
[ https://issues.apache.org/jira/browse/KAFKA-6861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6861: --- Component/s: streams documentation > Missing ')' in Kafka Streams documentation > -- > > Key: KAFKA-6861 > URL: https://issues.apache.org/jira/browse/KAFKA-6861 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Reporter: Ravi Raj Singh >Priority: Trivial > Fix For: 1.0.0 > > > Page: [https://kafka.apache.org/11/documentation/streams/tutorial] > Code snippet: > {{KStream source = > builder.stream(}}{{"streams-plaintext-input"}}{{);}} > {{source.flatMapValues(value -> > Arrays.asList(value.toLowerCase(Locale.getDefault()).split(}}{{"\\W+"}}{{)))}} > {{ }}{{.groupBy((key, value) -> value)}} > {{ }}{{.count(Materialized. }}{{byte}}{{[]>>as(}}{{"counts-store"}}{{))}} > {{ }}{{.toStream()}} > {{ }}{{.to(}}{{"streams-wordcount-output"}}{{, > Produced.with(Serdes.String(), Serdes.Long{color:#d04437}());{color}}} > {{Should have an additional closing parentheses after Serdes.Long.}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6840) support windowing in ktable API
[ https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464281#comment-16464281 ] Matthias J. Sax commented on KAFKA-6840: Actually, {{Stores#persistenWindowStore}} does exist: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/Stores.java#L72-L77] Thus, you should be able to pass in a windowed store into {{StreamsBuilder#table()}} using {{Materialized.as(WindowBytesStoreSupplier)}}? > support windowing in ktable API > --- > > Key: KAFKA-6840 > URL: https://issues.apache.org/jira/browse/KAFKA-6840 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Labels: api, needs-kip > > The StreamsBuilder provides table() API to materialize a changelog topic into > a local key-value store (KTable), which is very convenient. However, current > underlying implementation does not support materializing one topic to a > windowed key-value store, which in certain cases would be very useful. > To make up the gap, we proposed a new API in StreamsBuilder that could get a > windowed Ktable. > The table() API in StreamsBuilder looks like this: > public synchronized KTable table(final String topic, > final Consumed > consumed, > final Materialized KeyValueStore> materialized) { > Objects.requireNonNull(topic, "topic can't be null"); > Objects.requireNonNull(consumed, "consumed can't be null"); > Objects.requireNonNull(materialized, "materialized can't be null"); > > materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde); > return internalStreamsBuilder.table(topic, > new ConsumedInternal<>(consumed), > new > MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-")); > } > > Where we could see that the store type is given as KeyValueStore. There is no > flexibility to change it to WindowStore. > > To maintain compatibility of the existing API, we have two options to define > a new API: > 1.Overload existing KTable struct > public synchronized KTable, V> windowedTable(final String > topic, > final Consumed > consumed, > final Materialized WindowStore> materialized); > > This could give developer an alternative to use windowed table instead. > However, this implies that we need to make sure all the KTable logic still > works as expected, such as join, aggregation, etc, so the challenge would be > making sure all current KTable logics work. > > 2.Define a new type called WindowedKTable > public synchronized WindowedKTable windowedTable(final String > topic, > final Consumed > consumed, > final Materialized WindowStore> materialized); > The benefit of doing this is that we don’t need to worry about the existing > functionality of KTable. However, the cost is to introduce redundancy of > common operation logic. When upgrading common functionality, we need to take > care of both types. > We could fill in more details in the KIP. Right now I would like to hear some > feedbacks on the two approaches, thank you! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6866) Increase memory
[ https://issues.apache.org/jira/browse/KAFKA-6866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6866. Resolution: Incomplete > Increase memory > --- > > Key: KAFKA-6866 > URL: https://issues.apache.org/jira/browse/KAFKA-6866 > Project: Kafka > Issue Type: Improvement >Reporter: ravi >Priority: Major > > Increase memory -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6865) Devops Insights
[ https://issues.apache.org/jira/browse/KAFKA-6865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6865. Resolution: Incomplete > Devops Insights > --- > > Key: KAFKA-6865 > URL: https://issues.apache.org/jira/browse/KAFKA-6865 > Project: Kafka > Issue Type: New Feature >Reporter: ravi >Priority: Major > Original Estimate: 4h > Remaining Estimate: 4h > > Devops Insights -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6730) Simplify state store recovery
[ https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464298#comment-16464298 ] Ted Yu commented on KAFKA-6730: --- [~mjsax]: Can you take a look at the PR ? > Simplify state store recovery > - > > Key: KAFKA-6730 > URL: https://issues.apache.org/jira/browse/KAFKA-6730 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Richard Yu >Priority: Major > Fix For: 2.0.0 > > > In the current code base, we restore state stores in the main thread (in > contrast to older code that did restore state stored in the rebalance call > back). This has multiple advantages and allows us the further simplify > restore code. > In the original code base, during a long restore phase, it was possible that > a instance misses a rebalance and drops out of the consumer group. To detect > this case, we apply a check during the restore phase, that the end-offset of > the changelog topic does not change. A changed offset implies a missed > rebalance as another thread started to write into the changelog topic (ie, > the current thread does not own the task/store/changelog-topic anymore). > With the new code, that restores in the main-loop, it's ensured that `poll()` > is called regularly and thus, a rebalance will be detected automatically. > This make the check about an changing changelog-end-offset unnecessary. > We can simplify the restore logic, to just consuming until `poll()` does not > return any data. For this case, we fetch the end-offset to see if we did > fully restore. If yes, we resume processing, if not, we continue the restore. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6840) support windowing in ktable API
[ https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464310#comment-16464310 ] Boyang Chen commented on KAFKA-6840: Yep, forget to update. One more thing is that: public KTable table(final AutoOffsetReset offsetReset, final String topic, final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier); could not overload by replacing KeyValueStore with WindowStore since it will hit `Method has the same erasure` error. Do you have an idea about that? [~mjsax] > support windowing in ktable API > --- > > Key: KAFKA-6840 > URL: https://issues.apache.org/jira/browse/KAFKA-6840 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Labels: api, needs-kip > > The StreamsBuilder provides table() API to materialize a changelog topic into > a local key-value store (KTable), which is very convenient. However, current > underlying implementation does not support materializing one topic to a > windowed key-value store, which in certain cases would be very useful. > To make up the gap, we proposed a new API in StreamsBuilder that could get a > windowed Ktable. > The table() API in StreamsBuilder looks like this: > public synchronized KTable table(final String topic, > final Consumed > consumed, > final Materialized KeyValueStore> materialized) { > Objects.requireNonNull(topic, "topic can't be null"); > Objects.requireNonNull(consumed, "consumed can't be null"); > Objects.requireNonNull(materialized, "materialized can't be null"); > > materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde); > return internalStreamsBuilder.table(topic, > new ConsumedInternal<>(consumed), > new > MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-")); > } > > Where we could see that the store type is given as KeyValueStore. There is no > flexibility to change it to WindowStore. > > To maintain compatibility of the existing API, we have two options to define > a new API: > 1.Overload existing KTable struct > public synchronized KTable, V> windowedTable(final String > topic, > final Consumed > consumed, > final Materialized WindowStore> materialized); > > This could give developer an alternative to use windowed table instead. > However, this implies that we need to make sure all the KTable logic still > works as expected, such as join, aggregation, etc, so the challenge would be > making sure all current KTable logics work. > > 2.Define a new type called WindowedKTable > public synchronized WindowedKTable windowedTable(final String > topic, > final Consumed > consumed, > final Materialized WindowStore> materialized); > The benefit of doing this is that we don’t need to worry about the existing > functionality of KTable. However, the cost is to introduce redundancy of > common operation logic. When upgrading common functionality, we need to take > care of both types. > We could fill in more details in the KIP. Right now I would like to hear some > feedbacks on the two approaches, thank you! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6867) Typos in upgrade.html
Jakob Homan created KAFKA-6867: -- Summary: Typos in upgrade.html Key: KAFKA-6867 URL: https://issues.apache.org/jira/browse/KAFKA-6867 Project: Kafka Issue Type: Improvement Components: documentation Reporter: Jakob Homan A couple typos to fix in the upgrade.html file. {noformat}C02Q7N23G8WP 13:55 $ ag swaping docs/upgrade.html 64:Hot-swaping the jar-file only might not work. 132:Hot-swaping the jar-file only might not work.{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6867) Typos in upgrade.html
[ https://issues.apache.org/jira/browse/KAFKA-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Surabhi Dixit reassigned KAFKA-6867: Assignee: Surabhi Dixit > Typos in upgrade.html > - > > Key: KAFKA-6867 > URL: https://issues.apache.org/jira/browse/KAFKA-6867 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Jakob Homan >Assignee: Surabhi Dixit >Priority: Trivial > Labels: newbie > > A couple typos to fix in the upgrade.html file. > {noformat}C02Q7N23G8WP 13:55 $ ag swaping docs/upgrade.html > 64:Hot-swaping the jar-file only might not work. > 132:Hot-swaping the jar-file only might not work.{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6867) Typos in upgrade.html
[ https://issues.apache.org/jira/browse/KAFKA-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464431#comment-16464431 ] ASF GitHub Bot commented on KAFKA-6867: --- surabhidixit opened a new pull request #4970: KAFKA-6867: corrected the typos in upgrade.html URL: https://github.com/apache/kafka/pull/4970 Ran tests, documentation change, so no expected behavioral changes. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Typos in upgrade.html > - > > Key: KAFKA-6867 > URL: https://issues.apache.org/jira/browse/KAFKA-6867 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Jakob Homan >Assignee: Surabhi Dixit >Priority: Trivial > Labels: newbie > > A couple typos to fix in the upgrade.html file. > {noformat}C02Q7N23G8WP 13:55 $ ag swaping docs/upgrade.html > 64:Hot-swaping the jar-file only might not work. > 132:Hot-swaping the jar-file only might not work.{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6844) Race condition between StreamThread and GlobalStreamThread stopping
[ https://issues.apache.org/jira/browse/KAFKA-6844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464450#comment-16464450 ] ASF GitHub Bot commented on KAFKA-6844: --- guozhangwang closed pull request #4950: KAFKA-6844: Call shutdown on GlobalStreamThread after all StreamThreads have stopped URL: https://github.com/apache/kafka/pull/4950 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index eed12f15b4f..56d031b746e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -876,10 +876,6 @@ public void run() { thread.setStateListener(null); thread.shutdown(); } -if (globalStreamThread != null) { -globalStreamThread.setStateListener(null); -globalStreamThread.shutdown(); -} for (final StreamThread thread : threads) { try { @@ -890,6 +886,12 @@ public void run() { Thread.currentThread().interrupt(); } } + +if (globalStreamThread != null) { +globalStreamThread.setStateListener(null); +globalStreamThread.shutdown(); +} + if (globalStreamThread != null && !globalStreamThread.stillRunning()) { try { globalStreamThread.join(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java new file mode 100644 index 000..c7b63ad0b61 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.Consumed; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import kafka.utils.MockTime; + +import static org.junit.Ass
[jira] [Resolved] (KAFKA-6867) Typos in upgrade.html
[ https://issues.apache.org/jira/browse/KAFKA-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6867. Resolution: Fixed > Typos in upgrade.html > - > > Key: KAFKA-6867 > URL: https://issues.apache.org/jira/browse/KAFKA-6867 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Jakob Homan >Assignee: Surabhi Dixit >Priority: Trivial > Labels: newbie > > A couple typos to fix in the upgrade.html file. > {noformat}C02Q7N23G8WP 13:55 $ ag swaping docs/upgrade.html > 64:Hot-swaping the jar-file only might not work. > 132:Hot-swaping the jar-file only might not work.{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6867) Typos in upgrade.html
[ https://issues.apache.org/jira/browse/KAFKA-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464474#comment-16464474 ] ASF GitHub Bot commented on KAFKA-6867: --- hachikuji closed pull request #4970: KAFKA-6867: corrected the typos in upgrade.html URL: https://github.com/apache/kafka/pull/4970 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/upgrade.html b/docs/upgrade.html index 4fe7e20794e..8bfc61ef480 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -61,7 +61,7 @@ Upgrading from 0.8.x, 0.9.x, 0.1 Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version. If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguties. -Hot-swaping the jar-file only might not work. +Hot-swapping the jar-file only might not work. Notable changes in 1.2.0 @@ -129,7 +129,7 @@ Upgrading from 0.8.x, 0.9.x, 0.1 Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version. If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguties. -Hot-swaping the jar-file only might not work. +Hot-swapping the jar-file only might not work. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Typos in upgrade.html > - > > Key: KAFKA-6867 > URL: https://issues.apache.org/jira/browse/KAFKA-6867 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Jakob Homan >Assignee: Surabhi Dixit >Priority: Trivial > Labels: newbie > > A couple typos to fix in the upgrade.html file. > {noformat}C02Q7N23G8WP 13:55 $ ag swaping docs/upgrade.html > 64:Hot-swaping the jar-file only might not work. > 132:Hot-swaping the jar-file only might not work.{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6795) Add unit test for ReplicaAlterLogDirsThread
[ https://issues.apache.org/jira/browse/KAFKA-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anna Povzner resolved KAFKA-6795. - Resolution: Fixed Fix Version/s: 2.0.0 > Add unit test for ReplicaAlterLogDirsThread > --- > > Key: KAFKA-6795 > URL: https://issues.apache.org/jira/browse/KAFKA-6795 > Project: Kafka > Issue Type: Improvement >Reporter: Anna Povzner >Assignee: Anna Povzner >Priority: Major > Fix For: 2.0.0 > > > ReplicaAlterLogDirsThread was added as part of KIP-113 implementation, but > there is no unit test. > [~lindong] I assigned this to myself, since ideally I wanted to add unit > tests for KAFKA-6361 related changes (KIP-279), but feel free to re-assign. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6868) BufferUnderflowException in client when querying consumer group information
Xavier Léauté created KAFKA-6868: Summary: BufferUnderflowException in client when querying consumer group information Key: KAFKA-6868 URL: https://issues.apache.org/jira/browse/KAFKA-6868 Project: Kafka Issue Type: Bug Affects Versions: 2.0.0 Reporter: Xavier Léauté Exceptions get thrown when describing consumer group or querying group offsets. {code} org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) at java.lang.Thread.run(Thread.java:748) {code} {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) at [snip] Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) ... 1 more {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6868) BufferUnderflowException in client when querying consumer group information
[ https://issues.apache.org/jira/browse/KAFKA-6868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xavier Léauté updated KAFKA-6868: - Description: Exceptions get thrown when describing consumer group or querying group offsets from a 1.0~ish cluster {code} org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) at java.lang.Thread.run(Thread.java:748) {code} {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) at [snip] Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) ... 1 more {code} was: Exceptions get thrown when describing consumer group or querying group offsets. {code} org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) at java.lang.Thread.run(Thread.java:748) {code} {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) at [snip] Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) ... 1 more {code} > BufferUnderflowException in client when querying consumer group information > --- > > Key: KAFKA-6868 > URL: https://issues.apache.org/jira/browse/KAFKA-6868 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Xavier Léauté >Priority: Major > > Exceptions get thrown when describing consumer group or querying group > offsets from a 1.0~ish cluster > {code} > org.apache.kafka.comm
[jira] [Updated] (KAFKA-6868) BufferUnderflowException in client when querying consumer group information
[ https://issues.apache.org/jira/browse/KAFKA-6868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xavier Léauté updated KAFKA-6868: - Description: Exceptions get thrown when describing consumer group or querying group offsets from a 1.0 cluster Stacktrace is a result of calling {{AdminClient.describeConsumerGroups(Collection groupIds).describedGroups().entrySet()}} followed by {{KafkaFuture.whenComplete()}} {code} org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) at java.lang.Thread.run(Thread.java:748) {code} {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) at [snip] Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) ... 1 more {code} was: Exceptions get thrown when describing consumer group or querying group offsets from a 1.0~ish cluster {code} org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) at java.lang.Thread.run(Thread.java:748) {code} {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) at [snip] Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) ... 1 more {code} > BufferUnderflowException in client when querying consumer group information > --- > > Key: KAFKA-6868 > URL: https://issues.apache.org/jira/browse/KAFKA-6868 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.0 >Repor
[jira] [Updated] (KAFKA-6868) BufferUnderflowException in client when querying consumer group information
[ https://issues.apache.org/jira/browse/KAFKA-6868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xavier Léauté updated KAFKA-6868: - Description: Exceptions get thrown when describing consumer group or querying group offsets from a 1.0 cluster Stacktrace is a result of calling {{AdminClient.describeConsumerGroups(Collection groupIds).describedGroups().entrySet()}} followed by {{KafkaFuture.whenComplete()}} {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) at [snip] Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) ... 1 more {code} was: Exceptions get thrown when describing consumer group or querying group offsets from a 1.0 cluster Stacktrace is a result of calling {{AdminClient.describeConsumerGroups(Collection groupIds).describedGroups().entrySet()}} followed by {{KafkaFuture.whenComplete()}} {code} org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) at java.lang.Thread.run(Thread.java:748) {code} {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) at [snip] Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) at org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045) ... 1 more {code} > BufferUnderflowException in client when querying consumer group information > --- > > Key: KAFKA-6868 > URL: https://issues.apache.org/jira/browse/KAFKA-6868 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Xavier Léauté >Priority: Major > > Exceptions get thrown when describing consumer group or querying group > offsets from a 1.0 cluster > Stacktrace is a result of calling > {{AdminClient.describeConsumerGroups(Collection > groupIds).describedGroups().entrySet()}} followed by > {{KafkaFuture.whenComplete()}} > {code} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.protocol.types.SchemaException: Error reading field > 'version': java.nio.BufferUnderflowException > at > org.apache.kafka.common.internals.KafkaF