[jira] [Commented] (KAFKA-10245) Using vulnerable log4j version
[ https://issues.apache.org/jira/browse/KAFKA-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153320#comment-17153320 ] Tom Bentley commented on KAFKA-10245: - This is essentially a dupe of https://issues.apache.org/jira/browse/KAFKA-9366 > Using vulnerable log4j version > -- > > Key: KAFKA-10245 > URL: https://issues.apache.org/jira/browse/KAFKA-10245 > Project: Kafka > Issue Type: Bug > Components: core, KafkaConnect >Affects Versions: 2.5.0 >Reporter: Pavel Kuznetsov >Priority: Major > Labels: security > > *Description* > I checked kafka_2.12-2.5.0.tgz distribution with WhiteSource and find out > that log4j version, that used in kafka-connect and kafka-brocker, has > vulnerabilities > * log4j-1.2.17.jar has > [CVE-2019-17571|https://github.com/advisories/GHSA-2qrg-x229-3v8q] and > [CVE-2020-9488|https://github.com/advisories/GHSA-vwqq-5vrc-xw9h] > vulnerabilities. The way to fix it is to upgrade to > org.apache.logging.log4j:log4j-core:2.13.2 > *To Reproduce* > Download kafka_2.12-2.5.0.tgz > Open libs folder in it and find log4j-1.2.17.jar. > Check [CVE-2019-17571|https://github.com/advisories/GHSA-2qrg-x229-3v8q] and > [CVE-2020-9488|https://github.com/advisories/GHSA-vwqq-5vrc-xw9h] to see that > log4j 1.2.17 is vulnerable. > *Expected* > * log4j is log4j-core 2.13.2 or higher > *Actual* > * log4j is 1.2.17 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10245) Using vulnerable log4j version
[ https://issues.apache.org/jira/browse/KAFKA-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Bentley resolved KAFKA-10245. - Resolution: Duplicate > Using vulnerable log4j version > -- > > Key: KAFKA-10245 > URL: https://issues.apache.org/jira/browse/KAFKA-10245 > Project: Kafka > Issue Type: Bug > Components: core, KafkaConnect >Affects Versions: 2.5.0 >Reporter: Pavel Kuznetsov >Priority: Major > Labels: security > > *Description* > I checked kafka_2.12-2.5.0.tgz distribution with WhiteSource and find out > that log4j version, that used in kafka-connect and kafka-brocker, has > vulnerabilities > * log4j-1.2.17.jar has > [CVE-2019-17571|https://github.com/advisories/GHSA-2qrg-x229-3v8q] and > [CVE-2020-9488|https://github.com/advisories/GHSA-vwqq-5vrc-xw9h] > vulnerabilities. The way to fix it is to upgrade to > org.apache.logging.log4j:log4j-core:2.13.2 > *To Reproduce* > Download kafka_2.12-2.5.0.tgz > Open libs folder in it and find log4j-1.2.17.jar. > Check [CVE-2019-17571|https://github.com/advisories/GHSA-2qrg-x229-3v8q] and > [CVE-2020-9488|https://github.com/advisories/GHSA-vwqq-5vrc-xw9h] to see that > log4j 1.2.17 is vulnerable. > *Expected* > * log4j is log4j-core 2.13.2 or higher > *Actual* > * log4j is 1.2.17 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9482) mirror maker 2.0 doesn't replicates (create topic) created automatically by producing messages
[ https://issues.apache.org/jira/browse/KAFKA-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153322#comment-17153322 ] Aljoscha Pörtner commented on KAFKA-9482: - Maybe our problem is related to this. We see that the topic is created in the target cluster but after that no replication of messages takes place. After a restart of the mm2 everything works fine. > mirror maker 2.0 doesn't replicates (create topic) created automatically by > producing messages > -- > > Key: KAFKA-9482 > URL: https://issues.apache.org/jira/browse/KAFKA-9482 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Mikhail Grinfeld >Priority: Minor > > I have 2 kafka (3 brokers each) clusters and MirrorMaker instance. (built > with docker and docker-compose) > Both cluster is defined to create topics automatically (no > auto.create.topics.enable in conf and default, at least as it appears in > docs, is true). > If topic created on source cluster before MirrorMaker starts, everything > works as expected: producig messages to source cluster, causes replication to > destination cluster. > If topic doesn't exist on source cluster, when starting to produce messages, > it created in source cluster, but not in destination - and no replication > performed. > > Using following mm2.properties: > {code:java} > # mm2.propertiesclusters=src,dest > src.bootstrap.servers=kafka-1:9092,kafka-2:19092,kafka-3:29092 > dest.bootstrap.servers=kafka-4:39092,kafka-5:49092,kafka-6:59092 > src->dest.enabled=true > src->dest.topics=.* > {code} > and running MirrorMaker with > {code:java} > connect-mirror-maker /etc/mm2.properties --clusters src dest > {code} > Note: > when I am using Kafka-Streams to read from initial topic, there are few > KTable topics created automatically by Kafka-Stream - and these topics > created OK (of course, when initial topic created at the beginning) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] tombentley commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand
tombentley commented on pull request #8808: URL: https://github.com/apache/kafka/pull/8808#issuecomment-655357176 @rajinisivaram @omkreddy please could you review or suggest another committer for review? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153365#comment-17153365 ] Di Campo commented on KAFKA-4273: - Just tried it, and WindowStoreIterator.remove is an unsupported operation. So, using WindowStore, the solution could be as suggested: iterate all of the events to get the last one to see if the last one is . This trades a nice self-removal mechanism for some inefficient querying. If you expect low update frequency, that may just be your game; but on high number of update on duplicates performance may be affected (note here: only the changes need to be actually stored). TTL in KV stores would be a good if not perfect fit for this case :) > Streams DSL - Add TTL / retention period support for intermediate topics and > state stores > - > > Key: KAFKA-4273 > URL: https://issues.apache.org/jira/browse/KAFKA-4273 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Davor Poldrugo >Priority: Major > > Hi! > I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state > as far as I know - it's not configurable. > In my use case my data has TTL / retnetion period. It's 48 hours. After that > - data can be discarded. > I join two topics: "messages" and "prices" using windowed inner join. > The two intermediate Kafka topics for this join are named: > * messages-prices-join-this-changelog > * messages-prices-join-other-changelog > Since these topics are created as compacted by Kafka Streams, and I don't > wan't to keep data forever, I have altered them to not use compaction. Right > now my RocksDB state stores grow indefinitely, and I don't have any options > to define TTL, or somehow periodically clean the older data. > A "hack" that I use to keep my disk usage low - I have schedulled a job to > periodically stop Kafka Streams instances - one at the time. This triggers a > rebalance, and partitions migrate to other instances. When the instance is > started again, there's another rebalance, and sometimes this instance starts > processing partitions that wasn't processing before the stop - which leads to > deletion of the RocksDB state store for those partitions > (state.cleanup.delay.ms). In the next rebalance the local store is recreated > with a restore consumer - which reads data from - as previously mentioned - a > non compacted topic. And this effectively leads to a "hacked TTL support" in > Kafka Streams DSL. > Questions: > * Do you think would be reasonable to add support in the DSL api to define > TTL for local store? > * Which opens another question - there are use cases which don't need the > intermediate topics to be created as "compact". Could also this be added to > the DSL api? Maybe only this could be added, and this flag should also be > used for the RocksDB TTL. Of course in this case another config would be > mandatory - the retention period or TTL for the intermediate topics and the > state stores. I saw there is a new cleanup.policy - compact_and_delete - > added with KAFKA-4015. > * Which also leads to another question, maybe some intermediate topics / > state stores need different TTL, so a it's not as simple as that. But after > KAFKA-3870, it will be easier. 
> RocksDB supports TTL: > * > https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166 > * https://github.com/facebook/rocksdb/wiki/Time-to-Live > * > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java > A somehow similar issue: KAFKA-4212 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rajinisivaram commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception
rajinisivaram commented on pull request #8979: URL: https://github.com/apache/kafka/pull/8979#issuecomment-655384902 @ijuma @hachikuji @edenhill Thanks for the responses. I will follow up on KIP-392 discussion thread and update the PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153365#comment-17153365 ] Di Campo edited comment on KAFKA-4273 at 7/8/20, 8:56 AM: -- Just tried it, and WindowStoreIterator.remove is an unsupported operation. So, using WindowStore, the solution could be as suggested: iterate all of the events to get the last one to see if the last one is . This trades a nice self-removal mechanism for some inefficient querying. If you expect low update frequency, that may just be your game; but on high number of update on duplicates performance may be affected (note here: only the changes need to be actually stored). To implement some kind of TTL in KV, I suppose you can use punctuation to perform store deletion inside the scheduled punctuation. However that would make for a potentially long removal at a certain point in time. (BTW, does Punctuator execution stop the processing? In that case it would be dangerous to wait too long to do it ). All in all, TTL in KV stores would be a good if not perfect fit for this case :) was (Author: xmar): Just tried it, and WindowStoreIterator.remove is an unsupported operation. So, using WindowStore, the solution could be as suggested: iterate all of the events to get the last one to see if the last one is . This trades a nice self-removal mechanism for some inefficient querying. If you expect low update frequency, that may just be your game; but on high number of update on duplicates performance may be affected (note here: only the changes need to be actually stored). TTL in KV stores would be a good if not perfect fit for this case :) > Streams DSL - Add TTL / retention period support for intermediate topics and > state stores > - > > Key: KAFKA-4273 > URL: https://issues.apache.org/jira/browse/KAFKA-4273 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Davor Poldrugo >Priority: Major > > Hi! > I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state > as far as I know - it's not configurable. > In my use case my data has TTL / retnetion period. It's 48 hours. After that > - data can be discarded. > I join two topics: "messages" and "prices" using windowed inner join. > The two intermediate Kafka topics for this join are named: > * messages-prices-join-this-changelog > * messages-prices-join-other-changelog > Since these topics are created as compacted by Kafka Streams, and I don't > wan't to keep data forever, I have altered them to not use compaction. Right > now my RocksDB state stores grow indefinitely, and I don't have any options > to define TTL, or somehow periodically clean the older data. > A "hack" that I use to keep my disk usage low - I have schedulled a job to > periodically stop Kafka Streams instances - one at the time. This triggers a > rebalance, and partitions migrate to other instances. When the instance is > started again, there's another rebalance, and sometimes this instance starts > processing partitions that wasn't processing before the stop - which leads to > deletion of the RocksDB state store for those partitions > (state.cleanup.delay.ms). In the next rebalance the local store is recreated > with a restore consumer - which reads data from - as previously mentioned - a > non compacted topic. And this effectively leads to a "hacked TTL support" in > Kafka Streams DSL. 
> Questions: > * Do you think would be reasonable to add support in the DSL api to define > TTL for local store? > * Which opens another question - there are use cases which don't need the > intermediate topics to be created as "compact". Could also this be added to > the DSL api? Maybe only this could be added, and this flag should also be > used for the RocksDB TTL. Of course in this case another config would be > mandatory - the retention period or TTL for the intermediate topics and the > state stores. I saw there is a new cleanup.policy - compact_and_delete - > added with KAFKA-4015. > * Which also leads to another question, maybe some intermediate topics / > state stores need different TTL, so a it's not as simple as that. But after > KAFKA-3870, it will be easier. > RocksDB supports TTL: > * > https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166 > * https://github.com/facebook/rocksdb/wiki/Time-to-Live > * > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java > A somehow similar issue: KAFKA-4212 -- This message was sent by Atlassian Jira (v8.3.4#803005)
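For reference, a minimal sketch of the punctuation-based sweep described in the comment above. The store name ("last-seen-store"), TTL, and sweep interval are illustrative assumptions, not part of any existing or proposed API:
{code:java}
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class TtlSweepProcessor extends AbstractProcessor<String, String> {
    private static final long TTL_MS = Duration.ofHours(48).toMillis();    // assumed retention
    private static final Duration SWEEP_INTERVAL = Duration.ofMinutes(5);  // assumed sweep cadence

    private KeyValueStore<String, Long> lastSeen;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // Hypothetical store holding the last update timestamp per key.
        lastSeen = (KeyValueStore<String, Long>) context.getStateStore("last-seen-store");

        // Periodically walk the store and delete entries older than the TTL.
        context.schedule(SWEEP_INTERVAL, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (final KeyValueIterator<String, Long> iter = lastSeen.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Long> entry = iter.next();
                    if (timestamp - entry.value > TTL_MS) {
                        lastSeen.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        lastSeen.put(key, context().timestamp());  // record when this key was last updated
        context().forward(key, value);
    }
}
{code}
On the question raised above: punctuations run on the stream thread, so a sweep over a large store does pause processing for that task, which is why a single long-running removal is a legitimate concern.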
[GitHub] [kafka] akatona84 opened a new pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest
akatona84 opened a new pull request #8992: URL: https://github.com/apache/kafka/pull/8992 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akatona84 commented on pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest
akatona84 commented on pull request #8992: URL: https://github.com/apache/kafka/pull/8992#issuecomment-655458870 Hi @ijuma , I've encountered a test failure here which caused unexpected threads in the following tests `@Before` phase. This fixes the issue in case failure happen, only that would be reported (and not together with .classMethod ending failures) Could you take a look on this please? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akatona84 edited a comment on pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest
akatona84 edited a comment on pull request #8992: URL: https://github.com/apache/kafka/pull/8992#issuecomment-655458870 Hi @ijuma , I've encountered a test failure here which caused unexpected threads in the following tests' `@Before` phase. This fixes it so that, in case a failure happens, only that failure is reported (and not together with the .classMethod ending failures). Could you take a look at this, please? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy edited a comment on pull request #8966: KAFKA-10220: add null check for configurationKey
omkreddy edited a comment on pull request #8966: URL: https://github.com/apache/kafka/pull/8966#issuecomment-655468333 @tombentley @showuon Can you help me to understand where are we calling `DescribeConfigsRequestData.setConfigurationKeys()` in AdminClient? looks like `ConfigurationKeys` are always null. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on pull request #8966: KAFKA-10220: add null check for configurationKey
omkreddy commented on pull request #8966: URL: https://github.com/apache/kafka/pull/8966#issuecomment-655468333 @tombentley @showuon Can you help me to understand where are we calling `DescribeConfigsRequestData.setConfigurationKeys()` in AdminClient? looks like we are always setting to null. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sasukerui commented on pull request #8742: KAFKA-10057 optimize class ConfigCommand method alterConfig parameters
sasukerui commented on pull request #8742: URL: https://github.com/apache/kafka/pull/8742#issuecomment-655474005 > @sasukerui Can you please change the PR to target `master`? OK, I see. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer
rajinisivaram commented on a change in pull request #8986: URL: https://github.com/apache/kafka/pull/8986#discussion_r451498688 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -460,15 +463,21 @@ boolean joinGroupIfNeeded(final Timer timer) { exception instanceof IllegalGenerationException || exception instanceof MemberIdRequiredException) continue; -else if (!future.isRetriable()) -throw exception; - -timer.sleep(rebalanceConfig.retryBackoffMs); +else { +handleFailure(future, timer); +} } } return true; } +protected void handleFailure(RequestFuture future, Timer timer) { +if (future.isRetriable() || future.exception() instanceof AuthorizationException) Review comment: @abbccdda Thanks for the review. The instance we saw was an authorization failure for groups, but since I was fixing handling of all responses, I used `AuthorizationException` in the check since we want to ensure we backoff for topic authorization exceptions too. We already backoff for `RetriableException` and I haven't changed that behaviour, this PR addresses exceptions that are not subclasses of `RetriableException`. Perhaps just updating `AuthorizationException` as is currently done in the PR is sufficient? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #8966: KAFKA-10220: add null check for configurationKey
showuon commented on pull request #8966: URL: https://github.com/apache/kafka/pull/8966#issuecomment-655510801 @omkreddy , I can take a look tomorrow (my time). Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #8966: KAFKA-10220: add null check for configurationKey
tombentley commented on pull request #8966: URL: https://github.com/apache/kafka/pull/8966#issuecomment-655533627 @omkreddy that is... an excellent question. * In `HEAD` the only place it's set to non-null is in `RequestResponseTest` * In 9a4f00f78b (i.e. just before the change to use the generated message classes) the situation is the same: the only non-null call sites are two tests in `RequestResponseTest` * ... and I chased it all the way back to 972b7545363ae, when the RPC was added, and it seems that this has always been the situation. One or two callers with non-null keys in `RequestResponseTest`. Thus it is supported (but without any test coverage) at the protocol level, but never exposed in the Java API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
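To make the observation concrete, a small sketch of the only path client code has today: the public `Admin#describeConfigs` call takes resources, not keys, so `ConfigurationKeys` is left null and the broker returns every config for the resource (topic name and bootstrap address are placeholders):
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeTopicConfigs {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            final ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder
            // No way to ask for individual keys here; all configs of the topic come back.
            final Map<ConfigResource, Config> configs =
                admin.describeConfigs(Collections.singleton(topic)).all().get();
            configs.get(topic).entries().forEach(entry ->
                System.out.println(entry.name() + " = " + entry.value()));
        }
    }
}
{code}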
[jira] [Resolved] (KAFKA-8794) Deprecate DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]
[ https://issues.apache.org/jira/browse/KAFKA-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjin Lee resolved KAFKA-8794. Resolution: Duplicate Duplication of KAFKA-10120. > Deprecate DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo] > --- > > Key: KAFKA-8794 > URL: https://issues.apache.org/jira/browse/KAFKA-8794 > Project: Kafka > Issue Type: Improvement > Components: clients, documentation >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Minor > Labels: needs-kip > > As of 2.3.0, {{DescribeLogDirsResult}} returned by > {{AdminClient#describeLogDirs(Collection)}} is exposing the internal data > structure, {{DescribeLogDirsResponse.LogDirInfo}}. By doing so, its Javadoc > provides no documentation on it. Its imparity is clear when comparing with > {{DescribeReplicaLogDirsResult}}, returned by > {{AdminClient#describeReplicaLogDirs(Collection)}}. > To resolve this, {{DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]}} should > be deprecated and hided from the public later; instead, > {{org.apache.kafka.clients.admin.DescribeLogDirsResult}} should provide > {{[LogDirInfo, ReplicaInfo]}} as its internal class, like > {{DescribeReplicaLogDirsResult}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
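To make the imparity concrete, a sketch of the 2.3-era call as I read it: the admin result hands back a class from the internal `org.apache.kafka.common.requests` package (broker id and bootstrap address are placeholders):
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;

public class LogDirsExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // The value type below leaks from the internal request/response layer
            // rather than coming from the clients.admin package.
            final Map<Integer, Map<String, LogDirInfo>> logDirs =
                admin.describeLogDirs(Collections.singletonList(0)).all().get(); // broker id 0 as an example
            logDirs.forEach((broker, dirs) -> System.out.println(broker + " -> " + dirs.keySet()));
        }
    }
}
{code}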
[GitHub] [kafka] dongjinleekr closed pull request #7204: KAFKA-8794: Deprecate DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]
dongjinleekr closed pull request #7204: URL: https://github.com/apache/kafka/pull/7204 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-64900 Test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-65002 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-65206 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand
omkreddy commented on pull request #8808: URL: https://github.com/apache/kafka/pull/8808#issuecomment-65622 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #8985: MINOR; KafkaAdminClient#alterReplicaLogDirs should not fail all the futures when only one call fails
dajac commented on pull request #8985: URL: https://github.com/apache/kafka/pull/8985#issuecomment-655567477 Jenkins looks good. There was one unrelated test failure during the JDK 14 and Scala 2.13 run. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10226) KStream without SASL information should return error in confluent cloud
[ https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153667#comment-17153667 ] John Roesler commented on KAFKA-10226: -- Sounds good :) > KStream without SASL information should return error in confluent cloud > --- > > Key: KAFKA-10226 > URL: https://issues.apache.org/jira/browse/KAFKA-10226 > Project: Kafka > Issue Type: Bug > Components: clients, streams >Affects Versions: 2.5.0 >Reporter: Werner Daehn >Priority: Minor > > I have create a KStream against the Confluent cloud and wondered why no data > has been received from the source. Reason was that I forgot to add the SASL > api keys and secrets. > > For end users this might lead to usability issues. If the KStream wants to > read from a topic and is not allowed to, this should raise an error, not be > silently ignored. > > Hoe do producer/consumer clients handle that situation? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei opened a new pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei opened a new pull request #8993: URL: https://github.com/apache/kafka/pull/8993 Replaces the previous upgrade test's trivial Streams app with the commonly used SmokeTest, exercising many more features. Also adjust the test matrix to test upgrading from each released version since 2.0 to the current branch. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on a change in pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand
omkreddy commented on a change in pull request #8808: URL: https://github.com/apache/kafka/pull/8808#discussion_r451626581 ## File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala ## @@ -161,10 +193,34 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { testProducerConsumerCli(adminArgs) } + @Test + def testAclCliWithClientId(): Unit = { +val adminClientConfig = File.createTempFile(classOf[AclCommandTest].getName, "createServer") Review comment: We can use `TestUtils.tempFile/TestUtils.tempFile(String)`. ## File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala ## @@ -161,10 +193,34 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { testProducerConsumerCli(adminArgs) } + @Test + def testAclCliWithClientId(): Unit = { +val adminClientConfig = File.createTempFile(classOf[AclCommandTest].getName, "createServer") +adminClientConfig.deleteOnExit() +val pw = new PrintWriter(adminClientConfig) +pw.println("client.id=my-client") +pw.close() + +createServer(Some(adminClientConfig)) + +val appender = LogCaptureAppender.createAndRegister() +val previousLevel = LogCaptureAppender.setClassLoggerLevel(classOf[AppInfoParser], Level.WARN) + +testAclCli(adminArgs) + +LogCaptureAppender.setClassLoggerLevel(classOf[AppInfoParser], previousLevel) Review comment: we may want to reset it in `finally` block? ## File path: core/src/main/scala/kafka/admin/AclCommand.scala ## @@ -130,33 +130,39 @@ object AclCommand extends Logging { } } -listAcls() +listAcls(adminClient) } } def listAcls(): Unit = { withAdminClient(opts) { adminClient => -val filters = getResourceFilter(opts, dieIfNoResourceFound = false) -val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt) -val resourceToAcls = getAcls(adminClient, filters) +listAcls(adminClient) + } +} -if (listPrincipals.isEmpty) { - for ((resource, acls) <- resourceToAcls) -println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") -} else { - listPrincipals.foreach(principal => { -println(s"ACLs for principal `$principal`") -val filteredResourceToAcls = resourceToAcls.map { case (resource, acls) => - resource -> acls.filter(acl => principal.toString.equals(acl.principal)) -}.filter { case (_, acls) => acls.nonEmpty } +private def listAcls(adminClient: Admin) = { Review comment: nit: Can we add `: Unit` return type here and in other new methods? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on a change in pull request #8993: URL: https://github.com/apache/kafka/pull/8993#discussion_r451634562 ## File path: build.gradle ## @@ -1508,6 +1508,18 @@ project(':streams:upgrade-system-tests-24') { } } +project(':streams:upgrade-system-tests-25') { Review comment: I realized this just now. We really should test upgrading from 2.5.0 to 2.5.1, and similar for every bugfix release. In other words, we should add/update "LATEST_X_Y" to the test matrix of the X.Y branch after each release. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java ## @@ -92,6 +94,13 @@ public void shouldWorkWithRebalance() throws InterruptedException { final Properties props = new Properties(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); +props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); +props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100); +props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); +props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); +props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); +props.put(ProducerConfig.ACKS_CONFIG, "all"); Review comment: I'm not sure exactly what it was, but the integration test was failing after the (backported) change to move the property definitions into python. I'll send a new PR to trunk to add these configs to the integration test, just in case not having them is a source of flakiness. ## File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py ## @@ -0,0 +1,297 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +from ducktape.mark import matrix +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until +from kafkatest.services.kafka import KafkaService +from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.version import LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion + +smoke_test_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)] Review comment: Note, in 2.5, I was able to add 2.0 and 2.1 to the upgrade matrix. This means we did break upgradability from anything 2.1- to 2.6+ in the 2.6 release. People would have to upgrade to 2.5 first and then upgrade to 2.6. Should we block the release and fix this for 2.6.0? 
## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java ## @@ -45,21 +47,48 @@ public Processor get() { return new AbstractProcessor() { private int numRecordsProcessed = 0; +private long smallestOffset = Long.MAX_VALUE; +private long largestOffset = Long.MIN_VALUE; @Override public void init(final ProcessorContext context) { super.init(context); +LOG.info("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId()); System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId()); +System.out.flush(); numRecordsProcessed = 0; +smallestOffset = Long.MAX_VALUE; +largestOffset = Long.MIN_VALUE; } @Override public void process(final Object key, final Object value) { numRecordsProcessed++; -if (numRecordsProcessed % 100 == 0) { Review comment: I had to take this out to make the upgrade test pass. The problem was that after upgrading, not all the instances would get a task for the input topic (`data`). When an instance has only to process repartition topics, it d
[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on pull request #8993: URL: https://github.com/apache/kafka/pull/8993#issuecomment-655605721 Started the new test: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4015/ And all streams tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4016/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand
tombentley commented on pull request #8808: URL: https://github.com/apache/kafka/pull/8808#issuecomment-655617244 @omkreddy done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-9144) Early expiration of producer state can cause coordinator epoch to regress
[ https://issues.apache.org/jira/browse/KAFKA-9144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-9144: --- Fix Version/s: 2.3.2 2.2.3 > Early expiration of producer state can cause coordinator epoch to regress > - > > Key: KAFKA-9144 > URL: https://issues.apache.org/jira/browse/KAFKA-9144 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.2.3, 2.3.2, 2.4.1 > > > Transaction markers are written by the transaction coordinator. In order to > fence zombie coordinators, we use the leader epoch associated with the > coordinator partition. Partition leaders verify the epoch in the > WriteTxnMarker request and ensure that it can only increase. However, when > producer state expires, we stop tracking the epoch and it is possible for > monotonicity to be violated. Generally we expect expiration to be on the > order of days, so it should be unlikely for this to be a problem. > At least that is the theory. We observed a case where a coordinator epoch > decreased between nearly consecutive writes within a couple minutes of each > other. Upon investigation, we found that producer state had been incorrectly > expired. We believe the sequence of events is the following: > # Producer writes transactional data and fails before committing > # Coordinator times out the transaction and writes ABORT markers > # Upon seeing the ABORT and the bumped epoch, the partition leader deletes > state from the last epoch, which effectively resets the last timestamp for > the producer to -1. > # The coordinator becomes a zombie before getting a successful response and > continues trying to send > # The new coordinator notices the incomplete transaction and also sends > markers > # The partition leader accepts the write from the new coordinator > # The producer state is expired because the last timestamp was -1 > # The partition leader accepts the write from the old coordinator > Basically it takes an alignment of planets to hit this bug, but it is > possible. If you hit it, then the broker may be unable to start because we > validate epoch monotonicity during log recovery. The problem is in 3 when the > timestamp gets reset. We should use the timestamp from the marker instead. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9144) Early expiration of producer state can cause coordinator epoch to regress
[ https://issues.apache.org/jira/browse/KAFKA-9144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-9144: --- Affects Version/s: 2.0.1 2.1.1 2.2.2 2.4.0 2.3.1 > Early expiration of producer state can cause coordinator epoch to regress > - > > Key: KAFKA-9144 > URL: https://issues.apache.org/jira/browse/KAFKA-9144 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.4.0, 2.3.1 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.2.3, 2.3.2, 2.4.1 > > > Transaction markers are written by the transaction coordinator. In order to > fence zombie coordinators, we use the leader epoch associated with the > coordinator partition. Partition leaders verify the epoch in the > WriteTxnMarker request and ensure that it can only increase. However, when > producer state expires, we stop tracking the epoch and it is possible for > monotonicity to be violated. Generally we expect expiration to be on the > order of days, so it should be unlikely for this to be a problem. > At least that is the theory. We observed a case where a coordinator epoch > decreased between nearly consecutive writes within a couple minutes of each > other. Upon investigation, we found that producer state had been incorrectly > expired. We believe the sequence of events is the following: > # Producer writes transactional data and fails before committing > # Coordinator times out the transaction and writes ABORT markers > # Upon seeing the ABORT and the bumped epoch, the partition leader deletes > state from the last epoch, which effectively resets the last timestamp for > the producer to -1. > # The coordinator becomes a zombie before getting a successful response and > continues trying to send > # The new coordinator notices the incomplete transaction and also sends > markers > # The partition leader accepts the write from the new coordinator > # The producer state is expired because the last timestamp was -1 > # The partition leader accepts the write from the old coordinator > Basically it takes an alignment of planets to hit this bug, but it is > possible. If you hit it, then the broker may be unable to start because we > validate epoch monotonicity during log recovery. The problem is in 3 when the > timestamp gets reset. We should use the timestamp from the marker instead. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] omkreddy commented on a change in pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand
omkreddy commented on a change in pull request #8808: URL: https://github.com/apache/kafka/pull/8808#discussion_r451674847 ## File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala ## @@ -127,25 +131,53 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { testAclCli(adminArgs) } - private def createServer(): Unit = { + private def createServer(commandConfig: Option[File] = None): Unit = { servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps))) val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) -adminArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName)) + +var adminArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName)) +if (commandConfig.isDefined) { + adminArgs ++= Array("--command-config", commandConfig.get.getAbsolutePath) +} +this.adminArgs = adminArgs + } + + private def callMain(args: Array[String]): (String, String) = { +TestUtils.grabConsoleOutputAndError(AclCommand.main(args)) } private def testAclCli(cmdArgs: Array[String]): Unit = { for ((resources, resourceCmd) <- ResourceToCommand) { for (permissionType <- Set(ALLOW, DENY)) { val operationToCmd = ResourceToOperations(resources) val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1) - AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add") - for (resource <- resources) { -withAuthorizer() { authorizer => - TestUtils.waitAndVerifyAcls(acls, authorizer, resource) -} +val (addOut, addErr) = callMain(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add") +assertOutputContains("Adding ACLs", resources, resourceCmd, addOut) +assertOutputContains("Current ACLs", resources, resourceCmd, addOut) +Assert.assertEquals("", addErr) + +for (resource <- resources) { + withAuthorizer() { authorizer => +TestUtils.waitAndVerifyAcls(acls, authorizer, resource) } +} - testRemove(cmdArgs, resources, resourceCmd) +val (listOut, listErr) = callMain(cmdArgs :+ "--list") +assertOutputContains("Current ACLs", resources, resourceCmd, listOut) +Assert.assertEquals("", listErr) + +testRemove(cmdArgs, resources, resourceCmd) + } +} + } + + private def assertOutputContains(prefix: String, resources: Set[ResourcePattern], resourceCmd: Array[String], output: String) = { Review comment: nit: missing return type ## File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala ## @@ -161,10 +193,35 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { testProducerConsumerCli(adminArgs) } + @Test + def testAclCliWithClientId(): Unit = { +val adminClientConfig = TestUtils.tempFile() +adminClientConfig.deleteOnExit() Review comment: This is not required now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config
hachikuji commented on a change in pull request #8864: URL: https://github.com/apache/kafka/pull/8864#discussion_r451682470 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -1447,6 +1451,7 @@ public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio } } +@Deprecated Review comment: Fair enough. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions
guozhangwang merged pull request #8934: URL: https://github.com/apache/kafka/pull/8934 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions
guozhangwang commented on pull request #8934: URL: https://github.com/apache/kafka/pull/8934#issuecomment-655636676 Cherry-picked to 2.6. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153757#comment-17153757 ] Guozhang Wang commented on KAFKA-10134: --- I've merged the PR and would like [~seanguo] [~neowu0] to verify if it has fixed their observed issue. > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] tombentley commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand
tombentley commented on pull request #8808: URL: https://github.com/apache/kafka/pull/8808#issuecomment-655641999 @omkreddy, sorry about that, now fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10226) KStream without SASL information should return error in confluent cloud
[ https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153768#comment-17153768 ] Werner Daehn commented on KAFKA-10226: -- Here is my verdict after testing: Producer, Consumer, KStreams, AdminClient all do *not* throw an exception. I have not found a method to figure out if the provided values are valid. Test case: {{try {}} {{ Map producerprops = new HashMap<>();}} {{ producerprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 'wrong.aws.confluent.cloud:9092');}} {{ producerprops.put(ProducerConfig.ACKS_CONFIG, "all");}} {{ producerprops.put(ProducerConfig.RETRIES_CONFIG, 0);}} {{ producerprops.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);}} {{ producerprops.put(ProducerConfig.LINGER_MS_CONFIG, 1);}} {{ producerprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);}} {{ producerprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);}} {{ producerprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);}} {{ this.producer = new KafkaProducer(producerprops);}} {{} catch (Throwable e) {}} {{ e.printStacktrace();}} {{}}} If you execute this code segment it will not create an exception, although the hostname is not running a Kafka instance for sure. Just by looking at the this.producer I cannot tell if the connection is valid or not. When a end user configures the Kafka properties, the expectation is to get concise feedback. hostname cannot be resolved. server refuses connection at that port. Server does not respond. Endpoint is fine but the security settings are missing. As this is so basic, I wonder why nobody has ever raised that or rather I do not know something. Any ideas? > KStream without SASL information should return error in confluent cloud > --- > > Key: KAFKA-10226 > URL: https://issues.apache.org/jira/browse/KAFKA-10226 > Project: Kafka > Issue Type: Bug > Components: clients, streams >Affects Versions: 2.5.0 >Reporter: Werner Daehn >Priority: Minor > > I have create a KStream against the Confluent cloud and wondered why no data > has been received from the source. Reason was that I forgot to add the SASL > api keys and secrets. > > For end users this might lead to usability issues. If the KStream wants to > read from a topic and is not allowed to, this should raise an error, not be > silently ignored. > > Hoe do producer/consumer clients handle that situation? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-655648704 Failure was unrelated: kafka.api.SaslMultiMechanismConsumerTest.testCoordinatorFailover This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand
omkreddy commented on pull request #8808: URL: https://github.com/apache/kafka/pull/8808#issuecomment-655651164 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-10226) KStream without SASL information should return error in confluent cloud
[ https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153768#comment-17153768 ] Werner Daehn edited comment on KAFKA-10226 at 7/8/20, 5:26 PM: --- Here is my verdict after testing: Producer, Consumer, KStreams, AdminClient all do *not* throw an exception. I have not found a method to figure out if the provided values are valid. Test case: {{try {}} {{ Map producerprops = new HashMap<>();}} {{ producerprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 'pkc-41wq6.eu-west-2.aws.confluent.cloud:1234');}} {{ producerprops.put(ProducerConfig.ACKS_CONFIG, "all");}} {{ producerprops.put(ProducerConfig.RETRIES_CONFIG, 0);}} {{ producerprops.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);}} {{ producerprops.put(ProducerConfig.LINGER_MS_CONFIG, 1);}} {{ producerprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);}} {{ producerprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);}} {{ producerprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);}} {{ this.producer = new KafkaProducer(producerprops);}} {{} catch (Throwable e) {}} {{ e.printStacktrace();}} {{}}} If you execute this code segment it will not create an exception, although the port is not valid for sure. Just by looking at the this.producer I cannot tell if the connection is valid or not. When a end user configures the Kafka properties, the expectation is to get concise feedback. Invalid hostname I get as feedback. Invalid port or security I do not. As this is so basic, I wonder why nobody has ever raised that or rather I do not know something. Any ideas? was (Author: wdaehn): Here is my verdict after testing: Producer, Consumer, KStreams, AdminClient all do *not* throw an exception. I have not found a method to figure out if the provided values are valid. Test case: {{try {}} {{ Map producerprops = new HashMap<>();}} {{ producerprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 'wrong.aws.confluent.cloud:9092');}} {{ producerprops.put(ProducerConfig.ACKS_CONFIG, "all");}} {{ producerprops.put(ProducerConfig.RETRIES_CONFIG, 0);}} {{ producerprops.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);}} {{ producerprops.put(ProducerConfig.LINGER_MS_CONFIG, 1);}} {{ producerprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);}} {{ producerprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);}} {{ producerprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);}} {{ this.producer = new KafkaProducer(producerprops);}} {{} catch (Throwable e) {}} {{ e.printStacktrace();}} {{}}} If you execute this code segment it will not create an exception, although the hostname is not running a Kafka instance for sure. Just by looking at the this.producer I cannot tell if the connection is valid or not. When a end user configures the Kafka properties, the expectation is to get concise feedback. hostname cannot be resolved. server refuses connection at that port. Server does not respond. Endpoint is fine but the security settings are missing. As this is so basic, I wonder why nobody has ever raised that or rather I do not know something. Any ideas? 
> KStream without SASL information should return error in confluent cloud > --- > > Key: KAFKA-10226 > URL: https://issues.apache.org/jira/browse/KAFKA-10226 > Project: Kafka > Issue Type: Bug > Components: clients, streams >Affects Versions: 2.5.0 >Reporter: Werner Daehn >Priority: Minor > > I have created a KStream against the Confluent cloud and wondered why no data > has been received from the source. Reason was that I forgot to add the SASL > api keys and secrets. > > For end users this might lead to usability issues. If the KStream wants to > read from a topic and is not allowed to, this should raise an error, not be > silently ignored. > > How do producer/consumer clients handle that situation? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on a change in pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on a change in pull request #8993: URL: https://github.com/apache/kafka/pull/8993#discussion_r451712293 ## File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py ## @@ -0,0 +1,297 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +from ducktape.mark import matrix +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until +from kafkatest.services.kafka import KafkaService +from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.version import LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion + +smoke_test_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)] Review comment: Actually, I just tried 2.1 and 2.0 again (with a full bounce now, instead of a rolling bounce), and it works on 2.6/trunk. So, it seems only the rolling bounce path has been broken. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10247) Streams may attempt to process after closing a task
John Roesler created KAFKA-10247: Summary: Streams may attempt to process after closing a task Key: KAFKA-10247 URL: https://issues.apache.org/jira/browse/KAFKA-10247 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.6.0 Reporter: John Roesler Assignee: John Roesler Observed in a system test. A corrupted task was detected, and Stream properly closed it as dirty: {code:java} [2020-07-08 17:08:09,345] WARN stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader) org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1 at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344) at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611) at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507) [2020-07-08 17:08:09,345] WARN stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. 
(org.apache.kafka.streams.processor.internals.StreamThread) org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to be re-initialized at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507) Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1 at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344) at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611) at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ... 3 more [2020-07-08 17:08:09,346] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Suspended running (org.apache.kafka.streams.processor.internals.StreamTask) [2020-07-08 17:08:09,346] DEBUG stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closing its state manager and all the registered state stores: {sum-STATE-STORE-50=StateStoreMetadata (sum-STATE-STORE-50 : SmokeTest-sum-STATE-STORE-50-changelog-1 @ null, cntStoreName=StateStoreMetadata (cntStoreName : SmokeTest-cntStoreName-changelog-1 @ 0} (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2020-07-08 17:08:09,346] INFO [Consumer clientId=SmokeTest-66676ca
[GitHub] [kafka] vvcephei opened a new pull request #8994: KAFKA-10247: Skip processing if task isn't running
vvcephei opened a new pull request #8994: URL: https://github.com/apache/kafka/pull/8994 The definition of `Task#process` allows for returning `false` if the task isn't "processable" right now. We can conveniently resolve KAFKA-10247 by simply considering a task not processable when it is not running. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
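For readers skimming the thread, a minimal sketch of the guard being proposed follows. It is not the actual patch: the state names and fields are simplified stand-ins for the internal Task machinery, but it shows the idea of reporting a non-running task as "not processable" so the stream thread skips it.
{code:java}
// Illustrative sketch only. A task that is not RUNNING (e.g. one that was just
// closed dirty after a TaskCorruptedException) answers "not processable", so the
// caller never attempts to process records for it.
enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

final class TaskSketch {
    private TaskState state = TaskState.CREATED;
    private boolean hasBufferedRecords = false;

    void transitionTo(final TaskState newState) { this.state = newState; }
    void addRecord() { this.hasBufferedRecords = true; }

    boolean isProcessable() {
        if (state != TaskState.RUNNING) {
            return false;                 // skip closed/suspended/restoring tasks
        }
        return hasBufferedRecords;        // normal readiness check
    }
}
{code}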
[jira] [Updated] (KAFKA-10247) Streams may attempt to process after closing a task
[ https://issues.apache.org/jira/browse/KAFKA-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10247: - Priority: Blocker (was: Major) > Streams may attempt to process after closing a task > --- > > Key: KAFKA-10247 > URL: https://issues.apache.org/jira/browse/KAFKA-10247 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > > Observed in a system test. A corrupted task was detected, and Stream properly > closed it as dirty: > {code:java} > [2020-07-08 17:08:09,345] WARN stream-thread > [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered > org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records > from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it > is likely that the consumer's position has fallen out of the topic partition > offset range because the topic was truncated or compacted on the broker, > marking the corresponding tasks as corrupted and re-initializing it later. > (org.apache.kafka.streams.processor.internals.StoreChangelogReader) > org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position > FetchPosition{offset=1, offsetEpoch=Optional.empty, > currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: > null)], epoch=0}} is out of range for partition > SmokeTest-cntStoreName-changelog-1 >at > org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344) >at > org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296) >at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611) >at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280) >at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) >at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) >at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) >at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664) >at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) >at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507) > [2020-07-08 17:08:09,345] WARN stream-thread > [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the > states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. > Will close the task as dirty and re-create and bootstrap from scratch. 
> (org.apache.kafka.streams.processor.internals.StreamThread) > org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs > {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to > be re-initialized >at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) >at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664) >at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) >at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507) > Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch > position FetchPosition{offset=1, offsetEpoch=Optional.empty, > currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: > null)], epoch=0}} is out of range for partition > SmokeTest-cntStoreName-changelog-1 >at > org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344) >at > org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296) >at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611) >at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280) >at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) >at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) >at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) >... 3 more > [2020-07-08 17:08:09,346] INFO stream-thread > [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] > Suspended running (org.apache.kafka.streams.processor.internals.StreamTask) > [2020-07-08 17:08:09,346] DEBUG stream-thread > [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] > Closing its state manager and a
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153852#comment-17153852 ] Neo Wu commented on KAFKA-10134: Hi, [~guozhang] Your latest change fixed all my issues, Thanks! > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-10134. --- Resolution: Fixed > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153861#comment-17153861 ] Guozhang Wang commented on KAFKA-10134: --- Thanks for the confirmation! I'm resolving this ticket then. > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153862#comment-17153862 ] Guozhang Wang commented on KAFKA-10134: --- cc 2.5.1 release manager [~vvcephei] I'm merging it to 2.5 branch too. > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #8994: KAFKA-10247: Skip processing if task isn't running
mjsax commented on pull request #8994: URL: https://github.com/apache/kafka/pull/8994#issuecomment-655701085 It seems the root cause of the issue is that we don't clean up internal data structures correctly when closing a task dirty? Should the task not be removed from the list of active tasks instead? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #8985: MINOR; KafkaAdminClient#alterReplicaLogDirs should not fail all the futures when only one call fails
cmccabe merged pull request #8985: URL: https://github.com/apache/kafka/pull/8985 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] JimGalasyn opened a new pull request #8995: Restore stream-table duality description
JimGalasyn opened a new pull request #8995: URL: https://github.com/apache/kafka/pull/8995 The stream-table duality section was dropped inadvertently sometime after version 0.11.0, so this PR restores it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10218) DistributedHerder's canReadConfigs field is never reset to true
[ https://issues.apache.org/jira/browse/KAFKA-10218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153936#comment-17153936 ] Chris Egerton commented on KAFKA-10218: --- [~rhauch] The primary impact here is that there are erroneous and confusing log messages that start to get emitted as soon as the worker enters a bad state and there is no fix except to restart the worker. The fix and the test are fairly simple and I think warrant review before the upcoming 2.7 release so that we can include them in upcoming bug fix releases before then. > DistributedHerder's canReadConfigs field is never reset to true > --- > > Key: KAFKA-10218 > URL: https://issues.apache.org/jira/browse/KAFKA-10218 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2, 2.3.0, 2.1.2, > 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.6.0, 2.4.2, 2.5.1, > 2.7.0, 2.5.2, 2.6.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > If the {{DistributedHerder}} encounters issues reading to the end of the > config topic, it [takes note of this > fact|https://github.com/apache/kafka/blob/7db52a46b00eed652e791dd4eae809d590626a1f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1109] > by setting a field {{canReadConfigs}} to {{false}} and then acts accordingly > at the [start of its tick > loop|https://github.com/apache/kafka/blob/7db52a46b00eed652e791dd4eae809d590626a1f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L319] > by trying again to read to the end of the config topic. However, if a > subsequent attempt to read to the end of the config topic succeeds, the > {{canReadConfigs}} field is never set back to {{true}} again, so no matter > what, the herder will always attempt to read to the end of the config topic > at the beginning of each tick. -- This message was sent by Atlassian Jira (v8.3.4#803005)
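For context, the shape of the fix described above is roughly the following. This is a simplified, self-contained illustration rather than the DistributedHerder code itself, and the class and method names are invented for the example.
{code:java}
// Toy illustration of the bug: the flag is set to false on a failed read but,
// without the marked line, it is never reset to true after a later successful
// read, so every subsequent tick keeps treating configs as unreadable and keeps
// re-reading the config topic (and logging errors).
final class ConfigReadTracker {
    private volatile boolean canReadConfigs = true;

    boolean readConfigToEnd(final Runnable readToEnd) {
        try {
            readToEnd.run();           // stands in for reading the config topic to its end
            canReadConfigs = true;     // <-- the missing reset described in this ticket
            return true;
        } catch (RuntimeException e) {
            canReadConfigs = false;    // existing behavior: remember the failure
            return false;
        }
    }

    boolean canReadConfigs() {
        return canReadConfigs;
    }
}
{code}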
[GitHub] [kafka] vvcephei commented on pull request #8994: KAFKA-10247: Skip processing if task isn't running
vvcephei commented on pull request #8994: URL: https://github.com/apache/kafka/pull/8994#issuecomment-655721490 Hey @abbccdda and @mjsax , thanks for taking a look! The PR is not complete yet. I'll let you know when it's ready for review. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change
[ https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153975#comment-17153975 ] John Roesler commented on KAFKA-8770: - Hi [~davispw] , That's a good idea. I'll split it up. The portion implemented so far is this: {code:java} commit f54cece73e6116566979bcf6a865d803b7c18974 Author: Richard Yu Date: Tue May 12 11:19:32 2020 -0700KAFKA-8770: KIP-557: Drop idempotent KTable source updates (#8254) Drops idempotent updates from KTable source operators. Specifically, drop updates in which the value is unchanged, and the timestamp is the same or larger. Implements: KIP-557 Reviewers: Bruno Cadonna , John Roesler {code} https://github.com/apache/kafka/pull/8254 > Either switch to or add an option for emit-on-change > > > Key: KAFKA-8770 > URL: https://issues.apache.org/jira/browse/KAFKA-8770 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: needs-kip > > Currently, Streams offers two emission models: > * emit-on-window-close: (using Suppression) > * emit-on-update: (i.e., emit a new result whenever a new record is > processed, regardless of whether the result has changed) > There is also an option to drop some intermediate results, either using > caching or suppression. > However, there is no support for emit-on-change, in which results would be > forwarded only if the result has changed. This has been reported to be > extremely valuable as a performance optimizations for some high-traffic > applications, and it reduces the computational burden both internally for > downstream Streams operations, as well as for external systems that consume > the results, and currently have to deal with a lot of "no-op" changes. > It would be pretty straightforward to implement this, by loading the prior > results before a stateful operation and comparing with the new result before > persisting or forwarding. In many cases, we load the prior result anyway, so > it may not be a significant performance impact either. > One design challenge is what to do with timestamps. If we get one record at > time 1 that produces a result, and then another at time 2 that produces a > no-op, what should be the timestamp of the result, 1 or 2? emit-on-change > would require us to say 1. > Clearly, we'd need to do some serious benchmarks to evaluate any potential > implementation of emit-on-change. > Another design challenge is to decide if we should just automatically provide > emit-on-change for stateful operators, or if it should be configurable. > Configuration increases complexity, so unless the performance impact is high, > we may just want to change the emission model without a configuration. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10248) Drop idempotent KTable source updates
John Roesler created KAFKA-10248: Summary: Drop idempotent KTable source updates Key: KAFKA-10248 URL: https://issues.apache.org/jira/browse/KAFKA-10248 Project: Kafka Issue Type: Sub-task Reporter: John Roesler Assignee: Richard Yu Implement KIP-557 for KTable source nodes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10248) Drop idempotent KTable source updates
[ https://issues.apache.org/jira/browse/KAFKA-10248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153976#comment-17153976 ] John Roesler commented on KAFKA-10248: -- This was implemented in this commit: {code:java} commit f54cece73e6116566979bcf6a865d803b7c18974 Author: Richard Yu Date: Tue May 12 11:19:32 2020 -0700 KAFKA-8770: KIP-557: Drop idempotent KTable source updates (#8254) Drops idempotent updates from KTable source operators. Specifically, drop updates in which the value is unchanged, and the timestamp is the same or larger. Implements: KIP-557 Reviewers: Bruno Cadonna , John Roesler {code} In this PR: https://github.com/apache/kafka/pull/8254 > Drop idempotent KTable source updates > - > > Key: KAFKA-10248 > URL: https://issues.apache.org/jira/browse/KAFKA-10248 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Assignee: Richard Yu >Priority: Major > > Implement KIP-557 for KTable source nodes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
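For readers who do not want to open the PR, the rule stated in the commit message (drop a KTable source update when the value is unchanged and the timestamp is the same or larger) can be sketched as below. This is a simplified illustration, not the code merged in #8254.
{code:java}
import java.util.Arrays;

// Simplified illustration of KIP-557's idempotent-update check for KTable source
// nodes: an incoming record is dropped if its serialized value equals the stored
// value AND its timestamp is the same or larger than the stored timestamp.
final class IdempotentUpdateCheck {

    static boolean shouldDrop(final byte[] oldValue, final long oldTimestamp,
                              final byte[] newValue, final long newTimestamp) {
        return Arrays.equals(oldValue, newValue) && newTimestamp >= oldTimestamp;
    }

    public static void main(final String[] args) {
        byte[] v = new byte[] {1, 2, 3};
        System.out.println(shouldDrop(v, 100L, v.clone(), 150L));      // true  -> dropped
        System.out.println(shouldDrop(v, 100L, v.clone(), 50L));       // false -> forwarded (older timestamp)
        System.out.println(shouldDrop(v, 100L, new byte[] {9}, 150L)); // false -> value changed
    }
}
{code}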
[jira] [Updated] (KAFKA-10248) Drop idempotent KTable source updates
[ https://issues.apache.org/jira/browse/KAFKA-10248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10248: - Fix Version/s: 2.6.0 > Drop idempotent KTable source updates > - > > Key: KAFKA-10248 > URL: https://issues.apache.org/jira/browse/KAFKA-10248 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Assignee: Richard Yu >Priority: Major > Fix For: 2.6.0 > > > Implement KIP-557 for KTable source nodes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10248) Drop idempotent KTable source updates
[ https://issues.apache.org/jira/browse/KAFKA-10248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10248. -- Resolution: Fixed > Drop idempotent KTable source updates > - > > Key: KAFKA-10248 > URL: https://issues.apache.org/jira/browse/KAFKA-10248 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Assignee: Richard Yu >Priority: Major > Fix For: 2.6.0 > > > Implement KIP-557 for KTable source nodes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change
[ https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153977#comment-17153977 ] John Roesler commented on KAFKA-8770: - Ok, I created (and resolved) a subtask, which is properly labeled as implemented in 2.6.0: https://issues.apache.org/jira/browse/KAFKA-10248 [~Yohan123], can you create a subtask for the portion you're planning to do next? Then, we can target the PR at that ticket instead of this one to keep everything tidy. > Either switch to or add an option for emit-on-change > > > Key: KAFKA-8770 > URL: https://issues.apache.org/jira/browse/KAFKA-8770 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: needs-kip > > Currently, Streams offers two emission models: > * emit-on-window-close: (using Suppression) > * emit-on-update: (i.e., emit a new result whenever a new record is > processed, regardless of whether the result has changed) > There is also an option to drop some intermediate results, either using > caching or suppression. > However, there is no support for emit-on-change, in which results would be > forwarded only if the result has changed. This has been reported to be > extremely valuable as a performance optimizations for some high-traffic > applications, and it reduces the computational burden both internally for > downstream Streams operations, as well as for external systems that consume > the results, and currently have to deal with a lot of "no-op" changes. > It would be pretty straightforward to implement this, by loading the prior > results before a stateful operation and comparing with the new result before > persisting or forwarding. In many cases, we load the prior result anyway, so > it may not be a significant performance impact either. > One design challenge is what to do with timestamps. If we get one record at > time 1 that produces a result, and then another at time 2 that produces a > no-op, what should be the timestamp of the result, 1 or 2? emit-on-change > would require us to say 1. > Clearly, we'd need to do some serious benchmarks to evaluate any potential > implementation of emit-on-change. > Another design challenge is to decide if we should just automatically provide > emit-on-change for stateful operators, or if it should be configurable. > Configuration increases complexity, so unless the performance impact is high, > we may just want to change the emission model without a configuration. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #8995: Restore stream-table duality description
mjsax commented on a change in pull request #8995: URL: https://github.com/apache/kafka/pull/8995#discussion_r451796320 ## File path: docs/streams/core-concepts.html ## @@ -170,13 +150,59 @@ Duality of or to run interactive queries against your application's latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications. - + - + Before we discuss concepts such as aggregations in Kafka Streams, we must first introduce tables in more detail, and talk about the aforementioned stream-table duality. - Essentially, this duality means that a stream can be viewed as a table, and a table can be viewed as a stream. - + Essentially, this duality means that a stream can be viewed as a table, and a table can be viewed as a stream. Kafka's log compaction feature, for example, exploits this duality. + + + +A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows: + + + +The stream-table duality describes the close relationship between streams and tables. + +Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a "real" table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream - such as computing the total number of pageviews by user from a stream of pageview events - will return a table (here with the key and the value being the user and its corresponding pageview count, respectively). +Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a "real" stream by iterating over each key-value entry in the table. + + + +Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time - and different revisions of the table - can be represented as a changelog stream (second column). + + + + +Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column): + + + + +The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance. +The stream-table duality is such an important concept that Kafka Streams models it explicitly via the KStream, KTable, and GlobalKTable interfaces. + + +Aggregations + +An aggregation operation takes one input stream or table, and yields a new table by combining multiple input records into a single output record. Examples of aggregations are computing counts or sum. + + + +In the Kafka Streams DSL, an input stream of an aggregation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the out-of-order arrival of further records after the value was produced and emitted. When such out-of-order arrival happens, the aggregating KStream or KTable emits a new aggregate value. 
Because the output is a KTable, the new value is considered to overwrite the old value with the same key in subsequent processing steps. Review comment: Can we avoid those super long lines? Similar below. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
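Since the restored section describes aggregations turning a KStream into a KTable, a short DSL example may help readers. The topic names here are hypothetical and the snippet is not part of the documentation change under review.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

// Counting pageviews per user: the input is a KStream of (user, page) events and
// the aggregation result is a KTable<user, count> that is continuously updated,
// including on out-of-order arrivals, as described in the restored docs section.
public class PageviewCounts {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> pageviews =
            builder.stream("pageviews"); // hypothetical input topic: key = user, value = page

        KTable<String, Long> viewsPerUser = pageviews
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .count();

        // Every update to the table is also available as a changelog stream
        // (the stream-table duality), which we write back to an output topic.
        viewsPerUser.toStream()
            .to("pageviews-per-user", Produced.with(Serdes.String(), Serdes.Long()));

        // builder.build() would then be passed to new KafkaStreams(topology, props).
    }
}
{code}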
[GitHub] [kafka] junrao commented on pull request #8974: KAFKA-10225 Increase default zk session timeout for system tests
junrao commented on pull request #8974: URL: https://github.com/apache/kafka/pull/8974#issuecomment-655734288 System test results. All 9 failures seem unrelated to this PR. http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-07--001.1594186925--chia7712--KAFKA-10225--da9265760/report.html This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao merged pull request #8974: KAFKA-10225 Increase default zk session timeout for system tests
junrao merged pull request #8974: URL: https://github.com/apache/kafka/pull/8974 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10225) Increase default zk session timeout for system tests
[ https://issues.apache.org/jira/browse/KAFKA-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-10225. - Fix Version/s: 2.7.0 Resolution: Fixed merged the PR to trunk > Increase default zk session timeout for system tests > > > Key: KAFKA-10225 > URL: https://issues.apache.org/jira/browse/KAFKA-10225 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Fix For: 2.7.0 > > > I'm digging in the flaky system tests and then I noticed there are many flaky > caused by following check. > {code} > with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as > monitor: > node.account.ssh(cmd) > # Kafka 1.0.0 and higher don't have a space between "Kafka" and > "Server" > monitor.wait_until("Kafka\s*Server.*started", > timeout_sec=timeout_sec, backoff_sec=.25, >err_msg="Kafka server didn't finish startup in > %d seconds" % timeout_sec) > {code} > And the error message in broker log is shown below. > {quote} > kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for > connection while in state: CONNECTING > at > kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:262) > at kafka.zookeeper.ZooKeeperClient.(ZooKeeperClient.scala:119) > at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1880) > at kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:430) > at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:455) > at kafka.server.KafkaServer.startup(KafkaServer.scala:227) > at > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) > at kafka.Kafka$.main(Kafka.scala:82) > at kafka.Kafka.main(Kafka.scala) > {quote} > I'm surprised the default timeout of zk connection in system test is only 2 > seconds as the default timeout in production is increased to 18s (see > https://github.com/apache/kafka/commit/4bde9bb3ccaf5571be76cb96ea051dadaeeaf5c7) > {code} > config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 2000 > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #8981: KAFKA-10235 Fix flaky transactions_test.py
junrao commented on a change in pull request #8981: URL: https://github.com/apache/kafka/pull/8981#discussion_r451224908 ## File path: tests/kafkatest/tests/core/transactions_test.py ## @@ -47,7 +47,10 @@ def __init__(self, test_context): self.num_output_partitions = 3 self.num_seed_messages = 10 self.transaction_size = 750 -self.transaction_timeout = 4 +# This is reducing the transaction cleanup interval. +# IN hard_bounce mode, transaction is broke ungracefully. Hence, it produces unstable +# offsets which obstructs TransactionalMessageCopier from receiving position of group. +self.transaction_timeout = 5000 Review comment: @chia7712 : Thanks for the reply. That makes sense. Perhaps we can adjust the comment to make it clear that the hard bounce is for the client. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #8981: KAFKA-10235 Fix flaky transactions_test.py
junrao commented on pull request #8981: URL: https://github.com/apache/kafka/pull/8981#issuecomment-655737852 triggered a system test run This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] JimGalasyn commented on a change in pull request #8995: Restore stream-table duality description
JimGalasyn commented on a change in pull request #8995: URL: https://github.com/apache/kafka/pull/8995#discussion_r451809936 ## File path: docs/streams/core-concepts.html ## @@ -170,13 +150,59 @@ Duality of or to run interactive queries against your application's latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications. - + - + Before we discuss concepts such as aggregations in Kafka Streams, we must first introduce tables in more detail, and talk about the aforementioned stream-table duality. - Essentially, this duality means that a stream can be viewed as a table, and a table can be viewed as a stream. - + Essentially, this duality means that a stream can be viewed as a table, and a table can be viewed as a stream. Kafka's log compaction feature, for example, exploits this duality. + + + +A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows: + + + +The stream-table duality describes the close relationship between streams and tables. + +Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a "real" table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream - such as computing the total number of pageviews by user from a stream of pageview events - will return a table (here with the key and the value being the user and its corresponding pageview count, respectively). +Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a "real" stream by iterating over each key-value entry in the table. + + + +Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time - and different revisions of the table - can be represented as a changelog stream (second column). + + + + +Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column): + + + + +The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance. +The stream-table duality is such an important concept that Kafka Streams models it explicitly via the KStream, KTable, and GlobalKTable interfaces. + + +Aggregations + +An aggregation operation takes one input stream or table, and yields a new table by combining multiple input records into a single output record. Examples of aggregations are computing counts or sum. + + + +In the Kafka Streams DSL, an input stream of an aggregation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the out-of-order arrival of further records after the value was produced and emitted. When such out-of-order arrival happens, the aggregating KStream or KTable emits a new aggregate value. 
Because the output is a KTable, the new value is considered to overwrite the old value with the same key in subsequent processing steps. Review comment: Sure, but that's the style throughout the Kafka docs. :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10249) In-memory stores are skipped when checkpointing but not skipped when reading the checkpoint
Sophie Blee-Goldman created KAFKA-10249: --- Summary: In-memory stores are skipped when checkpointing but not skipped when reading the checkpoint Key: KAFKA-10249 URL: https://issues.apache.org/jira/browse/KAFKA-10249 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.6.0 Reporter: Sophie Blee-Goldman Assignee: Sophie Blee-Goldman As the title suggests, offsets for in-memory stores (including the suppression buffer) are not written to the checkpoint file. However, when reading from the checkpoint file during task initialization, we do not check StateStore#persistent. We attempt to look up the offsets for in-memory stores in the checkpoint file, and obviously do not find them. With eos we have to conclude that the existing state is dirty and thus throw a TaskCorruptedException. So pretty much any task with in-memory state will always hit this exception when reinitializing from the checkpoint, forcing it to clear the entire state directory and build up all of its state again from scratch (both persistent and in-memory). This is especially unfortunate for KIP-441, as we will hit this any time a task is moved from one thread to another. -- This message was sent by Atlassian Jira (v8.3.4#803005)
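A hedged sketch of the symmetry the ticket calls for, using invented names rather than the real ProcessorStateManager API: non-persistent stores must be passed over when reading the checkpoint, just as they are skipped when the checkpoint is written.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Toy model of the asymmetry described above: the checkpoint writer skips
// in-memory (non-persistent) stores, so the checkpoint reader must skip them
// too; otherwise a missing entry for an in-memory store is misread as corrupted
// state under exactly-once and the whole task directory is wiped.
final class CheckpointSketch {

    // Only persistent stores make it into the checkpoint file.
    static Map<String, Long> writeCheckpoint(Map<String, Long> offsets,
                                             Map<String, Boolean> persistent) {
        Map<String, Long> checkpoint = new HashMap<>();
        offsets.forEach((store, offset) -> {
            if (Boolean.TRUE.equals(persistent.get(store))) {
                checkpoint.put(store, offset);
            }
        });
        return checkpoint;
    }

    // Symmetric read: non-persistent stores are passed over instead of being
    // treated as "checkpointed offset missing", which is the bug in question.
    static void initializeOffsets(Iterable<String> stores,
                                  Map<String, Boolean> persistent,
                                  Map<String, Long> checkpoint) {
        for (String store : stores) {
            if (!Boolean.TRUE.equals(persistent.get(store))) {
                continue;
            }
            Long offset = checkpoint.get(store);
            if (offset == null) {
                throw new IllegalStateException(
                    "Persistent store " + store + " has no checkpointed offset");
            }
            // ... register offset so restoration can resume from it ...
        }
    }
}
{code}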
[GitHub] [kafka] kkonstantine commented on a change in pull request #8928: KAFKA-10192: Wait for REST API to become available before testing blocked connectors
kkonstantine commented on a change in pull request #8928: URL: https://github.com/apache/kafka/pull/8928#discussion_r451832657 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java ## @@ -76,6 +78,15 @@ public void setup() { // start the clusters connect.start(); + +// wait for the Connect REST API to become available. necessary because of the reduced REST +// request timeout; otherwise, we may get an unexpected 500 with our first real REST request +// if the worker is still getting on its feet. +waitForCondition( Review comment: Interesting observation. Of course, hitting the leader with a request doesn't tell you that other workers have started, so that's applicable in tests like this one, which start only one worker here, etc. This doesn't seem to be a race condition we encounter often, so I'm fine with an ad hoc specific fix here given the reduced timeout. I'd be surprised if it took noticeable time to load the services after the herder is submitted to its executor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
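For context, the readiness check under discussion boils down to polling the worker's REST endpoint until it answers. The self-contained sketch below mimics the `waitForCondition` pattern from the diff, with placeholder URLs and timeouts rather than the values used in the PR.
{code:java}
import java.net.HttpURLConnection;
import java.net.URL;

// Illustrative only: poll the Connect worker's REST API until it responds, so
// later test requests with a short client-side timeout don't hit a worker that
// is still starting up.
final class RestReadinessProbe {

    static boolean restApiIsUp(String baseUrl) {
        try {
            HttpURLConnection conn =
                (HttpURLConnection) new URL(baseUrl + "/connectors").openConnection();
            conn.setConnectTimeout(1_000);
            conn.setReadTimeout(1_000);
            return conn.getResponseCode() < 500;   // any non-5xx answer means the worker is serving
        } catch (Exception e) {
            return false;                          // not listening yet
        }
    }

    static void waitForRestApi(String baseUrl, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (restApiIsUp(baseUrl)) {
                return;
            }
            Thread.sleep(250);
        }
        throw new AssertionError("Connect REST API did not become available within " + timeoutMs + " ms");
    }
}
{code}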
[GitHub] [kafka] kkonstantine commented on pull request #8928: KAFKA-10192: Wait for REST API to become available before testing blocked connectors
kkonstantine commented on pull request #8928: URL: https://github.com/apache/kafka/pull/8928#issuecomment-655764773 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman opened a new pull request #8996: URL: https://github.com/apache/kafka/pull/8996 We have this asymmetry in how the ProcessorStateManager handles in-memory stores: we explicitly skip over them when writing offsets to the checkpoint file, but don't do the same when reading from the checkpoint file to initialize offsets. With eos, this is taken to mean that the state is dirty, and thus we mistakenly mark the entire task as corrupted. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10250) Kafka broker shrinks the ISRs and disconnects from other brokers for few seconds
Rishank Chandra Puram created KAFKA-10250: - Summary: Kafka broker shrinks the ISRs and disconnects from other brokers for few seconds Key: KAFKA-10250 URL: https://issues.apache.org/jira/browse/KAFKA-10250 Project: Kafka Issue Type: Bug Components: build, config, consumer, controller, log, producer Affects Versions: 2.0.0 Reporter: Rishank Chandra Puram
The following is a summary/overview of the whole issue. Can you please help us look into the below and let us know your thoughts on what caused the issue, and how to mitigate this in the future?
Issue:
1. All of a sudden, all the other brokers in the cluster report issues with one of the brokers, as below
Error for good broker [broker: broker2, brokerID: 1030]
[2020-06-27 16:14:53,367] WARN [ReplicaFetcher replicaId=1030, leaderId=1029, fetcherId=13] Error in response for fetch request (type=FetchRequest, replicaId=1030, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1901394956, epoch=1018699)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1029 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:240)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:43)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:149)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Error from affected broker [broker: broker6, brokerID: 1029]
[2020-06-27 16:14:25,744] INFO [Partition topic_test2-33 broker=1029] Shrinking ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition)
[2020-06-27 16:14:25,752] INFO [Partition topic_a_restate-34 broker=1029] Shrinking ISR from 1027,1029,1028 to 1029 (kafka.cluster.Partition)
[2020-06-27 16:14:25,760] INFO [Partition topic_f_restate-39 broker=1029] Shrinking ISR from 1026,1029,1025 to 1029,1025 (kafka.cluster.Partition)
[2020-06-27 16:14:25,772] INFO [Partition topic_test2-16 broker=1029] Shrinking ISR from 1028,1029,1030 to 1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:26,683] INFO [ProducerStateManager partition=topic_abc_f-21] Writing producer snapshot at offset 1509742173 (kafka.log.ProducerStateManager)
[2020-06-27 16:14:26,683] INFO [Log partition=topic_abc_f-21, dir=/hadoop-e/kafka/data1] Rolled new log segment at offset 1509742173 in 1 ms. (kafka.log.Log)
[2020-06-27 16:14:55,375] INFO [Partition topic_test2-33 broker=1029] Expanding ISR from 1025,1029 to 1025,1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,387] INFO [Partition test-topic-analysis-9 broker=1029] Expanding ISR from 1028,1029 to 1028,1029,1030 (kafka.cluster.Partition)
2. The entire Kafka cluster becomes stable within a few minutes
Trace for good broker [broker: broker2, brokerID: 1030]
[2020-06-27 16:20:14,861] INFO Deleted time index /hadoop-g/kafka/data1/topic-analysis-0/009100172512.timeindex.deleted. (kafka.log.LogSegment)
[2020-06-27 16:20:14,882] INFO [Log partition=topic-analysis-4, dir=/hadoop-b/kafka/data1] Deleting segment 9100010843 (kafka.log.Log)
[2020-06-27 16:20:14,883] INFO Deleted log /hadoop-b/kafka/data1/topic-analysis-4/009100010843.log.deleted. (kafka.log.LogSegment)
[2020-06-27 16:20:14,897] INFO Deleted offset index /hadoop-b/kafka/data1/topic-analysis-4/009100010843.index.deleted. (kafka.log.LogSegment)
[2020-06-27 16:20:14,897] INFO Deleted time index /hadoop-b/kafka/data1/topic-analysis-4/009100010843.timeindex.deleted. (kafka.log.LogSegment)
Trace from affected broker [broker: broker6, brokerID: 1029]
[2020-06-27 16:21:01,552] INFO [Log partition=topic-analysis-22, dir=/hadoop-i/kafka/data1] Scheduling log segment [baseOffset 9099830344, size 1073733482] for deletion. (kafka.log.Log)
[2020-06-27 16:21:01,553] INFO [Log partition=topic-analysis-22, dir=/hadoop-i/kafka/data1] Scheduling log segment [baseOffset 9100082067, size 1073738790] for deletion. (kafka.log.Log)
[2020-06-27 16:21:01,553] INFO [Log partition=topic-analysis-22, dir=/hadoop-i/kafka/data1] Incrementing log start offset to 9100334713 (kafka.log.Log)
[2020-06-27 16:21:01,581] INFO Cleared earliest 0 entries from epoch cache based on passed offset 9100334713 leaving 1 in EpochFile for partition topic-analysis-22 (kafka.server.epoch.LeaderEpochFileCache)
[2020-06-27 16:22:00,175] INFO [Log partition=topic_def_c-1, dir=/hadoop-j/kafka/data1] Deleting segment 1628853412 (kafka.log.L
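For readers triaging similar ISR churn, a minimal sketch of checking a topic's ISR against its replica set with the Java AdminClient follows; the bootstrap address and topic name are placeholders, not values taken from this report.
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class IsrChecker {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker2:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singletonList("topic_test2"))
                    .all().get().get("topic_test2");
            desc.partitions().forEach(p -> {
                // A partition whose ISR is smaller than its replica set is under-replicated
                if (p.isr().size() < p.replicas().size()) {
                    System.out.printf("Partition %d under-replicated: isr=%s replicas=%s%n",
                            p.partition(), p.isr(), p.replicas());
                }
            });
        }
    }
}
{code}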
[jira] [Commented] (KAFKA-10250) Kafka broker shrinks the ISRs and disconnects from other brokers for few seconds
[ https://issues.apache.org/jira/browse/KAFKA-10250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154043#comment-17154043 ] Rishank Chandra Puram commented on KAFKA-10250: ---
1. When the issue occurs, they're able to produce/consume messages to/from Kafka using a simple command line.
2. The issue occurs suddenly on broker 1029 but could also occur on other brokers; this behavior is completely random, as we had issues with two different brokers at different times
[2020-06-27 16:14:23,800] INFO [GroupCoordinator 1029]: Member consumer-1-3dfeffb9-c8ca-4840-878e-3126e9750015 in group authapiprod02062020 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:23,802] INFO [GroupCoordinator 1029]: Stabilized group authapiprod02062020 generation 178 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:23,805] INFO [GroupCoordinator 1029]: Assignment received from leader for group authapiprod02062020 for generation 178 (kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:25,494] INFO [Partition topic_rejected_test-30 broker=1029] Shrinking ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition)
After 30 seconds we see expanding messages:
[2020-06-27 16:14:25,772] INFO [Partition topic_test2-16 broker=1029] Shrinking ISR from 1028,1029,1030 to 1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,375] INFO [Partition topic_analysis-33 broker=1029] Expanding ISR from 1025,1029 to 1025,1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,387] INFO [Partition topic_rejected-9 broker=1029] Expanding ISR from 1028,1029 to 1028,1029,1030 (kafka.cluster.Partition)
We reviewed during the issue:
- lsof --> 75000 of 20
- No high CPU found in grafana during the issue
- Thread dumps are not showing any stuck threads
- iostat looks fine
- During the issue we checked --under-replicated-partitions, but once broker 1029 starts expanding the ISRs again, no under-replicated partitions are seen
- We reviewed gc log files and the memory looks fine; we currently have 12g configured and the liveset is about 4g
- netstat output is not showing any abnormal behavior, this is from broker 1029:
50659539242 total packets received
52633502320 requests sent out
88095112 segments retransmited
0 receive buffer errors
0 send buffer errors
379 resets received for embryonic SYN_RECV sockets
629655 TCP sockets finished time wait in fast timer
550847 delayed acks further delayed because of locked socket
11151 times the listen queue of a socket overflowed
11151 SYNs to LISTEN sockets dropped
> Kafka broker shrinks the ISRs and disconnects from other brokers for few > seconds > > > Key: KAFKA-10250 > URL: https://issues.apache.org/jira/browse/KAFKA-10250 > Project: Kafka > Issue Type: Bug > Components: build, config, consumer, controller, log, producer >Affects Versions: 2.0.0 >Reporter: Rishank Chandra Puram >Priority: Major > > The following the summary/overview of the whole issue. Can you please help us > look into the below and let us know you thoughts on what caused the issue?
> And how to mitigate this in the future > Issue: > 1.All of a sudden all the other brokers in cluster report have issues > with one of the broker as below > > Error for good broker [broker: broker2, brokerID: 1030] > [2020-06-27 16:14:53,367] WARN [ReplicaFetcher replicaId=1030, leaderId=1029, > fetcherId=13] Error in response for fetch request (type=FetchRequest, > replicaId=1030, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1901394956, > epoch=1018699)) (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 1029 was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96) > at > kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:240) > at > kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:43) > at > kafka.server.AbstractFetcherThread.prabcssFetchRequest(AbstractFetcherThread.scala:149) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > > Error from affected broker [broker: broker6, brokerID: 1029] > [2020-06-27 16:14:25,744] INFO [Partition topic_test2-33 broker=1029] > Shrinking ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition) > [2020-06-27 16:14:25,7
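The shrink/expand churn reported above can also be tracked over time through the broker's ReplicaManager meters exposed via JMX; a small sketch, assuming JMX is enabled on the broker (host and port below are placeholders):
{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class IsrChurnProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port; requires the broker to be started with a JMX port enabled
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker6:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName shrinks = new ObjectName("kafka.server:type=ReplicaManager,name=IsrShrinksPerSec");
            ObjectName expands = new ObjectName("kafka.server:type=ReplicaManager,name=IsrExpandsPerSec");
            System.out.println("ISR shrinks (1-min rate): " + mbsc.getAttribute(shrinks, "OneMinuteRate"));
            System.out.println("ISR expands (1-min rate): " + mbsc.getAttribute(expands, "OneMinuteRate"));
        }
    }
}
{code}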
[jira] [Updated] (KAFKA-10250) Kafka broker shrinks the ISRs and disconnects from other brokers for few seconds
[ https://issues.apache.org/jira/browse/KAFKA-10250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rishank Chandra Puram updated KAFKA-10250: -- Description:
The following is a summary/overview of the whole issue. Can you please help us look into the below and let us know your thoughts on what caused the issue, and how to mitigate this in the future?
Issue:
1. All of a sudden, all the other brokers in the cluster report issues with one of the brokers, as below
Error for good broker [broker: broker2, brokerID: 1030]
[2020-06-27 16:14:53,367] WARN [ReplicaFetcher replicaId=1030, leaderId=1029, fetcherId=13] Error in response for fetch request (type=FetchRequest, replicaId=1030, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1901394956, epoch=1018699)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1029 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:240)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:43)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:149)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Error from affected broker [broker: broker6, brokerID: 1029]
[2020-06-27 16:14:25,744] INFO [Partition topic_test2-33 broker=1029] Shrinking ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition)
[2020-06-27 16:14:25,752] INFO [Partition topic_a_restate-34 broker=1029] Shrinking ISR from 1027,1029,1028 to 1029 (kafka.cluster.Partition)
[2020-06-27 16:14:25,760] INFO [Partition topic_f_restate-39 broker=1029] Shrinking ISR from 1026,1029,1025 to 1029,1025 (kafka.cluster.Partition)
[2020-06-27 16:14:25,772] INFO [Partition topic_test2-16 broker=1029] Shrinking ISR from 1028,1029,1030 to 1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:26,683] INFO [ProducerStateManager partition=topic_abc_f-21] Writing producer snapshot at offset 1509742173 (kafka.log.ProducerStateManager)
[2020-06-27 16:14:26,683] INFO [Log partition=topic_abc_f-21, dir=/hadoop-e/kafka/data1] Rolled new log segment at offset 1509742173 in 1 ms. (kafka.log.Log)
[2020-06-27 16:14:55,375] INFO [Partition topic_test2-33 broker=1029] Expanding ISR from 1025,1029 to 1025,1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,387] INFO [Partition test-topic-analysis-9 broker=1029] Expanding ISR from 1028,1029 to 1028,1029,1030 (kafka.cluster.Partition)
2. The entire Kafka cluster becomes stable within a few minutes
Trace for good broker [broker: broker2, brokerID: 1030]
[2020-06-27 16:20:14,861] INFO Deleted time index /hadoop-g/kafka/data1/topic-analysis-0/009100172512.timeindex.deleted. (kafka.log.LogSegment)
[2020-06-27 16:20:14,882] INFO [Log partition=topic-analysis-4, dir=/hadoop-b/kafka/data1] Deleting segment 9100010843 (kafka.log.Log)
[2020-06-27 16:20:14,883] INFO Deleted log /hadoop-b/kafka/data1/topic-analysis-4/009100010843.log.deleted. (kafka.log.LogSegment)
[2020-06-27 16:20:14,897] INFO Deleted offset index /hadoop-b/kafka/data1/topic-analysis-4/009100010843.index.deleted. (kafka.log.LogSegment)
[2020-06-27 16:20:14,897] INFO Deleted time index /hadoop-b/kafka/data1/topic-analysis-4/009100010843.timeindex.deleted. (kafka.log.LogSegment)
Trace from affected broker [broker: broker6, brokerID: 1029]
[2020-06-27 16:21:01,552] INFO [Log partition=topic-analysis-22, dir=/hadoop-i/kafka/data1] Scheduling log segment [baseOffset 9099830344, size 1073733482] for deletion. (kafka.log.Log)
[2020-06-27 16:21:01,553] INFO [Log partition=topic-analysis-22, dir=/hadoop-i/kafka/data1] Scheduling log segment [baseOffset 9100082067, size 1073738790] for deletion. (kafka.log.Log)
[2020-06-27 16:21:01,553] INFO [Log partition=topic-analysis-22, dir=/hadoop-i/kafka/data1] Incrementing log start offset to 9100334713 (kafka.log.Log)
[2020-06-27 16:21:01,581] INFO Cleared earliest 0 entries from epoch cache based on passed offset 9100334713 leaving 1 in EpochFile for partition topic-analysis-22 (kafka.server.epoch.LeaderEpochFileCache)
[2020-06-27 16:22:00,175] INFO [Log partition=topic_def_c-1, dir=/hadoop-j/kafka/data1] Deleting segment 1628853412 (kafka.log.Log)
[2020-06-27 16:22:00,175] INFO Deleted log /hadoop-j/kafka/data1/topic_def_c-1/001628853412.log.deleted. (kafka.log.LogSegment)
3. Once the disconnect happens, all the producer jobs fail stating "Caused by: org.apache.kafka.common.errors
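Since the reported impact is producer jobs failing during the brief disconnect, one client-side mitigation is to give producers enough retry headroom to ride out a roughly 30-second shrink/expand window. The values below are illustrative only, `delivery.timeout.ms` requires client 2.1 or newer, and the bootstrap address and topic are placeholders.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ResilientProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker2:9092"); // placeholder
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000); // covers a short ISR shrink window
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topic_test2", "key", "value"));
        }
    }
}
{code}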
[GitHub] [kafka] mjsax commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
mjsax commented on pull request #8996: URL: https://github.com/apache/kafka/pull/8996#issuecomment-655783477 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-10250) Kafka broker shrinks the ISRs and disconnects from other brokers for few seconds
[ https://issues.apache.org/jira/browse/KAFKA-10250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154043#comment-17154043 ] Rishank Chandra Puram edited comment on KAFKA-10250 at 7/8/20, 10:09 PM: - 1. When the issue occurs and once broker connects back with other broker, we're able to produce/consume messages to/from Kafka using a simple command line.[even though the affected broker is not restarted] 2. The issue occurs suddenly in broker 1029 but also could occur in other broker, this behavior it's completely random, as we had issues with two different brokers at different times [2020-06-27 16:14:23,800] INFO [GroupCoordinator 1029]: Member consumer-1-3dfeffb9-c8ca-4840-878e-3126e9750015 in group authapiprod02062020 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2020-06-27 16:14:23,802] INFO [GroupCoordinator 1029]: Stabilized group authapiprod02062020 generation 178 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator) [2020-06-27 16:14:23,805] INFO [GroupCoordinator 1029]: Assignment received from leader for group authapiprod02062020 for generation 178 (kafka.coordinator.group.GroupCoordinator) [2020-06-27 16:14:25,494] INFO [Partition topic_rejected_test-30 broker=1029] Shrinking ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition) After 30 seconds we see expanding messages: [2020-06-27 16:14:25,772] INFO [Partition topic_test2-16 broker=1029] Shrinking ISR from 1028,1029,1030 to 1029,1030 (kafka.cluster.Partition) [2020-06-27 16:14:55,375] INFO [Partition topic_analysis-33 broker=1029] Expanding ISR from 1025,1029 to 1025,1029,1030 (kafka.cluster.Partition) [2020-06-27 16:14:55,387] INFO [Partition topic_rejected-9 broker=1029] Expanding ISR from 1028,1029 to 1028,1029,1030 (kafka.cluster.Partition) We reviewed during the issue: - lsof --> 75000 of 20 - No high CPU found in grafana during the issue - Thread dumps are not showing any stuck threads - iostat looks fine - During the issue we check for --under-replicated-partitions but after the broker 1029 it's expanding, no under replicated partitions seen - We reviewed gc log files and the memory looks fine, we currently have 12g configured and the liveset is about 4g - netstat output is not showing any abnormal behavior, this is from broker 1029: 50659539242 total packets received 52633502320 requests sent out 88095112 segments retransmited 0 receive buffer errors 0 send buffer errors 379 resets received for embryonic SYN_RECV sockets 629655 TCP sockets finished time wait in fast timer 550847 delayed acks further delayed because of locked socket 11151 times the listen queue of a socket overflowed 11151 SYNs to LISTEN sockets dropped was (Author: rishankchandra): 1. When the issue occurs, they're able to produce/consume messages to/from Kafka using a simple command line. 2. 
The issue occurs suddenly in broker 1029 but also could occur in other broker, this behavior it's completely random, as we had issues with two different brokers at different times [2020-06-27 16:14:23,800] INFO [GroupCoordinator 1029]: Member consumer-1-3dfeffb9-c8ca-4840-878e-3126e9750015 in group authapiprod02062020 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2020-06-27 16:14:23,802] INFO [GroupCoordinator 1029]: Stabilized group authapiprod02062020 generation 178 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator) [2020-06-27 16:14:23,805] INFO [GroupCoordinator 1029]: Assignment received from leader for group authapiprod02062020 for generation 178 (kafka.coordinator.group.GroupCoordinator) [2020-06-27 16:14:25,494] INFO [Partition topic_rejected_test-30 broker=1029] Shrinking ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition) After 30 seconds we see expanding messages: [2020-06-27 16:14:25,772] INFO [Partition topic_test2-16 broker=1029] Shrinking ISR from 1028,1029,1030 to 1029,1030 (kafka.cluster.Partition) [2020-06-27 16:14:55,375] INFO [Partition topic_analysis-33 broker=1029] Expanding ISR from 1025,1029 to 1025,1029,1030 (kafka.cluster.Partition) [2020-06-27 16:14:55,387] INFO [Partition topic_rejected-9 broker=1029] Expanding ISR from 1028,1029 to 1028,1029,1030 (kafka.cluster.Partition) We reviewed during the issue: - lsof --> 75000 of 20 - No high CPU found in grafana during the issue - Thread dumps are not showing any stuck threads - iostat looks fine - During the issue we check for --under-replicated-partitions but after the broker 1029 it's expanding, no under replicated partitions seen - We reviewed gc log files and the memory looks fine, we currently have 12g configured and the liveset is about 4g - netstat output is not showing any abnormal behavior, this is from broker 1029: 50659539242 total packets received 52
[GitHub] [kafka] rhauch commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config
rhauch commented on a change in pull request #8864: URL: https://github.com/apache/kafka/pull/8864#discussion_r451856474 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java ## @@ -148,6 +148,7 @@ public KafkaStatusBackingStore(Time time, Converter converter) { this.statusTopic = statusTopic; } +@SuppressWarnings("deprecation") Review comment: > Not sure if we should rewrite this test right away First of all, this is not test code. It's actually part of the Connect runtime. Second, I'd have to look into what we would do differently and how we would set `delivery.timeout.ms` to work with the existing logic that handles retries. Since it's not clear, I suggest we keep the current behavior of setting `retries=0` as long as that still functions. We're using the `ProducerConfig.RETRIES` constant, so that can't be removed without changing this code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Skip processing if task isn't running
vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r451860750 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -766,6 +769,24 @@ void runOnce() { return records; } +private OffsetResetStrategy getResetStrategy(final TopicPartition partition) { +if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { +return OffsetResetStrategy.EARLIEST; +} else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { +return OffsetResetStrategy.LATEST; +} else { +if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { +return OffsetResetStrategy.EARLIEST; +} Review comment: I honestly couldn't figure out what is the default default default reset strategy... It seems (from the behavior of the test when we first start up) that if there's no strategy set, and no committed offset, then the client starts at the beginning, but the ClientConfig has the default policy as "latest"... What gives? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -193,6 +197,28 @@ private void closeAndRevive(final Map> taskWith log.error("Error suspending corrupted task {} ", task.id(), swallow); } task.closeDirty(); +// Pause so we won't poll any more records for this task until it has been re-initialized +// Note, closeDirty already clears the partitiongroup for the task. +mainConsumer().pause(task.inputPartitions()); +final Map committed = mainConsumer().committed(task.inputPartitions()); +for (final TopicPartition topicPartition : task.inputPartitions()) { +final OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition); +if (offsetAndMetadata == null) { +final OffsetResetStrategy strategy = resetStrategy.apply(topicPartition); +switch (strategy) { +case EARLIEST: + mainConsumer().seekToBeginning(Collections.singleton(topicPartition)); +break; +case LATEST: + mainConsumer().seekToBeginning(Collections.singleton(topicPartition)); +break; +default: +throw new IllegalArgumentException("Unexpected reset strategy: " + strategy); +} +} else { +mainConsumer().seek(topicPartition, offsetAndMetadata); +} +} Review comment: This might be the worst thing I've ever proposed for AK... I can't figure out a better way to just "reset" the offset. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
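For context, the "reset to the last committed offset" idea being wrestled with here can be expressed against the public Consumer API roughly as follows; this is a hypothetical helper, not the PR's code, and the partition set is assumed to come from the surrounding task-management logic.
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CommittedOffsetReset {
    // Rewind the given partitions to their last committed position,
    // falling back to the beginning when nothing was ever committed.
    static void rewindToCommitted(final Consumer<?, ?> consumer, final Set<TopicPartition> partitions) {
        consumer.pause(partitions); // stop fetching until the task is re-initialized
        final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
        for (final TopicPartition tp : partitions) {
            final OffsetAndMetadata offset = committed.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset);
            } else {
                consumer.seekToBeginning(Collections.singleton(tp)); // or apply a per-topic reset policy
            }
        }
    }
}
{code}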
[GitHub] [kafka] ableegoldman commented on a change in pull request #8994: KAFKA-10247: Skip processing if task isn't running
ableegoldman commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r451862588 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -766,6 +769,24 @@ void runOnce() { return records; } +private OffsetResetStrategy getResetStrategy(final TopicPartition partition) { +if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { +return OffsetResetStrategy.EARLIEST; +} else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { +return OffsetResetStrategy.LATEST; +} else { +if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { +return OffsetResetStrategy.EARLIEST; +} Review comment: Does Streams override the client default..? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8994: KAFKA-10247: Skip processing if task isn't running
vvcephei commented on pull request #8994: URL: https://github.com/apache/kafka/pull/8994#issuecomment-655792712 Ok, @abbccdda and @mjsax , I think this is ready for a review now. I'm not very happy with the logic to reset to the last committed offset. Can you think of a better way to do it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] apovzner commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)
apovzner commented on a change in pull request #8977: URL: https://github.com/apache/kafka/pull/8977#discussion_r451871529 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ## @@ -97,7 +97,25 @@ public static RecordingLevel forName(String name) { public boolean shouldRecord(final int configId) { return configId == DEBUG.id || configId == this.id; } +} +public enum QuotaEnforcementType { +/** + * The quota is not enforced. + */ +NONE, + +/** + * The quota is enforced before updating the sensor. An update + * is rejected if the quota is already exhausted. Review comment: nit: somehow the first sentence is confusing because it seems to imply that the sensor gets updated in this case as well. Do you think maybe this sounds a bit clearer? "The check for quota violation is done prior to recording the value. If the quota is already exhausted, the value is not recorded into the sensor." Or perhaps some other wording. Up to you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
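As background for the wording under discussion, a self-contained sketch of how a sensor quota currently surfaces (the value is recorded and then checked, and a violation is raised as QuotaViolationException); the metric names and quota bound are arbitrary, and the recorded value is deliberately large so the measured rate exceeds the bound.
{code:java}
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.metrics.QuotaViolationException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class SensorQuotaDemo {
    public static void main(String[] args) throws Exception {
        try (Metrics metrics = new Metrics()) {
            Sensor sensor = metrics.sensor("request-rate");
            MetricName name = metrics.metricName("rate", "demo-group");
            // Allow at most 10 units per second, measured over the sensor's sample window
            sensor.add(name, new Rate(), new MetricConfig().quota(Quota.upperBound(10.0)));
            try {
                sensor.record(10_000.0); // large enough to push the measured rate over the bound
            } catch (QuotaViolationException e) {
                System.out.println("Quota exceeded: " + e.getMessage());
            }
        }
    }
}
{code}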
[GitHub] [kafka] ableegoldman commented on a change in pull request #8994: KAFKA-10247: Skip processing if task isn't running
ableegoldman commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r451884449 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -95,7 +96,7 @@ * | | Assigned (3)| <+ * | +-+---+ | * || | - * || | + * ||--+ Review comment: What's up with this? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -193,6 +197,28 @@ private void closeAndRevive(final Map> taskWith log.error("Error suspending corrupted task {} ", task.id(), swallow); } task.closeDirty(); +// Pause so we won't poll any more records for this task until it has been re-initialized +// Note, closeDirty already clears the partitiongroup for the task. +mainConsumer().pause(task.inputPartitions()); +final Map committed = mainConsumer().committed(task.inputPartitions()); +for (final TopicPartition topicPartition : task.inputPartitions()) { +final OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition); +if (offsetAndMetadata == null) { +final OffsetResetStrategy strategy = resetStrategy.apply(topicPartition); +switch (strategy) { +case EARLIEST: + mainConsumer().seekToBeginning(Collections.singleton(topicPartition)); +break; +case LATEST: + mainConsumer().seekToBeginning(Collections.singleton(topicPartition)); Review comment: Should this be `seekToEnd` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
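For reference, the correction being pointed at (LATEST should seek to the end of the log, not the beginning) can be sketched as a small self-contained helper; this is hypothetical code, not the PR's, and the consumer is assumed to be the task's main consumer.
{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

final class ResetSeekHelper {
    static void seekToResetPosition(final Consumer<?, ?> consumer,
                                    final TopicPartition tp,
                                    final OffsetResetStrategy strategy) {
        switch (strategy) {
            case EARLIEST:
                consumer.seekToBeginning(Collections.singleton(tp));
                break;
            case LATEST:
                consumer.seekToEnd(Collections.singleton(tp)); // LATEST means the end of the log
                break;
            default:
                throw new IllegalArgumentException("Unexpected reset strategy: " + strategy);
        }
    }
}
{code}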
[GitHub] [kafka] ableegoldman commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest
ableegoldman commented on pull request #8913: URL: https://github.com/apache/kafka/pull/8913#issuecomment-655815316 Hey @vvcephei , do you think we should ask about cherrypicking this back to 2.6 to help stabilize the release? I thought failing system tests (or regularly failing tests of any kind) automatically block a release because it needs to get a green build to proceed. But I'm not familiar with the release/RM procedure. Maybe it's alright to just confirm that the test is failing because of the test itself? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on pull request #8996: URL: https://github.com/apache/kafka/pull/8996#issuecomment-655825204 Two failures: `TransactionsBounceTest.testWithGroupMetadata` `SmokeTestDriverIntegrationTest.shouldWorkWithRebalance` The `shouldWorkWithRebalance` failure is actually pretty concerning, but I can't imagine how it could possibly be related to this PR specifically. I'll see if I can reproduce it locally... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Skip processing if task isn't running
mjsax commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r451895176 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -193,6 +197,28 @@ private void closeAndRevive(final Map> taskWith log.error("Error suspending corrupted task {} ", task.id(), swallow); } task.closeDirty(); +// Pause so we won't poll any more records for this task until it has been re-initialized +// Note, closeDirty already clears the partitiongroup for the task. +mainConsumer().pause(task.inputPartitions()); +final Map committed = mainConsumer().committed(task.inputPartitions()); +for (final TopicPartition topicPartition : task.inputPartitions()) { +final OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition); +if (offsetAndMetadata == null) { +final OffsetResetStrategy strategy = resetStrategy.apply(topicPartition); +switch (strategy) { +case EARLIEST: + mainConsumer().seekToBeginning(Collections.singleton(topicPartition)); +break; +case LATEST: + mainConsumer().seekToBeginning(Collections.singleton(topicPartition)); +break; +default: +throw new IllegalArgumentException("Unexpected reset strategy: " + strategy); +} +} else { +mainConsumer().seek(topicPartition, offsetAndMetadata); +} +} Review comment: Why do you think it is bad? That is just how the API works... Cf https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L769-L802 that does the same thing. That makes me wonder if we can share common code for both cases? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -137,7 +138,7 @@ STARTING(2, 3, 5),// 1 PARTITIONS_REVOKED(2, 3, 5), // 2 PARTITIONS_ASSIGNED(2, 3, 4, 5), // 3 -RUNNING(2, 3, 5), // 4 +RUNNING(2, 3, 4, 5), // 4 Review comment: Why do we need this? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -766,6 +769,24 @@ void runOnce() { return records; } +private OffsetResetStrategy getResetStrategy(final TopicPartition partition) { +if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { +return OffsetResetStrategy.EARLIEST; +} else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { +return OffsetResetStrategy.LATEST; +} else { +if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { +return OffsetResetStrategy.EARLIEST; +} Review comment: > Does Streams override the client default..? Yes. The client default is "latest" but we use "earliest" by default (cf https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L857). Of course, users can also change the default via StreamsConfig. Note that the consumer client can only apply a single strategy to all topics it is subscribed to. Hence, if all topics use the same reset policy, we can rely on the consumer's configured policy. However, if users specify different reset policies in their code via `Consumed` for individual topics, the consumer is re-configured to use "none" (cf.
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L362-L366) and we do a manual seekToBeginning/seekToEnd according to the user-defined strategy for the corresponding topic (cf https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L762-L764) because we need to make a per-topic decision that the consumer cannot make for us. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -766,6 +769,24 @@ void runOnce() { return records; } +private OffsetResetStrategy getResetStrategy(final TopicPartition partition) { Review comment: Wondering if we should reuse this method within `StreamThread#resetInvalidOffsets`? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -1140,4 +1166,1
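The per-topic reset mechanics described above can be illustrated outside Streams with the plain consumer: configure `auto.offset.reset=none` and react to NoOffsetForPartitionException by seeking per topic. The topic patterns and addresses below are hypothetical.
{code:java}
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PerTopicReset {
    public static void main(String[] args) {
        Pattern earliestTopics = Pattern.compile("input-.*"); // hypothetical
        Pattern latestTopics = Pattern.compile("clicks-.*");  // hypothetical

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); // reset decisions are made by the app
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("input-1", "clicks-1"));
            try {
                consumer.poll(Duration.ofMillis(100));
            } catch (final NoOffsetForPartitionException e) {
                final Set<TopicPartition> toBeginning = new HashSet<>();
                final Set<TopicPartition> toEnd = new HashSet<>();
                for (final TopicPartition tp : e.partitions()) {
                    if (earliestTopics.matcher(tp.topic()).matches()) {
                        toBeginning.add(tp);
                    } else if (latestTopics.matcher(tp.topic()).matches()) {
                        toEnd.add(tp); // unmatched topics would need their own handling
                    }
                }
                consumer.seekToBeginning(toBeginning);
                consumer.seekToEnd(toEnd);
                consumer.poll(Duration.ofMillis(100)); // resume from the chosen positions
            }
        }
    }
}
{code}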
[GitHub] [kafka] chia7712 commented on a change in pull request #8981: KAFKA-10235 Fix flaky transactions_test.py
chia7712 commented on a change in pull request #8981: URL: https://github.com/apache/kafka/pull/8981#discussion_r451898731 ## File path: tests/kafkatest/tests/core/transactions_test.py ## @@ -47,7 +47,10 @@ def __init__(self, test_context): self.num_output_partitions = 3 self.num_seed_messages = 10 self.transaction_size = 750 -self.transaction_timeout = 4 +# This is reducing the transaction cleanup interval. +# IN hard_bounce mode, transaction is broke ungracefully. Hence, it produces unstable +# offsets which obstructs TransactionalMessageCopier from receiving position of group. +self.transaction_timeout = 5000 Review comment: > we can adjust the comment to make it clear that the hard bounce is for the client. done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
mjsax commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r451899726 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -224,6 +224,9 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { for (final StateStoreMetadata store : stores.values()) { if (store.changelogPartition == null) { log.info("State store {} is not logged and hence would not be restored", store.stateStore.name()); +} else if (!store.stateStore.persistent()) { +log.info("Initializing to the starting offset for changelog {} of in-memory state store {}", + store.changelogPartition, store.stateStore.name()); Review comment: Don't we need to call `store.setOffset(null)` for this case? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
mjsax commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r451900686 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -224,6 +224,9 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { for (final StateStoreMetadata store : stores.values()) { if (store.changelogPartition == null) { log.info("State store {} is not logged and hence would not be restored", store.stateStore.name()); +} else if (!store.stateStore.persistent()) { +log.info("Initializing to the starting offset for changelog {} of in-memory state store {}", + store.changelogPartition, store.stateStore.name()); Review comment: Just wondering about https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L769-L802 (can't comment below). For this case, it should hold that `store.offset() == changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition))` ? Or maybe `>=`? Should we add a sanity check? (Not related to this PR itself actually. -- Just wondering.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
mjsax commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r451901975 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ## @@ -784,7 +784,7 @@ public void close() { } @Test -public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws IOException { +public void shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws IOException { final long checkpointOffset = 10L; final Map offsets = mkMap( Review comment: Not sure if I understand the test: don't we write a checkpoint in L795 for the persistent store? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
mjsax commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r451902131 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ## @@ -813,6 +813,62 @@ public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws IOExce } } +@Test +public void shouldNotThrowTaskCorruptedWithWithoutInMemoryStoreCheckpointAndNonEmptyDir() throws IOException { Review comment: `WithWithout` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
mjsax commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r451903126 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,24 +45,23 @@ protected ProcessorNode currentNode; private long currentSystemTimeMs; -final StateManager stateManager; Review comment: Not sure why we need to move this from the abstract class into the child classes? ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -103,8 +104,12 @@ public void init(final ProcessorContext context, @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { +final String storeName = name(); +final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( -ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + changelogTopic != null ? +changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), Review comment: It seems we don't need this if-then-else any longer as it's already taken care of within `ProcessorContextUtils.changelogFor`? (Similar for other classes.) ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -73,22 +73,23 @@ private final Set globalNonPersistentStoresTopics = new HashSet<>(); private final OffsetCheckpoint checkpointFile; private final Map checkpointFileCache; +private final Map storeToChangelogTopic; public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, final Consumer globalConsumer, final StateDirectory stateDirectory, final StateRestoreListener stateRestoreListener, final StreamsConfig config) { +storeToChangelogTopic = topology.storeToChangelogTopic(); Review comment: Wondering if we should pass `storeToChangelogTopic()` and `.globalStateStores()` into the constructor instead of `ProcessorTopology` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax closed pull request #8855: DO NOT MERGE
mjsax closed pull request #8855: URL: https://github.com/apache/kafka/pull/8855 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
Sophie Blee-Goldman created KAFKA-10251: --- Summary: Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata Key: KAFKA-10251 URL: https://issues.apache.org/jira/browse/KAFKA-10251 Project: Kafka Issue Type: Bug Components: core Reporter: Sophie Blee-Goldman h3. Stacktrace org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 200 records at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions.fail(Assertions.scala:1091) at org.scalatest.Assertions.fail$(Assertions.scala:1087) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109) The logs are pretty much just this on repeat: {code:java} [2020-07-08 23:41:04,034] ERROR Error when sending message to topic output-topic with key: 9955, value: 9955 with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error when sending message to topic output-topic with key: 9959, value: 9959 with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
[ https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154110#comment-17154110 ] Sophie Blee-Goldman commented on KAFKA-10251: - cc [~bchen225242] – [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3318/testReport/junit/kafka.api/TransactionsBounceTest/testWithGroupMetadata/] > Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata > - > > Key: KAFKA-10251 > URL: https://issues.apache.org/jira/browse/KAFKA-10251 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Sophie Blee-Goldman >Priority: Major > > h3. Stacktrace > org.scalatest.exceptions.TestFailedException: Consumed 0 records before > timeout instead of the expected 200 records at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1091) at > org.scalatest.Assertions.fail$(Assertions.scala:1087) at > org.scalatest.Assertions$.fail(Assertions.scala:1389) at > kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at > kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109) > > > The logs are pretty much just this on repeat: > {code:java} > [2020-07-08 23:41:04,034] ERROR Error when sending message to topic > output-topic with key: 9955, value: 9955 with error: > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) > org.apache.kafka.common.KafkaException: Failing batch since transaction was > aborted at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error > when sending message to topic output-topic with key: 9959, value: 9959 with > error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) > org.apache.kafka.common.KafkaException: Failing batch since transaction was > aborted at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang opened a new pull request #8997: MINOR: Improve log4j for per-consumer assignment
guozhangwang opened a new pull request #8997: URL: https://github.com/apache/kafka/pull/8997 Add log4j entry summarizing the assignment (prev owned and assigned) at the consumer level. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8997: MINOR: Improve log4j for per-consumer assignment
guozhangwang commented on pull request #8997: URL: https://github.com/apache/kafka/pull/8997#issuecomment-655841394 @ableegoldman @abbccdda This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8997: MINOR: Improve log4j for per-consumer assignment
guozhangwang commented on a change in pull request #8997: URL: https://github.com/apache/kafka/pull/8997#discussion_r451912298 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java ## @@ -66,32 +72,17 @@ public ClientState() { prevActiveTasks = new TreeSet<>(); prevStandbyTasks = new TreeSet<>(); consumerToPreviousStatefulTaskIds = new TreeMap<>(); +consumerToPreviousActiveTaskIds = new TreeMap<>(); +consumerToAssignedActiveTaskIds = new TreeMap<>(); +consumerToAssignedStandbyTaskIds = new TreeMap<>(); +consumerToRevokingActiveTaskIds = new TreeMap<>(); ownedPartitions = new TreeMap<>(TOPIC_PARTITION_COMPARATOR); taskOffsetSums = new TreeMap<>(); taskLagTotals = new TreeMap<>(); this.capacity = capacity; } -private ClientState(final Set activeTasks, Review comment: This is not used anywhere. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -725,8 +725,10 @@ private boolean assignTasksToClients(final Set allSourceTopics, statefulTasks, assignmentConfigs); -log.info("Assigned tasks to clients as {}{}.", -Utils.NL, clientStates.entrySet().stream().map(Map.Entry::toString).collect(Collectors.joining(Utils.NL))); +log.info("Assigned tasks to clients as: {}{}.", Utils.NL, Review comment: This is unnecessarily verbose, plus part of it is replaced by line 960 below, so I trimmed a bit here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154113#comment-17154113 ] Sophie Blee-Goldman commented on KAFKA-8940: Failed again: h3. Error Message java.lang.AssertionError: verifying tagg fail: key=133 tagg=[ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1594252802631, serialized key size = 3, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = 133, value = 1)] expected=0 taggEvents: [ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1594252802631, serialized key size = 3, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = 133, value = 1)] verifying suppressed min-suppressed verifying min-suppressed with 20 keys fail: resultCount=20 expectedCount=10 result=[[5-1004@159425280/159433920], [0-999@159425280/159433920], [8-1007@159416640/159425280], [2-1001@159416640/159425280], [7-1006@159425280/159433920], [1-1000@159425280/159433920], [3-1002@159416640/159425280], [9-1008@159416640/159425280], [4-1003@159416640/159425280], [6-1005@159425280/159433920], [8-1007@159425280/159433920], [5-1004@159416640/159425280], [2-1001@159425280/159433920], [4-1003@159425280/159433920], [6-1005@159416640/159425280], [0-999@159416640/159425280], [7-1006@159416640/159425280], [1-1000@159416640/159425280], [9-1008@159425280/159433920], [3-1002@159425280/159433920]] expected=[7-1006, 3-1002, 0-999, 1-1000, 4-1003, 2-1001, 5-1004, 8-1007, 6-1005, 9-1008] verifying suppressed sws-suppressed verifying min with 10 keys min fail: key=7-1006 actual=844 expected=7 > Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance > - > > Key: KAFKA-8940 > URL: https://issues.apache.org/jira/browse/KAFKA-8940 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Labels: flaky-test > Fix For: 2.5.0 > > > I lost the screen shot unfortunately... it reports the set of expected > records does not match the received records. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r451922040 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -224,6 +224,9 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { for (final StateStoreMetadata store : stores.values()) { if (store.changelogPartition == null) { log.info("State store {} is not logged and hence would not be restored", store.stateStore.name()); +} else if (!store.stateStore.persistent()) { +log.info("Initializing to the starting offset for changelog {} of in-memory state store {}", + store.changelogPartition, store.stateStore.name()); Review comment: > Don't we need to call store.setOffset(null) or this case? Well, either it's a recycled task in which case no, we don't want to wipe out the existing offset, or it's a new task in which case it's initialized to `null` anyway. I'm also not sure what you mean in the second comment. Did you maybe paste the link to the wrong code? (just guessing since you linked to that same code earlier in John's PR) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r451922421 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ## @@ -784,7 +784,7 @@ public void close() { } @Test -public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws IOException { +public void shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws IOException { final long checkpointOffset = 10L; final Map offsets = mkMap( Review comment: Yeah, I think that's the point of the test: we wrote a checkpoint but it was missing the offset for one of the persistent stores, thus, we should throw a TaskCorruptedException in `initializeStoreOffsetsFromCheckpoint` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org