[jira] [Commented] (KAFKA-10245) Using vulnerable log4j version

2020-07-08 Thread Tom Bentley (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153320#comment-17153320
 ] 

Tom Bentley commented on KAFKA-10245:
-

This is essentially a dupe of https://issues.apache.org/jira/browse/KAFKA-9366

> Using vulnerable log4j version
> --
>
> Key: KAFKA-10245
> URL: https://issues.apache.org/jira/browse/KAFKA-10245
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Pavel Kuznetsov
>Priority: Major
>  Labels: security
>
> *Description*
> I checked the kafka_2.12-2.5.0.tgz distribution with WhiteSource and found 
> that the log4j version used in kafka-connect and the Kafka broker has 
> vulnerabilities:
>  * log4j-1.2.17.jar has 
> [CVE-2019-17571|https://github.com/advisories/GHSA-2qrg-x229-3v8q] and 
> [CVE-2020-9488|https://github.com/advisories/GHSA-vwqq-5vrc-xw9h] 
> vulnerabilities. The way to fix it is to upgrade to 
> org.apache.logging.log4j:log4j-core:2.13.2
> *To Reproduce*
> Download kafka_2.12-2.5.0.tgz
> Open the libs folder in it and find log4j-1.2.17.jar.
> Check [CVE-2019-17571|https://github.com/advisories/GHSA-2qrg-x229-3v8q] and 
> [CVE-2020-9488|https://github.com/advisories/GHSA-vwqq-5vrc-xw9h] to see that 
> log4j 1.2.17 is vulnerable.
> *Expected*
>  * log4j is log4j-core 2.13.2 or higher
> *Actual*
>  * log4j is 1.2.17



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10245) Using vulnerable log4j version

2020-07-08 Thread Tom Bentley (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley resolved KAFKA-10245.
-
Resolution: Duplicate

> Using vulnerable log4j version
> --
>
> Key: KAFKA-10245
> URL: https://issues.apache.org/jira/browse/KAFKA-10245
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Pavel Kuznetsov
>Priority: Major
>  Labels: security
>
> *Description*
> I checked the kafka_2.12-2.5.0.tgz distribution with WhiteSource and found 
> that the log4j version used in kafka-connect and the Kafka broker has 
> vulnerabilities:
>  * log4j-1.2.17.jar has 
> [CVE-2019-17571|https://github.com/advisories/GHSA-2qrg-x229-3v8q] and 
> [CVE-2020-9488|https://github.com/advisories/GHSA-vwqq-5vrc-xw9h] 
> vulnerabilities. The way to fix it is to upgrade to 
> org.apache.logging.log4j:log4j-core:2.13.2
> *To Reproduce*
> Download kafka_2.12-2.5.0.tgz
> Open the libs folder in it and find log4j-1.2.17.jar.
> Check [CVE-2019-17571|https://github.com/advisories/GHSA-2qrg-x229-3v8q] and 
> [CVE-2020-9488|https://github.com/advisories/GHSA-vwqq-5vrc-xw9h] to see that 
> log4j 1.2.17 is vulnerable.
> *Expected*
>  * log4j is log4j-core 2.13.2 or higher
> *Actual*
>  * log4j is 1.2.17



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9482) mirror maker 2.0 doesn't replicates (create topic) created automatically by producing messages

2020-07-08 Thread Jira

[ 
https://issues.apache.org/jira/browse/KAFKA-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153322#comment-17153322
 ] 

Aljoscha Pörtner commented on KAFKA-9482:
-

Maybe our problem is related to this. We see that the topic is created in the 
target cluster but after that no replication of messages takes place. After a 
restart of the mm2 everything works fine.

> mirror maker 2.0 doesn't replicates (create topic) created automatically by 
> producing messages
> --
>
> Key: KAFKA-9482
> URL: https://issues.apache.org/jira/browse/KAFKA-9482
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Mikhail Grinfeld
>Priority: Minor
>
> I have 2 Kafka clusters (3 brokers each) and a MirrorMaker instance (built 
> with docker and docker-compose).
> Both clusters are configured to create topics automatically (no 
> auto.create.topics.enable in the config, and the default, at least as it 
> appears in the docs, is true).
> If the topic is created on the source cluster before MirrorMaker starts, everything 
> works as expected: producing messages to the source cluster causes replication to 
> the destination cluster.
> If the topic doesn't exist on the source cluster when starting to produce messages, 
> it is created in the source cluster, but not in the destination - and no replication 
> is performed.
>  
> Using following mm2.properties:
> {code:java}
> # mm2.propertiesclusters=src,dest
> src.bootstrap.servers=kafka-1:9092,kafka-2:19092,kafka-3:29092
> dest.bootstrap.servers=kafka-4:39092,kafka-5:49092,kafka-6:59092
> src->dest.enabled=true
> src->dest.topics=.*
> {code}
> and running MirrorMaker with
> {code:java}
> connect-mirror-maker /etc/mm2.properties --clusters src dest
> {code}
> Note:
> when I am using Kafka Streams to read from the initial topic, there are a few 
> KTable topics created automatically by Kafka Streams - and these topics are 
> created OK (of course, when the initial topic is created at the beginning)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-07-08 Thread GitBox

tombentley commented on pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#issuecomment-655357176


   @rajinisivaram @omkreddy please could you review or suggest another 
committer for review?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-08 Thread Di Campo (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153365#comment-17153365
 ] 

Di Campo commented on KAFKA-4273:
-

Just tried it, and WindowStoreIterator.remove is an unsupported operation. 

So, using a WindowStore, the solution could be as suggested: iterate all of the 
events to get the last one to see if the last one is . This trades a nice 
self-removal mechanism for some inefficient querying. If you expect a low update 
frequency, that may just be your game; but with a high number of updates on 
duplicates, performance may be affected (note here: only the changes need to be 
actually stored).
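
For illustration, a minimal sketch of that kind of lookup, assuming a windowed store named "dedup-store" with String keys and values and a 48-hour search range (the store name, types and range are hypothetical, not anything defined in this ticket):

{code:java}
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public final class LatestValueLookup {

    // Iterate the windows for `key` and keep the last (most recent) value seen.
    @SuppressWarnings("unchecked")
    public static String latestValue(final ProcessorContext context, final String key) {
        final WindowStore<String, String> store =
            (WindowStore<String, String>) context.getStateStore("dedup-store");
        final Instant now = Instant.now();
        String latest = null;
        try (final WindowStoreIterator<String> iter =
                 store.fetch(key, now.minus(Duration.ofHours(48)), now)) {
            while (iter.hasNext()) {
                latest = iter.next().value; // results are ordered by window start time
            }
        }
        return latest;
    }
}
{code}

How expensive this is depends on how many windows per key have to be scanned, which is the inefficiency discussed above.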

TTL in KV stores would be a good, if not perfect, fit for this case :)

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period. It's 48 hours. After that 
> - the data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have schedulled a job to 
> periodically stop Kafka Streams instances - one at the time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non-compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL API to define 
> a TTL for the local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could this also be added to 
> the DSL API? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question: maybe some intermediate topics / 
> state stores need a different TTL, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somewhat similar issue: KAFKA-4212
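
For reference, a minimal sketch of the RocksDB Java TTL API linked above (the path and TTL value are illustrative only; this is the standalone RocksDB API, not something Kafka Streams currently exposes):

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;

public final class TtlDbExample {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        // Open a DB whose entries become eligible for removal during compaction
        // once they are older than the TTL (48 hours here).
        try (final Options options = new Options().setCreateIfMissing(true);
             final TtlDB db = TtlDB.open(options, "/tmp/ttl-db", 48 * 60 * 60, false)) {
            db.put("key".getBytes(), "value".getBytes());
        }
    }
}
{code}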



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-08 Thread GitBox

rajinisivaram commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-655384902


   @ijuma @hachikuji @edenhill Thanks for the responses. I will follow up on 
KIP-392 discussion thread and update the PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-08 Thread Di Campo (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153365#comment-17153365
 ] 

Di Campo edited comment on KAFKA-4273 at 7/8/20, 8:56 AM:
--

Just tried it, and WindowStoreIterator.remove is an unsupported operation. 

So, using a WindowStore, the solution could be as suggested: iterate all of the 
events to get the last one to see if the last one is . This trades a nice 
self-removal mechanism for some inefficient querying. If you expect a low update 
frequency, that may just be your game; but with a high number of updates on 
duplicates, performance may be affected (note here: only the changes need to be 
actually stored).

To implement some kind of TTL in a KV store, I suppose you can use punctuation to 
perform store deletion inside the scheduled punctuation. However, that would 
make for a potentially long removal at a certain point in time. (BTW, does 
Punctuator execution stop the processing? In that case it would be dangerous to 
wait too long to do it.)
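
A minimal sketch of that punctuation-based cleanup, assuming a plain key-value store named "ttl-store" that records when each key was last seen (the store name, punctuation interval and retention are assumptions for illustration):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class TtlCleanupProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, Long> lastSeen;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        lastSeen = (KeyValueStore<String, Long>) context.getStateStore("ttl-store");
        // Every 10 minutes of wall-clock time, delete entries not seen for 48 hours.
        context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            final long cutoff = timestamp - Duration.ofHours(48).toMillis();
            try (final KeyValueIterator<String, Long> iter = lastSeen.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Long> entry = iter.next();
                    if (entry.value != null && entry.value < cutoff) {
                        lastSeen.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        lastSeen.put(key, context().timestamp()); // remember when the key was last seen
    }
}
{code}

As far as I know, punctuators run on the stream thread, so a long scan here would indeed delay record processing, which is exactly the concern raised above.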

All in all, TTL in KV stores would be a good, if not perfect, fit for this case :)


was (Author: xmar):
Just tried it, and WindowStoreIterator.remove is an unsupported operation. 

So, using a WindowStore, the solution could be as suggested: iterate all of the 
events to get the last one to see if the last one is . This trades a nice 
self-removal mechanism for some inefficient querying. If you expect a low update 
frequency, that may just be your game; but with a high number of updates on 
duplicates, performance may be affected (note here: only the changes need to be 
actually stored).

TTL in KV stores would be a good, if not perfect, fit for this case :)

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period. It's 48 hours. After that 
> - the data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have schedulled a job to 
> periodically stop Kafka Streams instances - one at the time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non-compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL API to define 
> a TTL for the local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could this also be added to 
> the DSL API? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question: maybe some intermediate topics / 
> state stores need a different TTL, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somewhat similar issue: KAFKA-4212



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] akatona84 opened a new pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest

2020-07-08 Thread GitBox

akatona84 opened a new pull request #8992:
URL: https://github.com/apache/kafka/pull/8992


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] akatona84 commented on pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest

2020-07-08 Thread GitBox

akatona84 commented on pull request #8992:
URL: https://github.com/apache/kafka/pull/8992#issuecomment-655458870


   Hi @ijuma , I've encountered a test failure here which caused unexpected 
threads in the following tests' `@Before` phase.
   This fixes the issue: in case a failure happens, only that failure would be reported 
(and not together with failures ending in .classMethod).
   
   Could you take a look at this, please?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] akatona84 edited a comment on pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest

2020-07-08 Thread GitBox

akatona84 edited a comment on pull request #8992:
URL: https://github.com/apache/kafka/pull/8992#issuecomment-655458870


   Hi @ijuma , I've encountered a test failure here which caused unexpected 
threads in the following tests' `@Before` phase.
   This fixes the issue: in case a failure happens, only that failure would be reported 
(and not together with failures ending in .classMethod).
   
   Could you take a look at this, please?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy edited a comment on pull request #8966: KAFKA-10220: add null check for configurationKey

2020-07-08 Thread GitBox

omkreddy edited a comment on pull request #8966:
URL: https://github.com/apache/kafka/pull/8966#issuecomment-655468333


   @tombentley @showuon Can you help me understand where we are calling 
`DescribeConfigsRequestData.setConfigurationKeys()` in AdminClient? It looks like 
`ConfigurationKeys` is always null.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #8966: KAFKA-10220: add null check for configurationKey

2020-07-08 Thread GitBox

omkreddy commented on pull request #8966:
URL: https://github.com/apache/kafka/pull/8966#issuecomment-655468333


   @tombentley @showuon Can you help me understand where we are calling 
`DescribeConfigsRequestData.setConfigurationKeys()` in AdminClient? It looks like 
we are always setting it to null.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sasukerui commented on pull request #8742: KAFKA-10057 optimize class ConfigCommand method alterConfig parameters

2020-07-08 Thread GitBox

sasukerui commented on pull request #8742:
URL: https://github.com/apache/kafka/pull/8742#issuecomment-655474005


   > @sasukerui Can you please change the PR to target `master`?
   
   ok, I see



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

2020-07-08 Thread GitBox

rajinisivaram commented on a change in pull request #8986:
URL: https://github.com/apache/kafka/pull/8986#discussion_r451498688



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -460,15 +463,21 @@ boolean joinGroupIfNeeded(final Timer timer) {
 exception instanceof IllegalGenerationException ||
 exception instanceof MemberIdRequiredException)
 continue;
-else if (!future.isRetriable())
-throw exception;
-
-timer.sleep(rebalanceConfig.retryBackoffMs);
+else {
+handleFailure(future, timer);
+}
 }
 }
 return true;
 }
 
+protected void handleFailure(RequestFuture future, Timer timer) {
+if (future.isRetriable() || future.exception() instanceof 
AuthorizationException)

Review comment:
   @abbccdda Thanks for the review. The instance we saw was an 
authorization failure for groups, but since I was fixing handling of all 
responses, I used `AuthorizationException` in the check since we want to ensure 
we back off for topic authorization exceptions too. We already back off for 
`RetriableException` and I haven't changed that behaviour; this PR addresses 
exceptions that are not subclasses of `RetriableException`. Perhaps just 
updating `AuthorizationException` as is currently done in the PR is sufficient?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #8966: KAFKA-10220: add null check for configurationKey

2020-07-08 Thread GitBox

showuon commented on pull request #8966:
URL: https://github.com/apache/kafka/pull/8966#issuecomment-655510801


   @omkreddy, I can take a look tomorrow (my time). Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8966: KAFKA-10220: add null check for configurationKey

2020-07-08 Thread GitBox

tombentley commented on pull request #8966:
URL: https://github.com/apache/kafka/pull/8966#issuecomment-655533627


   @omkreddy that is... an excellent question. 
   
   * In `HEAD` the only place it's set to non-null is in `RequestResponseTest`
   * In 9a4f00f78b (i.e. just before the change to use the generated message 
classes) the situation is the same: the only non-null call sites are two tests in 
`RequestResponseTest`
   * ... and I chased it all the way back to 972b7545363ae, when the RPC was 
added, and it seems that this has always been the situation. One or two callers 
with non-null keys in `RequestResponseTest`. Thus supported (but without any 
test coverage) at the protocol level, but never exposed in the Java API. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-8794) Deprecate DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]

2020-07-08 Thread Dongjin Lee (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjin Lee resolved KAFKA-8794.

Resolution: Duplicate

Duplicate of KAFKA-10120.

> Deprecate DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]
> ---
>
> Key: KAFKA-8794
> URL: https://issues.apache.org/jira/browse/KAFKA-8794
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, documentation
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
>  Labels: needs-kip
>
> As of 2.3.0, {{DescribeLogDirsResult}} returned by 
> {{AdminClient#describeLogDirs(Collection)}} is exposing the internal data 
> structure, {{DescribeLogDirsResponse.LogDirInfo}}. As a result, its Javadoc 
> provides no documentation for it. The disparity is clear when comparing with 
> {{DescribeReplicaLogDirsResult}}, returned by 
> {{AdminClient#describeReplicaLogDirs(Collection)}}.
> To resolve this, {{DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]}} should 
> be deprecated and hidden from the public API later; instead, 
> {{org.apache.kafka.clients.admin.DescribeLogDirsResult}} should provide 
> {{[LogDirInfo, ReplicaInfo]}} as its internal class, like 
> {{DescribeReplicaLogDirsResult}}.
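
For context, a hedged sketch of the usage this ticket is about, showing how the return type leaks the internal response class (the bootstrap address and broker id are placeholders, and the generic signature is quoted from memory of the 2.5-era API, so it may differ slightly):

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;

public final class DescribeLogDirsExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (final Admin admin = Admin.create(props)) {
            // The value type is DescribeLogDirsResponse.LogDirInfo, a class from the
            // internal org.apache.kafka.common.requests package.
            final Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> values =
                admin.describeLogDirs(Collections.singleton(0)).values();
            System.out.println(values.get(0).get().keySet()); // log directory paths on broker 0
        }
    }
}
{code}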



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dongjinleekr closed pull request #7204: KAFKA-8794: Deprecate DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]

2020-07-08 Thread GitBox

dongjinleekr closed pull request #7204:
URL: https://github.com/apache/kafka/pull/7204


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-08 Thread GitBox

vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-64900


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-08 Thread GitBox

vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-65002







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-08 Thread GitBox

vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-65206


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-07-08 Thread GitBox

omkreddy commented on pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#issuecomment-65622


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #8985: MINOR; KafkaAdminClient#alterReplicaLogDirs should not fail all the futures when only one call fails

2020-07-08 Thread GitBox

dajac commented on pull request #8985:
URL: https://github.com/apache/kafka/pull/8985#issuecomment-655567477


   Jenkins looks good. There was one unrelated test failure during the JDK 14 
and Scala 2.13 run.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10226) KStream without SASL information should return error in confluent cloud

2020-07-08 Thread John Roesler (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153667#comment-17153667
 ] 

John Roesler commented on KAFKA-10226:
--

Sounds good :)

> KStream without SASL information should return error in confluent cloud
> ---
>
> Key: KAFKA-10226
> URL: https://issues.apache.org/jira/browse/KAFKA-10226
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, streams
>Affects Versions: 2.5.0
>Reporter: Werner Daehn
>Priority: Minor
>
> I have created a KStream against Confluent Cloud and wondered why no data 
> was received from the source. The reason was that I forgot to add the SASL 
> API keys and secrets.
>  
> For end users this might lead to usability issues. If the KStream wants to 
> read from a topic and is not allowed to, this should raise an error, not be 
> silently ignored.
>  
> How do producer/consumer clients handle that situation?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei opened a new pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-08 Thread GitBox

vvcephei opened a new pull request #8993:
URL: https://github.com/apache/kafka/pull/8993


   Replaces the previous upgrade test's trivial Streams app
   with the commonly used SmokeTest, exercising many more
   features. Also adjust the test matrix to test upgrading
   from each released version since 2.0 to the current branch.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on a change in pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-07-08 Thread GitBox

omkreddy commented on a change in pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#discussion_r451626581



##
File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
##
@@ -161,10 +193,34 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
 testProducerConsumerCli(adminArgs)
   }
 
+  @Test
+  def testAclCliWithClientId(): Unit = {
+val adminClientConfig = 
File.createTempFile(classOf[AclCommandTest].getName, "createServer")

Review comment:
   We can use `TestUtils.tempFile/TestUtils.tempFile(String)`.

##
File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
##
@@ -161,10 +193,34 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
 testProducerConsumerCli(adminArgs)
   }
 
+  @Test
+  def testAclCliWithClientId(): Unit = {
+val adminClientConfig = 
File.createTempFile(classOf[AclCommandTest].getName, "createServer")
+adminClientConfig.deleteOnExit()
+val pw = new PrintWriter(adminClientConfig)
+pw.println("client.id=my-client")
+pw.close()
+
+createServer(Some(adminClientConfig))
+
+val appender = LogCaptureAppender.createAndRegister()
+val previousLevel = 
LogCaptureAppender.setClassLoggerLevel(classOf[AppInfoParser], Level.WARN)
+
+testAclCli(adminArgs)
+
+LogCaptureAppender.setClassLoggerLevel(classOf[AppInfoParser], 
previousLevel)

Review comment:
   we may  want to reset it in `finally` block?

##
File path: core/src/main/scala/kafka/admin/AclCommand.scala
##
@@ -130,33 +130,39 @@ object AclCommand extends Logging {
   }
 }
 
-listAcls()
+listAcls(adminClient)
   }
 }
 
 def listAcls(): Unit = {
   withAdminClient(opts) { adminClient =>
-val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
-val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
-val resourceToAcls = getAcls(adminClient, filters)
+listAcls(adminClient)
+  }
+}
 
-if (listPrincipals.isEmpty) {
-  for ((resource, acls) <- resourceToAcls)
-println(s"Current ACLs for resource `$resource`: $Newline 
${acls.map("\t" + _).mkString(Newline)} $Newline")
-} else {
-  listPrincipals.foreach(principal => {
-println(s"ACLs for principal `$principal`")
-val filteredResourceToAcls =  resourceToAcls.map { case (resource, 
acls) =>
-  resource -> acls.filter(acl => 
principal.toString.equals(acl.principal))
-}.filter { case (_, acls) => acls.nonEmpty }
+private def listAcls(adminClient: Admin) = {

Review comment:
   nit: Can we add `: Unit` return type here and  in other new methods?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-08 Thread GitBox

vvcephei commented on a change in pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#discussion_r451634562



##
File path: build.gradle
##
@@ -1508,6 +1508,18 @@ project(':streams:upgrade-system-tests-24') {
   }
 }
 
+project(':streams:upgrade-system-tests-25') {

Review comment:
   I realized this just now. We really should test upgrading from 2.5.0 to 
2.5.1, and similar for every bugfix release. In other words, we should 
add/update "LATEST_X_Y" to the test matrix of the X.Y branch after each release.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
##
@@ -92,6 +94,13 @@ public void shouldWorkWithRebalance() throws 
InterruptedException {
 
 final Properties props = new Properties();
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
+props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
+props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
+props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+props.put(ProducerConfig.ACKS_CONFIG, "all");

Review comment:
   I'm not sure exactly what it was, but the integration test was failing 
after the (backported) change to move the property definitions into python. 
I'll send a new PR to trunk to add these configs to the integration test, just 
in case not having them is a source of flakiness.

##
File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py
##
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, 
StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, 
LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), 
str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]

Review comment:
   Note, in 2.5, I was able to add 2.0 and 2.1 to the upgrade matrix. This 
means we did break upgradability from anything 2.1- to 2.6+ in the 2.6 release. 
People would have to upgrade to 2.5 first and then upgrade to 2.6.
   
   Should we block the release and fix this for 2.6.0?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##
@@ -45,21 +47,48 @@
 public Processor get() {
 return new AbstractProcessor() {
 private int numRecordsProcessed = 0;
+private long smallestOffset = Long.MAX_VALUE;
+private long largestOffset = Long.MIN_VALUE;
 
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
+LOG.info("[DEV] initializing processor: topic=" + 
topic + " taskId=" + context.taskId());
 System.out.println("[DEV] initializing processor: 
topic=" + topic + " taskId=" + context.taskId());
+System.out.flush();
 numRecordsProcessed = 0;
+smallestOffset = Long.MAX_VALUE;
+largestOffset = Long.MIN_VALUE;
 }
 
 @Override
 public void process(final Object key, final Object value) {
 numRecordsProcessed++;
-if (numRecordsProcessed % 100 == 0) {

Review comment:
   I had to take this out to make the upgrade test pass. The problem was 
that after upgrading, not all the instances would get a task for the input 
topic (`data`). When an instance has only to process repartition topics, it 
d

[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-08 Thread GitBox

vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-655605721


   Started the new test: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4015/
   And all streams tests: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4016/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-07-08 Thread GitBox

tombentley commented on pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#issuecomment-655617244


   @omkreddy done



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9144) Early expiration of producer state can cause coordinator epoch to regress

2020-07-08 Thread Jason Gustafson (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-9144:
---
Fix Version/s: 2.3.2
   2.2.3

> Early expiration of producer state can cause coordinator epoch to regress
> -
>
> Key: KAFKA-9144
> URL: https://issues.apache.org/jira/browse/KAFKA-9144
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.2.3, 2.3.2, 2.4.1
>
>
> Transaction markers are written by the transaction coordinator. In order to 
> fence zombie coordinators, we use the leader epoch associated with the 
> coordinator partition. Partition leaders verify the epoch in the 
> WriteTxnMarker request and ensure that it can only increase. However, when 
> producer state expires, we stop tracking the epoch and it is possible for 
> monotonicity to be violated. Generally we expect expiration to be on the 
> order of days, so it should be unlikely for this to be a problem.
> At least that is the theory. We observed a case where a coordinator epoch 
> decreased between nearly consecutive writes within a couple minutes of each 
> other. Upon investigation, we found that producer state had been incorrectly 
> expired. We believe the sequence of events is the following:
>  # Producer writes transactional data and fails before committing
>  # Coordinator times out the transaction and writes ABORT markers
>  # Upon seeing the ABORT and the bumped epoch, the partition leader deletes 
> state from the last epoch, which effectively resets the last timestamp for 
> the producer to -1.
>  # The coordinator becomes a zombie before getting a successful response and 
> continues trying to send
>  # The new coordinator notices the incomplete transaction and also sends 
> markers
>  # The partition leader accepts the write from the new coordinator
>  # The producer state is expired because the last timestamp was -1
>  # The partition leader accepts the write from the old coordinator
> Basically it takes an alignment of planets to hit this bug, but it is 
> possible. If you hit it, then the broker may be unable to start because we 
> validate epoch monotonicity during log recovery. The problem is in 3 when the 
> timestamp gets reset. We should use the timestamp from the marker instead.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9144) Early expiration of producer state can cause coordinator epoch to regress

2020-07-08 Thread Jason Gustafson (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-9144:
---
Affects Version/s: 2.0.1
   2.1.1
   2.2.2
   2.4.0
   2.3.1

> Early expiration of producer state can cause coordinator epoch to regress
> -
>
> Key: KAFKA-9144
> URL: https://issues.apache.org/jira/browse/KAFKA-9144
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.4.0, 2.3.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.2.3, 2.3.2, 2.4.1
>
>
> Transaction markers are written by the transaction coordinator. In order to 
> fence zombie coordinators, we use the leader epoch associated with the 
> coordinator partition. Partition leaders verify the epoch in the 
> WriteTxnMarker request and ensure that it can only increase. However, when 
> producer state expires, we stop tracking the epoch and it is possible for 
> monotonicity to be violated. Generally we expect expiration to be on the 
> order of days, so it should be unlikely for this to be a problem.
> At least that is the theory. We observed a case where a coordinator epoch 
> decreased between nearly consecutive writes within a couple minutes of each 
> other. Upon investigation, we found that producer state had been incorrectly 
> expired. We believe the sequence of events is the following:
>  # Producer writes transactional data and fails before committing
>  # Coordinator times out the transaction and writes ABORT markers
>  # Upon seeing the ABORT and the bumped epoch, the partition leader deletes 
> state from the last epoch, which effectively resets the last timestamp for 
> the producer to -1.
>  # The coordinator becomes a zombie before getting a successful response and 
> continues trying to send
>  # The new coordinator notices the incomplete transaction and also sends 
> markers
>  # The partition leader accepts the write from the new coordinator
>  # The producer state is expired because the last timestamp was -1
>  # The partition leader accepts the write from the old coordinator
> Basically it takes an alignment of planets to hit this bug, but it is 
> possible. If you hit it, then the broker may be unable to start because we 
> validate epoch monotonicity during log recovery. The problem is in 3 when the 
> timestamp gets reset. We should use the timestamp from the marker instead.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] omkreddy commented on a change in pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-07-08 Thread GitBox

omkreddy commented on a change in pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#discussion_r451674847



##
File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
##
@@ -127,25 +131,53 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
 testAclCli(adminArgs)
   }
 
-  private def createServer(): Unit = {
+  private def createServer(commandConfig: Option[File] = None): Unit = {
 servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps)))
 val listenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-adminArgs = Array("--bootstrap-server", 
TestUtils.bootstrapServers(servers, listenerName))
+
+var adminArgs = Array("--bootstrap-server", 
TestUtils.bootstrapServers(servers, listenerName))
+if (commandConfig.isDefined) {
+  adminArgs ++= Array("--command-config", 
commandConfig.get.getAbsolutePath)
+}
+this.adminArgs = adminArgs
+  }
+
+  private def callMain(args: Array[String]): (String, String) = {
+TestUtils.grabConsoleOutputAndError(AclCommand.main(args))
   }
 
   private def testAclCli(cmdArgs: Array[String]): Unit = {
 for ((resources, resourceCmd) <- ResourceToCommand) {
   for (permissionType <- Set(ALLOW, DENY)) {
 val operationToCmd = ResourceToOperations(resources)
 val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
-  AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 
:+ "--add")
-  for (resource <- resources) {
-withAuthorizer() { authorizer =>
-  TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
-}
+val (addOut, addErr) = callMain(cmdArgs ++ cmd ++ resourceCmd ++ 
operationToCmd._2 :+ "--add")
+assertOutputContains("Adding ACLs", resources, resourceCmd, addOut)
+assertOutputContains("Current ACLs", resources, resourceCmd, addOut)
+Assert.assertEquals("", addErr)
+
+for (resource <- resources) {
+  withAuthorizer() { authorizer =>
+TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
   }
+}
 
-  testRemove(cmdArgs, resources, resourceCmd)
+val (listOut, listErr) = callMain(cmdArgs :+ "--list")
+assertOutputContains("Current ACLs", resources, resourceCmd, listOut)
+Assert.assertEquals("", listErr)
+
+testRemove(cmdArgs, resources, resourceCmd)
+  }
+}
+  }
+
+  private def assertOutputContains(prefix: String, resources: 
Set[ResourcePattern], resourceCmd: Array[String], output: String) = {

Review comment:
   nit: missing return type

##
File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
##
@@ -161,10 +193,35 @@ class AclCommandTest extends ZooKeeperTestHarness with 
Logging {
 testProducerConsumerCli(adminArgs)
   }
 
+  @Test
+  def testAclCliWithClientId(): Unit = {
+val adminClientConfig = TestUtils.tempFile()
+adminClientConfig.deleteOnExit()

Review comment:
   This is not required now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config

2020-07-08 Thread GitBox

hachikuji commented on a change in pull request #8864:
URL: https://github.com/apache/kafka/pull/8864#discussion_r451682470



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -1447,6 +1451,7 @@ public void 
testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio
 }
 }
 
+@Deprecated

Review comment:
   Fair enough.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions

2020-07-08 Thread GitBox

guozhangwang merged pull request #8934:
URL: https://github.com/apache/kafka/pull/8934


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions

2020-07-08 Thread GitBox

guozhangwang commented on pull request #8934:
URL: https://github.com/apache/kafka/pull/8934#issuecomment-655636676


   Cherry-picked to 2.6.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-08 Thread Guozhang Wang (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153757#comment-17153757
 ] 

Guozhang Wang commented on KAFKA-10134:
---

I've merged the PR and would like [~seanguo] [~neowu0] to verify if it has 
fixed their observed issue.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we try to kill an instance to let a rebalance happen 
> while there is some load (some are long-running tasks >30s), the CPU will 
> go sky-high. It reads ~700% in our metrics, so there must be several threads 
> in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible with both the 
> new CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol the high 
> CPU usage drops after the rebalance is done. But when using the cooperative 
> one, it seems the consumer threads are stuck on something and can't 
> finish the rebalance, so the high CPU usage won't drop until we stop our 
> load. A small load without long-running tasks also won't cause continuous 
> high CPU usage, as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found that the clients appear to be in a loop 
> finding the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which doesn't seem to have this issue. So it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-07-08 Thread GitBox

tombentley commented on pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#issuecomment-655641999


   @omkreddy, sorry about that, now fixed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10226) KStream without SASL information should return error in confluent cloud

2020-07-08 Thread Werner Daehn (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153768#comment-17153768
 ] 

Werner Daehn commented on KAFKA-10226:
--

Here is my verdict after testing:

Producer, Consumer, KStreams, AdminClient all do *not* throw an exception. I 
have not found a method to figure out if the provided values are valid.

 

Test case:

{{try {}}

{{    Map<String, Object> producerprops = new HashMap<>();}}
{{    producerprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"wrong.aws.confluent.cloud:9092");}}
{{    producerprops.put(ProducerConfig.ACKS_CONFIG, "all");}}
{{    producerprops.put(ProducerConfig.RETRIES_CONFIG, 0);}}
{{    producerprops.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);}}
{{    producerprops.put(ProducerConfig.LINGER_MS_CONFIG, 1);}}
{{    producerprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);}}
{{    producerprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);}}
{{    producerprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);}}
{{    this.producer = new KafkaProducer<byte[], byte[]>(producerprops);}}

{{} catch (Throwable e) {}}
{{    e.printStackTrace();}}
{{}}}

 

If you execute this code segment it will not throw an exception, although the 
hostname is certainly not running a Kafka instance. Just by looking at 
this.producer I cannot tell if the connection is valid or not.
When an end user configures the Kafka properties, the expectation is to get 
concise feedback: the hostname cannot be resolved, the server refuses connections on 
that port, the server does not respond, the endpoint is fine but the security settings 
are missing.

 

As this is so basic, I wonder why nobody has ever raised this - or rather, whether I am 
missing something. Any ideas?
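
For what it's worth, one way to probe connectivity explicitly (an illustrative sketch, not anything suggested in this thread; host, port and timeout are placeholders) is to issue a real request and bound the wait, for example through an AdminClient:

{code:java}
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public final class ConnectivityCheck {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "wrong.example.com:9092"); // placeholder
        try (final Admin admin = Admin.create(props)) {
            // Constructing a client does not contact the brokers; only an actual
            // request (with a bounded wait) reveals whether the endpoint works.
            final Collection<Node> nodes = admin.describeCluster().nodes().get(10, TimeUnit.SECONDS);
            System.out.println("Reachable brokers: " + nodes);
        } catch (final Exception e) {
            System.out.println("Cluster not reachable or not authorized: " + e);
        }
    }
}
{code}

With a wrong port or missing SASL settings this typically surfaces as a TimeoutException or ExecutionException rather than a constructor-time error.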

 

> KStream without SASL information should return error in confluent cloud
> ---
>
> Key: KAFKA-10226
> URL: https://issues.apache.org/jira/browse/KAFKA-10226
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, streams
>Affects Versions: 2.5.0
>Reporter: Werner Daehn
>Priority: Minor
>
> I have created a KStream against Confluent Cloud and wondered why no data 
> was received from the source. The reason was that I forgot to add the SASL 
> API keys and secrets.
>  
> For end users this might lead to usability issues. If the KStream wants to 
> read from a topic and is not allowed to, this should raise an error, not be 
> silently ignored.
>  
> How do producer/consumer clients handle that situation?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-08 Thread GitBox

vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-655648704


   Failure was unrelated: 
kafka.api.SaslMultiMechanismConsumerTest.testCoordinatorFailover



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-07-08 Thread GitBox

omkreddy commented on pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#issuecomment-655651164


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10226) KStream without SASL information should return error in confluent cloud

2020-07-08 Thread Werner Daehn (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153768#comment-17153768
 ] 

Werner Daehn edited comment on KAFKA-10226 at 7/8/20, 5:26 PM:
---

Here is my verdict after testing:

Producer, Consumer, KStreams, AdminClient all do *not* throw an exception. I 
have not found a method to figure out if the provided values are valid.

 

Test case:

{{try {}}

{{    Map<String, Object> producerprops = new HashMap<>();}}
{{    producerprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"pkc-41wq6.eu-west-2.aws.confluent.cloud:1234");}}
{{    producerprops.put(ProducerConfig.ACKS_CONFIG, "all");}}
{{    producerprops.put(ProducerConfig.RETRIES_CONFIG, 0);}}
{{    producerprops.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);}}
{{    producerprops.put(ProducerConfig.LINGER_MS_CONFIG, 1);}}
{{    producerprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);}}
{{    producerprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);}}
{{    producerprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);}}
{{    this.producer = new KafkaProducer<byte[], byte[]>(producerprops);}}

{{} catch (Throwable e) {}}
{{    e.printStackTrace();}}
{{}}}

 

If you execute this code segment, it does not throw an exception, even though 
the port is certainly not valid. Just by looking at this.producer I cannot tell 
whether the connection is valid or not.
 When an end user configures the Kafka properties, the expectation is to get 
concise feedback.

For an invalid hostname I do get feedback; for an invalid port or missing 
security settings I do not.

 

As this is so basic, I wonder why nobody has ever raised it before, or whether 
I am missing something. Any ideas?

 


was (Author: wdaehn):
Here is my verdict after testing:

Producer, Consumer, KStreams, AdminClient all do *not* throw an exception. I 
have not found a method to figure out if the provided values are valid.

 

Test case:

{{try {}}

{{    Map<String, Object> producerprops = new HashMap<>();}}
{{    producerprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"wrong.aws.confluent.cloud:9092");}}
{{    producerprops.put(ProducerConfig.ACKS_CONFIG, "all");}}
{{    producerprops.put(ProducerConfig.RETRIES_CONFIG, 0);}}
{{    producerprops.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);}}
{{    producerprops.put(ProducerConfig.LINGER_MS_CONFIG, 1);}}
{{    producerprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);}}
{{    producerprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);}}
{{    producerprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class);}}
{{    this.producer = new KafkaProducer<byte[], byte[]>(producerprops);}}

{{} catch (Throwable e) {}}
{{    e.printStackTrace();}}
{{}}}

 

If you execute this code segment, it does not throw an exception, even though 
the hostname is certainly not running a Kafka instance. Just by looking at 
this.producer I cannot tell whether the connection is valid or not.
When an end user configures the Kafka properties, the expectation is to get 
concise feedback: the hostname cannot be resolved, the server refuses 
connections on that port, the server does not respond, or the endpoint is fine 
but the security settings are missing.

 

As this is so basic, I wonder why nobody has ever raised it before, or whether 
I am missing something. Any ideas?

 

> KStream without SASL information should return error in confluent cloud
> ---
>
> Key: KAFKA-10226
> URL: https://issues.apache.org/jira/browse/KAFKA-10226
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, streams
>Affects Versions: 2.5.0
>Reporter: Werner Daehn
>Priority: Minor
>
> I have created a KStream against Confluent Cloud and wondered why no data 
> was received from the source. The reason was that I forgot to add the SASL 
> API keys and secrets.
>  
> For end users this might lead to usability issues. If the KStream wants to 
> read from a topic and is not allowed to, this should raise an error, not be 
> silently ignored.
>  
> How do producer/consumer clients handle that situation?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-08 Thread GitBox

vvcephei commented on a change in pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#discussion_r451712293



##
File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py
##
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, 
StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, 
LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), 
str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]

Review comment:
   Actually, I just tried 2.1 and 2.0 again (with a full bounce now, 
instead of a rolling bounce), and it works on 2.6/trunk. So, it seems only the 
rolling bounce path has been broken.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10247) Streams may attempt to process after closing a task

2020-07-08 Thread John Roesler (Jira)
John Roesler created KAFKA-10247:


 Summary: Streams may attempt to process after closing a task
 Key: KAFKA-10247
 URL: https://issues.apache.org/jira/browse/KAFKA-10247
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: John Roesler
Assignee: John Roesler


Observed in a system test. A corrupted task was detected, and Streams properly 
closed it as dirty:
{code:java}
[2020-07-08 17:08:09,345] WARN stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records 
from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it 
is likely that the consumer's position has fallen out of the topic partition 
offset range because the topic was truncated or compacted on the broker, 
marking the corresponding tasks as corrupted and re-initializing it later. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader)
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position 
FetchPosition{offset=1, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], 
epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
   at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
[2020-07-08 17:08:09,345] WARN stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the 
states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. Will 
close the task as dirty and re-create and bootstrap from scratch. 
(org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs 
{2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to be 
re-initialized
   at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch 
position FetchPosition{offset=1, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], 
epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
   at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
   ... 3 more
[2020-07-08 17:08:09,346] INFO stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
[2020-07-08 17:08:09,346] DEBUG stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
Closing its state manager and all the registered state stores: 
{sum-STATE-STORE-50=StateStoreMetadata (sum-STATE-STORE-50 : 
SmokeTest-sum-STATE-STORE-50-changelog-1 @ null, 
cntStoreName=StateStoreMetadata (cntStoreName : 
SmokeTest-cntStoreName-changelog-1 @ 0} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)
[2020-07-08 17:08:09,346] INFO [Consumer 
clientId=SmokeTest-66676ca

[GitHub] [kafka] vvcephei opened a new pull request #8994: KAFKA-10247: Skip processing if task isn't running

2020-07-08 Thread GitBox

vvcephei opened a new pull request #8994:
URL: https://github.com/apache/kafka/pull/8994


   The definition of `Task#process` allows for returning `false`
   if the task isn't "processable" right now. We can conveniently
   resolve KAFKA-10247 by simply considering a task not
   processable when it is not running.
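
   A minimal sketch of that idea (illustrative names only, not the actual 
   `Task`/`StreamTask` internals):
   
   ```java
   // Simplified illustration of "not processable unless running".
   enum SketchTaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }
   
   abstract class SketchTask {
       abstract SketchTaskState state();
   
       abstract boolean doProcess(long wallClockTimeMs);
   
       /** Returns true only if a record was actually processed. */
       final boolean process(long wallClockTimeMs) {
           if (state() != SketchTaskState.RUNNING) {
               // e.g. the task was just closed as dirty earlier in this poll loop:
               // report "nothing processable" instead of touching freed state.
               return false;
           }
           return doProcess(wallClockTimeMs);
       }
   }
   ```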
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10247) Streams may attempt to process after closing a task

2020-07-08 Thread John Roesler (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10247:
-
Priority: Blocker  (was: Major)

> Streams may attempt to process after closing a task
> ---
>
> Key: KAFKA-10247
> URL: https://issues.apache.org/jira/browse/KAFKA-10247
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>
> Observed in a system test. A corrupted task was detected, and Streams properly 
> closed it as dirty:
> {code:java}
> [2020-07-08 17:08:09,345] WARN stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered 
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records 
> from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it 
> is likely that the consumer's position has fallen out of the topic partition 
> offset range because the topic was truncated or compacted on the broker, 
> marking the corresponding tasks as corrupted and re-initializing it later. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position 
> FetchPosition{offset=1, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: 
> null)], epoch=0}} is out of range for partition 
> SmokeTest-cntStoreName-changelog-1
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,345] WARN stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the 
> states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. 
> Will close the task as dirty and re-create and bootstrap from scratch. 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs 
> {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to 
> be re-initialized
>at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch 
> position FetchPosition{offset=1, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: 
> null)], epoch=0}} is out of range for partition 
> SmokeTest-cntStoreName-changelog-1
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
>... 3 more
> [2020-07-08 17:08:09,346] INFO stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
> Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-07-08 17:08:09,346] DEBUG stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
> Closing its state manager and a

[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-08 Thread Neo Wu (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153852#comment-17153852
 ] 

Neo Wu commented on KAFKA-10134:


Hi, [~guozhang]

Your latest change fixed all my issues. Thanks!

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some of the tasks run for more than 30 s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be spinning 
> in a tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol the high CPU usage 
> drops once the rebalance is done, whereas with the cooperative one the 
> consumer threads seem to be stuck on something and cannot finish the 
> rebalance, so the high CPU usage does not drop until we stop our load. A 
> small load without long-running tasks also does not cause continuous high 
> CPU usage, as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found that the clients appear to be stuck in a 
> loop trying to find the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue, so it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-08 Thread Guozhang Wang (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-10134.
---
Resolution: Fixed

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some of the tasks run for more than 30 s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be spinning 
> in a tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol the high CPU usage 
> drops once the rebalance is done, whereas with the cooperative one the 
> consumer threads seem to be stuck on something and cannot finish the 
> rebalance, so the high CPU usage does not drop until we stop our load. A 
> small load without long-running tasks also does not cause continuous high 
> CPU usage, as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found that the clients appear to be stuck in a 
> loop trying to find the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue, so it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-08 Thread Guozhang Wang (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153861#comment-17153861
 ] 

Guozhang Wang commented on KAFKA-10134:
---

Thanks for the confirmation! I'm resolving this ticket then.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some of the tasks run for more than 30 s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be spinning 
> in a tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol the high CPU usage 
> drops once the rebalance is done, whereas with the cooperative one the 
> consumer threads seem to be stuck on something and cannot finish the 
> rebalance, so the high CPU usage does not drop until we stop our load. A 
> small load without long-running tasks also does not cause continuous high 
> CPU usage, as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found that the clients appear to be stuck in a 
> loop trying to find the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue, so it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-08 Thread Guozhang Wang (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153862#comment-17153862
 ] 

Guozhang Wang commented on KAFKA-10134:
---

cc 2.5.1 release manager [~vvcephei]: I'm merging it to the 2.5 branch too.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some of the tasks run for more than 30 s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be spinning 
> in a tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol the high CPU usage 
> drops once the rebalance is done, whereas with the cooperative one the 
> consumer threads seem to be stuck on something and cannot finish the 
> rebalance, so the high CPU usage does not drop until we stop our load. A 
> small load without long-running tasks also does not cause continuous high 
> CPU usage, as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found that the clients appear to be stuck in a 
> loop trying to find the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue, so it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #8994: KAFKA-10247: Skip processing if task isn't running

2020-07-08 Thread GitBox

mjsax commented on pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#issuecomment-655701085


   It seems the root cause of the issue is that we don't clean up internal data 
structures correctly when closing a task dirty? Should the task not be removed 
from the list of active tasks instead?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #8985: MINOR; KafkaAdminClient#alterReplicaLogDirs should not fail all the futures when only one call fails

2020-07-08 Thread GitBox

cmccabe merged pull request #8985:
URL: https://github.com/apache/kafka/pull/8985


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JimGalasyn opened a new pull request #8995: Restore stream-table duality description

2020-07-08 Thread GitBox

JimGalasyn opened a new pull request #8995:
URL: https://github.com/apache/kafka/pull/8995


   The stream-table duality section was dropped inadvertently sometime after 
version 0.11.0, so this PR restores it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10218) DistributedHerder's canReadConfigs field is never reset to true

2020-07-08 Thread Chris Egerton (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153936#comment-17153936
 ] 

Chris Egerton commented on KAFKA-10218:
---

[~rhauch] The primary impact here is that erroneous and confusing log messages 
start to get emitted as soon as the worker enters a bad state, and there is no 
remedy short of restarting the worker. The fix and the test are fairly simple 
and I think warrant review before the upcoming 2.7 release so that we can 
include them in the upcoming bug fix releases before then.
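
A hedged sketch of the shape of the fix, with simplified, illustrative names 
rather than the actual DistributedHerder code; the key point is resetting the 
flag after a successful read:

{code:java}
import java.util.concurrent.TimeoutException;

// Illustrative sketch only; not the real DistributedHerder implementation.
class HerderSketch {
    interface ConfigStore {
        void refreshToEnd(long timeoutMs) throws TimeoutException;
    }

    private volatile boolean canReadConfigs = true;

    boolean readConfigToEnd(ConfigStore store, long timeoutMs) {
        try {
            store.refreshToEnd(timeoutMs);
            canReadConfigs = true;   // the missing step: reset on success
            return true;
        } catch (TimeoutException e) {
            canReadConfigs = false;  // retried at the start of the next tick
            return false;
        }
    }
}
{code}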

> DistributedHerder's canReadConfigs field is never reset to true
> ---
>
> Key: KAFKA-10218
> URL: https://issues.apache.org/jira/browse/KAFKA-10218
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2, 2.3.0, 2.1.2, 
> 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.6.0, 2.4.2, 2.5.1, 
> 2.7.0, 2.5.2, 2.6.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> If the {{DistributedHerder}} encounters issues reading to the end of the 
> config topic, it [takes note of this 
> fact|https://github.com/apache/kafka/blob/7db52a46b00eed652e791dd4eae809d590626a1f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1109]
>  by setting a field {{canReadConfigs}} to {{false}} and then acts accordingly 
> at the [start of its tick 
> loop|https://github.com/apache/kafka/blob/7db52a46b00eed652e791dd4eae809d590626a1f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L319]
>  by trying again to read to the end of the config topic. However, if a 
> subsequent attempt to read to the end of the config topic succeeds, the 
> {{canReadConfigs}} field is never set back to {{true}} again, so no matter 
> what, the herder will always attempt to read to the end of the config topic 
> at the beginning of each tick.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #8994: KAFKA-10247: Skip processing if task isn't running

2020-07-08 Thread GitBox

vvcephei commented on pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#issuecomment-655721490


   Hey @abbccdda and @mjsax , thanks for taking a look! The PR is not complete 
yet. I'll let you know when it's ready for review.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change

2020-07-08 Thread John Roesler (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153975#comment-17153975
 ] 

John Roesler commented on KAFKA-8770:
-

Hi [~davispw] ,

That's a good idea. I'll split it up.

The portion implemented so far is this:
{code:java}
commit f54cece73e6116566979bcf6a865d803b7c18974
Author: Richard Yu 
Date:   Tue May 12 11:19:32 2020 -0700

KAFKA-8770: KIP-557: Drop idempotent KTable source updates (#8254)

Drops idempotent updates from KTable source operators.
Specifically, drop updates in which the value is unchanged,
and the timestamp is the same or larger.

Implements: KIP-557
Reviewers: Bruno Cadonna , John Roesler 

{code}
https://github.com/apache/kafka/pull/8254
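
The gist of that change, as an illustrative sketch only (the actual 
implementation in the PR lives inside the KTable source processor and differs 
in detail):

{code:java}
import java.util.Arrays;

// Illustrative sketch: forward a KTable source update only if it is not
// idempotent, i.e. drop it when the serialized value is unchanged and the
// timestamp did not move backwards.
class IdempotentUpdateSketch {
    static boolean shouldForward(byte[] oldValue, long oldTimestamp,
                                 byte[] newValue, long newTimestamp) {
        boolean valueUnchanged = Arrays.equals(oldValue, newValue);
        boolean timestampNotOlder = newTimestamp >= oldTimestamp;
        return !(valueUnchanged && timestampNotOlder);
    }
}
{code}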

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimizations for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations, as well as for external systems that consume 
> the results, and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this, by loading the prior 
> results before a stateful operation and comparing with the new result before 
> persisting or forwarding. In many cases, we load the prior result anyway, so 
> it may not be a significant performance impact either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10248) Drop idempotent KTable source updates

2020-07-08 Thread John Roesler (Jira)
John Roesler created KAFKA-10248:


 Summary: Drop idempotent KTable source updates
 Key: KAFKA-10248
 URL: https://issues.apache.org/jira/browse/KAFKA-10248
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: Richard Yu


Implement KIP-557 for KTable source nodes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10248) Drop idempotent KTable source updates

2020-07-08 Thread John Roesler (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153976#comment-17153976
 ] 

John Roesler commented on KAFKA-10248:
--

This was implemented in this commit:
{code:java}
commit f54cece73e6116566979bcf6a865d803b7c18974
Author: Richard Yu 
Date:   Tue May 12 11:19:32 2020 -0700

KAFKA-8770: KIP-557: Drop idempotent KTable source updates (#8254)

Drops idempotent updates from KTable source operators.
Specifically, drop updates in which the value is unchanged,
and the timestamp is the same or larger.

Implements: KIP-557
Reviewers: Bruno Cadonna , John Roesler 
{code}
In this PR: https://github.com/apache/kafka/pull/8254

> Drop idempotent KTable source updates
> -
>
> Key: KAFKA-10248
> URL: https://issues.apache.org/jira/browse/KAFKA-10248
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Richard Yu
>Priority: Major
>
> Implement KIP-557 for KTable source nodes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10248) Drop idempotent KTable source updates

2020-07-08 Thread John Roesler (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10248:
-
Fix Version/s: 2.6.0

> Drop idempotent KTable source updates
> -
>
> Key: KAFKA-10248
> URL: https://issues.apache.org/jira/browse/KAFKA-10248
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Richard Yu
>Priority: Major
> Fix For: 2.6.0
>
>
> Implement KIP-557 for KTable source nodes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10248) Drop idempotent KTable source updates

2020-07-08 Thread John Roesler (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10248.
--
Resolution: Fixed

> Drop idempotent KTable source updates
> -
>
> Key: KAFKA-10248
> URL: https://issues.apache.org/jira/browse/KAFKA-10248
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Richard Yu
>Priority: Major
> Fix For: 2.6.0
>
>
> Implement KIP-557 for KTable source nodes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8770) Either switch to or add an option for emit-on-change

2020-07-08 Thread John Roesler (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-8770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153977#comment-17153977
 ] 

John Roesler commented on KAFKA-8770:
-

Ok, I created (and resolved) a subtask, which is properly labeled as 
implemented in 2.6.0: https://issues.apache.org/jira/browse/KAFKA-10248

[~Yohan123], can you create a subtask for the portion you're planning to do 
next? Then, we can target the PR at that ticket instead of this one to keep 
everything tidy.

> Either switch to or add an option for emit-on-change
> 
>
> Key: KAFKA-8770
> URL: https://issues.apache.org/jira/browse/KAFKA-8770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Currently, Streams offers two emission models:
> * emit-on-window-close: (using Suppression)
> * emit-on-update: (i.e., emit a new result whenever a new record is 
> processed, regardless of whether the result has changed)
> There is also an option to drop some intermediate results, either using 
> caching or suppression.
> However, there is no support for emit-on-change, in which results would be 
> forwarded only if the result has changed. This has been reported to be 
> extremely valuable as a performance optimization for some high-traffic 
> applications, and it reduces the computational burden both internally for 
> downstream Streams operations, as well as for external systems that consume 
> the results, and currently have to deal with a lot of "no-op" changes.
> It would be pretty straightforward to implement this, by loading the prior 
> results before a stateful operation and comparing with the new result before 
> persisting or forwarding. In many cases, we load the prior result anyway, so 
> it may not be a significant performance impact either.
> One design challenge is what to do with timestamps. If we get one record at 
> time 1 that produces a result, and then another at time 2 that produces a 
> no-op, what should be the timestamp of the result, 1 or 2? emit-on-change 
> would require us to say 1.
> Clearly, we'd need to do some serious benchmarks to evaluate any potential 
> implementation of emit-on-change.
> Another design challenge is to decide if we should just automatically provide 
> emit-on-change for stateful operators, or if it should be configurable. 
> Configuration increases complexity, so unless the performance impact is high, 
> we may just want to change the emission model without a configuration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #8995: Restore stream-table duality description

2020-07-08 Thread GitBox

mjsax commented on a change in pull request #8995:
URL: https://github.com/apache/kafka/pull/8995#discussion_r451796320



##
File path: docs/streams/core-concepts.html
##
@@ -170,13 +150,59 @@ Duality of
   or to run interactive
 queries
   against your application's latest processing results. And, beyond its 
internal usage, the Kafka Streams API
   also allows developers to exploit this duality in their own applications.
-  
+
 
-  
+
   Before we discuss concepts such as aggregations
   in Kafka Streams, we must first introduce tables in 
more detail, and talk about the aforementioned stream-table duality.
-  Essentially, this duality means that a stream can be viewed as a table, 
and a table can be viewed as a stream.
-  
+  Essentially, this duality means that a stream can be viewed as a table, 
and a table can be viewed as a stream. Kafka's log compaction feature, for 
example, exploits this duality.
+
+
+
+A simple form of a table is a collection of key-value pairs, also 
called a map or associative array. Such a table may look as follows:
+
+
+
+The stream-table duality describes the close relationship between 
streams and tables.
+
+Stream as Table: A stream can be considered a changelog of 
a table, where each data record in the stream captures a state change of the 
table. A stream is thus a table in disguise, and it can be easily turned into a 
"real" table by replaying the changelog from beginning to end to reconstruct 
the table. Similarly, in a more general analogy, aggregating data records in a 
stream - such as computing the total number of pageviews by user from a stream 
of pageview events - will return a table (here with the key and the value being 
the user and its corresponding pageview count, respectively).
+Table as Stream: A table can be considered a snapshot, at a 
point in time, of the latest value for each key in a stream (a stream's data 
records are key-value pairs). A table is thus a stream in disguise, and it can 
be easily turned into a "real" stream by iterating over each key-value entry in 
the table.
+
+
+
+Let's illustrate this with an example. Imagine a table that tracks the 
total number of pageviews by user (first column of diagram below). Over time, 
whenever a new pageview event is processed, the state of the table is updated 
accordingly. Here, the state changes between different points in time - and 
different revisions of the table - can be represented as a changelog stream 
(second column).
+
+
+
+
+Interestingly, because of the stream-table duality, the same stream 
can be used to reconstruct the original table (third column):
+
+
+
+
+The same mechanism is used, for example, to replicate databases via 
change data capture (CDC) and, within Kafka Streams, to replicate its so-called 
state stores across machines for fault-tolerance.
+The stream-table duality is such an important concept that Kafka 
Streams models it explicitly via the KStream, 
KTable, and GlobalKTable interfaces.
+
+
+Aggregations
+
+An aggregation operation takes one input stream or 
table, and yields a new table by combining multiple input records into a single 
output record. Examples of aggregations are computing counts or sum.
+
+
+
+In the Kafka Streams DSL, an input stream of an 
aggregation can be a KStream or a KTable, but the output stream 
will always be a KTable. This allows Kafka Streams to update an aggregate value 
upon the out-of-order arrival of further records after the value was produced 
and emitted. When such out-of-order arrival happens, the aggregating KStream or 
KTable emits a new aggregate value. Because the output is a KTable, the new 
value is considered to overwrite the old value with the same key in subsequent 
processing steps.

Review comment:
   Can we avoid those super long lines? Similar below.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8974: KAFKA-10225 Increase default zk session timeout for system tests

2020-07-08 Thread GitBox

junrao commented on pull request #8974:
URL: https://github.com/apache/kafka/pull/8974#issuecomment-655734288


   System test results. All 9 failures seem unrelated to this PR. 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-07--001.1594186925--chia7712--KAFKA-10225--da9265760/report.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #8974: KAFKA-10225 Increase default zk session timeout for system tests

2020-07-08 Thread GitBox

junrao merged pull request #8974:
URL: https://github.com/apache/kafka/pull/8974


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10225) Increase default zk session timeout for system tests

2020-07-08 Thread Jun Rao (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-10225.
-
Fix Version/s: 2.7.0
   Resolution: Fixed

merged the PR to trunk

> Increase default zk session timeout for system tests
> 
>
> Key: KAFKA-10225
> URL: https://issues.apache.org/jira/browse/KAFKA-10225
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 2.7.0
>
>
> I was digging into the flaky system tests and noticed that many of the flaky 
> runs are caused by the following check.
> {code}
> with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as 
> monitor:
> node.account.ssh(cmd)
> # Kafka 1.0.0 and higher don't have a space between "Kafka" and 
> "Server"
> monitor.wait_until("Kafka\s*Server.*started", 
> timeout_sec=timeout_sec, backoff_sec=.25,
>err_msg="Kafka server didn't finish startup in 
> %d seconds" % timeout_sec)
> {code}
> And the error message in broker log is shown below.
> {quote}
> kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for 
> connection while in state: CONNECTING
>   at 
> kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:262)
>   at kafka.zookeeper.ZooKeeperClient.(ZooKeeperClient.scala:119)
>   at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1880)
>   at kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:430)
>   at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:455)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:227)
>   at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>   at kafka.Kafka$.main(Kafka.scala:82)
>   at kafka.Kafka.main(Kafka.scala)
> {quote}
> I'm surprised that the default ZK connection timeout in the system tests is 
> only 2 seconds, as the default timeout in production was increased to 18s (see 
> https://github.com/apache/kafka/commit/4bde9bb3ccaf5571be76cb96ea051dadaeeaf5c7)
> {code}
> config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 2000
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #8981: KAFKA-10235 Fix flaky transactions_test.py

2020-07-08 Thread GitBox

junrao commented on a change in pull request #8981:
URL: https://github.com/apache/kafka/pull/8981#discussion_r451224908



##
File path: tests/kafkatest/tests/core/transactions_test.py
##
@@ -47,7 +47,10 @@ def __init__(self, test_context):
 self.num_output_partitions = 3
 self.num_seed_messages = 10
 self.transaction_size = 750
-self.transaction_timeout = 4
+# This is reducing the transaction cleanup interval.
+# In hard_bounce mode, the transaction is broken ungracefully. Hence, it 
produces unstable
+# offsets, which prevents the TransactionalMessageCopier from receiving the 
group's position.
+self.transaction_timeout = 5000

Review comment:
   @chia7712 : Thanks for the reply. That makes sense. Perhaps we can 
adjust the comment to make it clear that the hard bounce is for the client.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8981: KAFKA-10235 Fix flaky transactions_test.py

2020-07-08 Thread GitBox

junrao commented on pull request #8981:
URL: https://github.com/apache/kafka/pull/8981#issuecomment-655737852


   triggered a system test run



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JimGalasyn commented on a change in pull request #8995: Restore stream-table duality description

2020-07-08 Thread GitBox

JimGalasyn commented on a change in pull request #8995:
URL: https://github.com/apache/kafka/pull/8995#discussion_r451809936



##
File path: docs/streams/core-concepts.html
##
@@ -170,13 +150,59 @@ Duality of
   or to run interactive
 queries
   against your application's latest processing results. And, beyond its 
internal usage, the Kafka Streams API
   also allows developers to exploit this duality in their own applications.
-  
+
 
-  
+
   Before we discuss concepts such as aggregations
   in Kafka Streams, we must first introduce tables in 
more detail, and talk about the aforementioned stream-table duality.
-  Essentially, this duality means that a stream can be viewed as a table, 
and a table can be viewed as a stream.
-  
+  Essentially, this duality means that a stream can be viewed as a table, 
and a table can be viewed as a stream. Kafka's log compaction feature, for 
example, exploits this duality.
+
+
+
+A simple form of a table is a collection of key-value pairs, also 
called a map or associative array. Such a table may look as follows:
+
+
+
+The stream-table duality describes the close relationship between 
streams and tables.
+
+Stream as Table: A stream can be considered a changelog of 
a table, where each data record in the stream captures a state change of the 
table. A stream is thus a table in disguise, and it can be easily turned into a 
"real" table by replaying the changelog from beginning to end to reconstruct 
the table. Similarly, in a more general analogy, aggregating data records in a 
stream - such as computing the total number of pageviews by user from a stream 
of pageview events - will return a table (here with the key and the value being 
the user and its corresponding pageview count, respectively).
+Table as Stream: A table can be considered a snapshot, at a 
point in time, of the latest value for each key in a stream (a stream's data 
records are key-value pairs). A table is thus a stream in disguise, and it can 
be easily turned into a "real" stream by iterating over each key-value entry in 
the table.
+
+
+
+Let's illustrate this with an example. Imagine a table that tracks the 
total number of pageviews by user (first column of diagram below). Over time, 
whenever a new pageview event is processed, the state of the table is updated 
accordingly. Here, the state changes between different points in time - and 
different revisions of the table - can be represented as a changelog stream 
(second column).
+
+
+
+
+Interestingly, because of the stream-table duality, the same stream 
can be used to reconstruct the original table (third column):
+
+
+
+
+The same mechanism is used, for example, to replicate databases via 
change data capture (CDC) and, within Kafka Streams, to replicate its so-called 
state stores across machines for fault-tolerance.
+The stream-table duality is such an important concept that Kafka 
Streams models it explicitly via the KStream, 
KTable, and GlobalKTable interfaces.
+
+
+Aggregations
+
+An aggregation operation takes one input stream or 
table, and yields a new table by combining multiple input records into a single 
output record. Examples of aggregations are computing counts or sum.
+
+
+
+In the Kafka Streams DSL, an input stream of an 
aggregation can be a KStream or a KTable, but the output stream 
will always be a KTable. This allows Kafka Streams to update an aggregate value 
upon the out-of-order arrival of further records after the value was produced 
and emitted. When such out-of-order arrival happens, the aggregating KStream or 
KTable emits a new aggregate value. Because the output is a KTable, the new 
value is considered to overwrite the old value with the same key in subsequent 
processing steps.

Review comment:
   Sure, but that's the style throughout the Kafka docs. :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10249) In-memory stores are skipped when checkpointing but not skipped when reading the checkpoint

2020-07-08 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10249:
---

 Summary: In-memory stores are skipped when checkpointing but not 
skipped when reading the checkpoint
 Key: KAFKA-10249
 URL: https://issues.apache.org/jira/browse/KAFKA-10249
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman


As the title suggests, offsets for in-memory stores (including the suppression 
buffer) are not written to the checkpoint file. However, when reading from the 
checkpoint file during task initialization, we do not check 
StateStore#persistent. We attempt to look up the offsets for in-memory stores 
in the checkpoint file, and obviously do not find them.

With eos we have to conclude that the existing state is dirty and thus throw a 
TaskCorruptedException. So pretty much any task with in-memory state will 
always hit this exception when reinitializing from the checkpoint, forcing it 
to clear the entire state directory and build up all of its state again from 
scratch (both persistent and in-memory).

This is especially unfortunate for KIP-441, as we will hit this any time a task 
is moved from one thread to another.
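
As an illustration of the symmetry the ticket asks for, below is a minimal 
sketch that skips non-persistent stores on the read path just as on the write 
path. Only StateStore#persistent() is the real API; the Store interface and 
helper class are simplified stand-ins, not the actual ProcessorStateManager 
code.

{code:java}
import java.util.HashMap;
import java.util.Map;

public final class CheckpointSymmetrySketch {

    // Simplified stand-in mirroring StateStore#name() / StateStore#persistent().
    public interface Store {
        String name();
        boolean persistent();
    }

    // In-memory stores are skipped when writing the checkpoint file...
    public static Map<String, Long> offsetsToCheckpoint(final Map<Store, Long> currentOffsets) {
        final Map<String, Long> checkpointable = new HashMap<>();
        for (final Map.Entry<Store, Long> entry : currentOffsets.entrySet()) {
            if (entry.getKey().persistent()) {
                checkpointable.put(entry.getKey().name(), entry.getValue());
            }
        }
        return checkpointable;
    }

    // ...so they must also be skipped when reading it back, instead of treating
    // the missing entry as evidence of dirty state.
    public static Long startingOffset(final Store store, final Map<String, Long> checkpoint) {
        if (!store.persistent()) {
            return null; // rebuild from the beginning of the changelog
        }
        return checkpoint.get(store.name());
    }
}
{code}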



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on a change in pull request #8928: KAFKA-10192: Wait for REST API to become available before testing blocked connectors

2020-07-08 Thread GitBox

kkonstantine commented on a change in pull request #8928:
URL: https://github.com/apache/kafka/pull/8928#discussion_r451832657



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
##
@@ -76,6 +78,15 @@ public void setup() {
 
 // start the clusters
 connect.start();
+
+// wait for the Connect REST API to become available. necessary 
because of the reduced REST
+// request timeout; otherwise, we may get an unexpected 500 with our 
first real REST request
+// if the worker is still getting on its feet.
+waitForCondition(

Review comment:
   Interesting observation. Of course, hitting the leader with a request 
doesn't tell you that other workers have started, so that's applicable in tests 
like this one, which start only one worker, etc. This doesn't seem to be a 
race condition we encounter often, so I'm fine with an ad hoc fix here 
given the reduced timeout. I'd be surprised if it took noticeable time to load 
the services after the herder is submitted to its executor. 
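
For context, a self-contained sketch of such a readiness gate is shown below; 
the endpoint path, polling interval, and class name are assumptions for 
illustration and not the actual test code.

{code:java}
import java.net.HttpURLConnection;
import java.net.URL;

public final class RestReadinessProbe {

    // Poll the worker's REST API until it answers, or give up after timeoutMs.
    public static void awaitRestApi(final String restUrl, final long timeoutMs)
            throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            try {
                final HttpURLConnection conn =
                        (HttpURLConnection) new URL(restUrl + "/connectors").openConnection();
                conn.setConnectTimeout(1_000);
                conn.setReadTimeout(1_000);
                if (conn.getResponseCode() == 200) {
                    return; // worker is up and serving requests
                }
            } catch (final Exception e) {
                // not ready yet; fall through and retry
            }
            Thread.sleep(250);
        }
        throw new IllegalStateException("Connect REST API did not become available within "
                + timeoutMs + " ms");
    }
}
{code}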





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8928: KAFKA-10192: Wait for REST API to become available before testing blocked connectors

2020-07-08 Thread GitBox

kkonstantine commented on pull request #8928:
URL: https://github.com/apache/kafka/pull/8928#issuecomment-655764773


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-08 Thread GitBox

ableegoldman opened a new pull request #8996:
URL: https://github.com/apache/kafka/pull/8996


   We have this asymmetry in how the ProcessorStateManager handles in-memory 
stores: we explicitly skip over them when writing offsets to the checkpoint 
file, but don't do the same when reading from the checkpoint file to initialize 
offsets. With eos, this is taken to mean that the state is dirty, and thus we 
mistakenly mark the entire task as corrupted.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10250) Kafka broker shrinks the ISRs and disconnects from other brokers for few seconds

2020-07-08 Thread Rishank Chandra Puram (Jira)
Rishank Chandra Puram created KAFKA-10250:
-

 Summary: Kafka broker shrinks the ISRs and disconnects from other 
brokers for few seconds
 Key: KAFKA-10250
 URL: https://issues.apache.org/jira/browse/KAFKA-10250
 Project: Kafka
  Issue Type: Bug
  Components: build, config, consumer, controller, log, producer 
Affects Versions: 2.0.0
Reporter: Rishank Chandra Puram


The following is a summary/overview of the whole issue. Can you please help us 
look into the below and let us know your thoughts on what caused the issue, and 
how to mitigate this in the future?

Issue:
1.  All of a sudden, all the other brokers in the cluster report issues with 
one of the brokers, as below

Error for good broker [broker: broker2, brokerID: 1030]
[2020-06-27 16:14:53,367] WARN [ReplicaFetcher replicaId=1030, leaderId=1029, 
fetcherId=13] Error in response for fetch request (type=FetchRequest, 
replicaId=1030, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1901394956, 
epoch=1018699)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1029 was disconnected before the response 
was read
at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:240)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:43)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:149)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
 
Error from affected broker [broker: broker6, brokerID: 1029]
[2020-06-27 16:14:25,744] INFO [Partition topic_test2-33 broker=1029] Shrinking 
ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition)
[2020-06-27 16:14:25,752] INFO [Partition topic_a_restate-34 broker=1029] 
Shrinking ISR from 1027,1029,1028 to 1029 (kafka.cluster.Partition)
[2020-06-27 16:14:25,760] INFO [Partition topic_f_restate-39 broker=1029] 
Shrinking ISR from 1026,1029,1025 to 1029,1025 (kafka.cluster.Partition)
[2020-06-27 16:14:25,772] INFO [Partition topic_test2-16 broker=1029] Shrinking 
ISR from 1028,1029,1030 to 1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:26,683] INFO [ProducerStateManager partition=topic_abc_f-21] 
Writing producer snapshot at offset 1509742173 (kafka.log.ProducerStateManager)
[2020-06-27 16:14:26,683] INFO [Log partition=topic_abc_f-21, 
dir=/hadoop-e/kafka/data1] Rolled new log segment at offset 1509742173 in 1 ms. 
(kafka.log.Log)
[2020-06-27 16:14:55,375] INFO [Partition topic_test2-33 broker=1029] Expanding 
ISR from 1025,1029 to 1025,1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,387] INFO [Partition test-topic-analysis-9 broker=1029] 
Expanding ISR from 1028,1029 to 1028,1029,1030 (kafka.cluster.Partition)
 
2.  The entire Kafka cluster becomes stable within a few minutes
 
Trace for good broker [broker: broker2, brokerID: 1030]
[2020-06-27 16:20:14,861] INFO Deleted time index 
/hadoop-g/kafka/data1/topic-analysis-0/009100172512.timeindex.deleted. 
(kafka.log.LogSegment)
[2020-06-27 16:20:14,882] INFO [Log partition=topic-analysis-4, 
dir=/hadoop-b/kafka/data1] Deleting segment 9100010843 (kafka.log.Log)
[2020-06-27 16:20:14,883] INFO Deleted log 
/hadoop-b/kafka/data1/topic-analysis-4/009100010843.log.deleted. 
(kafka.log.LogSegment)
[2020-06-27 16:20:14,897] INFO Deleted offset index 
/hadoop-b/kafka/data1/topic-analysis-4/009100010843.index.deleted. 
(kafka.log.LogSegment)
[2020-06-27 16:20:14,897] INFO Deleted time index 
/hadoop-b/kafka/data1/topic-analysis-4/009100010843.timeindex.deleted. 
(kafka.log.LogSegment)
 
Trace from affected broker [broker: broker6, brokerID: 1029]
[2020-06-27 16:21:01,552] INFO [Log partition=topic-analysis-22, 
dir=/hadoop-i/kafka/data1] Scheduling log segment [baseOffset 9099830344, size 
1073733482] for deletion. (kafka.log.Log)
[2020-06-27 16:21:01,553] INFO [Log partition=topic-analysis-22, 
dir=/hadoop-i/kafka/data1] Scheduling log segment [baseOffset 9100082067, size 
1073738790] for deletion. (kafka.log.Log)
[2020-06-27 16:21:01,553] INFO [Log partition=topic-analysis-22, 
dir=/hadoop-i/kafka/data1] Incrementing log start offset to 9100334713 
(kafka.log.Log)
[2020-06-27 16:21:01,581] INFO Cleared earliest 0 entries from epoch cache 
based on passed offset 9100334713 leaving 1 in EpochFile for partition 
topic-analysis-22 (kafka.server.epoch.LeaderEpochFileCache)
[2020-06-27 16:22:00,175] INFO [Log partition=topic_def_c-1, 
dir=/hadoop-j/kafka/data1] Deleting segment 1628853412 (kafka.log.L

[jira] [Commented] (KAFKA-10250) Kafka broker shrinks the ISRs and disconnects from other brokers for few seconds

2020-07-08 Thread Rishank Chandra Puram (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154043#comment-17154043
 ] 

Rishank Chandra Puram commented on KAFKA-10250:
---

1. When the issue occurs, they're able to produce/consume messages to/from 
Kafka using a simple command line.
2. The issue occurred suddenly on broker 1029 but could also occur on another 
broker; this behavior is completely random, as we have had issues with two 
different brokers at different times

[2020-06-27 16:14:23,800] INFO [GroupCoordinator 1029]: Member 
consumer-1-3dfeffb9-c8ca-4840-878e-3126e9750015 in group authapiprod02062020 
has failed, removing it from the group 
(kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:23,802] INFO [GroupCoordinator 1029]: Stabilized group 
authapiprod02062020 generation 178 (__consumer_offsets-40) 
(kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:23,805] INFO [GroupCoordinator 1029]: Assignment received 
from leader for group authapiprod02062020 for generation 178 
(kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:25,494] INFO [Partition topic_rejected_test-30 broker=1029] 
Shrinking ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition)

After 30 seconds we see expanding messages:

[2020-06-27 16:14:25,772] INFO [Partition topic_test2-16 broker=1029] Shrinking 
ISR from 1028,1029,1030 to 1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,375] INFO [Partition topic_analysis-33 broker=1029] 
Expanding ISR from 1025,1029 to 1025,1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,387] INFO [Partition topic_rejected-9 broker=1029] 
Expanding ISR from 1028,1029 to 1028,1029,1030 (kafka.cluster.Partition)


We reviewed during the issue: 

- lsof --> 75000 of 20
- No high CPU found in grafana during the issue
- Thread dumps are not showing any stuck threads
- iostat looks fine
- During the issue we checked for --under-replicated-partitions, but once 
broker 1029 starts expanding the ISRs again, no under-replicated partitions are 
seen
- We reviewed the GC log files and the memory looks fine; we currently have 12g 
configured and the live set is about 4g
- The netstat output does not show any abnormal behavior; this is from broker 1029:

50659539242 total packets received
52633502320 requests sent out
88095112 segments retransmited
0 receive buffer errors
0 send buffer errors
379 resets received for embryonic SYN_RECV sockets
629655 TCP sockets finished time wait in fast timer
550847 delayed acks further delayed because of locked socket
11151 times the listen queue of a socket overflowed
11151 SYNs to LISTEN sockets dropped

> Kafka broker shrinks the ISRs and disconnects from other brokers for few 
> seconds
> 
>
> Key: KAFKA-10250
> URL: https://issues.apache.org/jira/browse/KAFKA-10250
> Project: Kafka
>  Issue Type: Bug
>  Components: build, config, consumer, controller, log, producer 
>Affects Versions: 2.0.0
>Reporter: Rishank Chandra Puram
>Priority: Major
>
> The following the summary/overview of the whole issue. Can you please help us 
> look into the below and let us know you thoughts on what caused the issue? 
> And how to mitigate this in the future
> Issue:
> 1.All of a sudden all the other brokers in cluster report have issues 
> with one of the broker as below
> 
> Error for good broker [broker: broker2, brokerID: 1030]
> [2020-06-27 16:14:53,367] WARN [ReplicaFetcher replicaId=1030, leaderId=1029, 
> fetcherId=13] Error in response for fetch request (type=FetchRequest, 
> replicaId=1030, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1901394956, 
> epoch=1018699)) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 1029 was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:240)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:43)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:149)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  
> Error from affected broker [broker: broker6, brokerID: 1029]
> [2020-06-27 16:14:25,744] INFO [Partition topic_test2-33 broker=1029] 
> Shrinking ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition)
> [2020-06-27 16:14:25,7

[jira] [Updated] (KAFKA-10250) Kafka broker shrinks the ISRs and disconnects from other brokers for few seconds

2020-07-08 Thread Rishank Chandra Puram (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rishank Chandra Puram updated KAFKA-10250:
--
Description: 
The following is a summary/overview of the whole issue. Can you please help us 
look into the below and let us know your thoughts on what caused the issue, and 
how to mitigate this in the future?

Issue:
1.  All of a sudden, all the other brokers in the cluster report issues with 
one of the brokers, as below

Error for good broker [broker: broker2, brokerID: 1030]
[2020-06-27 16:14:53,367] WARN [ReplicaFetcher replicaId=1030, leaderId=1029, 
fetcherId=13] Error in response for fetch request (type=FetchRequest, 
replicaId=1030, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1901394956, 
epoch=1018699)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1029 was disconnected before the response 
was read
at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:240)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:43)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:149)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
 
Error from affected broker [broker: broker6, brokerID: 1029]
[2020-06-27 16:14:25,744] INFO [Partition topic_test2-33 broker=1029] Shrinking 
ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition)
[2020-06-27 16:14:25,752] INFO [Partition topic_a_restate-34 broker=1029] 
Shrinking ISR from 1027,1029,1028 to 1029 (kafka.cluster.Partition)
[2020-06-27 16:14:25,760] INFO [Partition topic_f_restate-39 broker=1029] 
Shrinking ISR from 1026,1029,1025 to 1029,1025 (kafka.cluster.Partition)
[2020-06-27 16:14:25,772] INFO [Partition topic_test2-16 broker=1029] Shrinking 
ISR from 1028,1029,1030 to 1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:26,683] INFO [ProducerStateManager partition=topic_abc_f-21] 
Writing producer snapshot at offset 1509742173 (kafka.log.ProducerStateManager)
[2020-06-27 16:14:26,683] INFO [Log partition=topic_abc_f-21, 
dir=/hadoop-e/kafka/data1] Rolled new log segment at offset 1509742173 in 1 ms. 
(kafka.log.Log)
[2020-06-27 16:14:55,375] INFO [Partition topic_test2-33 broker=1029] Expanding 
ISR from 1025,1029 to 1025,1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,387] INFO [Partition test-topic-analysis-9 broker=1029] 
Expanding ISR from 1028,1029 to 1028,1029,1030 (kafka.cluster.Partition)
 
2.  The entire Kafka cluster becomes stable within a few minutes
 
Trace for good broker [broker: broker2, brokerID: 1030]
[2020-06-27 16:20:14,861] INFO Deleted time index 
/hadoop-g/kafka/data1/topic-analysis-0/009100172512.timeindex.deleted. 
(kafka.log.LogSegment)
[2020-06-27 16:20:14,882] INFO [Log partition=topic-analysis-4, 
dir=/hadoop-b/kafka/data1] Deleting segment 9100010843 (kafka.log.Log)
[2020-06-27 16:20:14,883] INFO Deleted log 
/hadoop-b/kafka/data1/topic-analysis-4/009100010843.log.deleted. 
(kafka.log.LogSegment)
[2020-06-27 16:20:14,897] INFO Deleted offset index 
/hadoop-b/kafka/data1/topic-analysis-4/009100010843.index.deleted. 
(kafka.log.LogSegment)
[2020-06-27 16:20:14,897] INFO Deleted time index 
/hadoop-b/kafka/data1/topic-analysis-4/009100010843.timeindex.deleted. 
(kafka.log.LogSegment)
 
Trace from affected broker [broker: broker6, brokerID: 1029]
[2020-06-27 16:21:01,552] INFO [Log partition=topic-analysis-22, 
dir=/hadoop-i/kafka/data1] Scheduling log segment [baseOffset 9099830344, size 
1073733482] for deletion. (kafka.log.Log)
[2020-06-27 16:21:01,553] INFO [Log partition=topic-analysis-22, 
dir=/hadoop-i/kafka/data1] Scheduling log segment [baseOffset 9100082067, size 
1073738790] for deletion. (kafka.log.Log)
[2020-06-27 16:21:01,553] INFO [Log partition=topic-analysis-22, 
dir=/hadoop-i/kafka/data1] Incrementing log start offset to 9100334713 
(kafka.log.Log)
[2020-06-27 16:21:01,581] INFO Cleared earliest 0 entries from epoch cache 
based on passed offset 9100334713 leaving 1 in EpochFile for partition 
topic-analysis-22 (kafka.server.epoch.LeaderEpochFileCache)
[2020-06-27 16:22:00,175] INFO [Log partition=topic_def_c-1, 
dir=/hadoop-j/kafka/data1] Deleting segment 1628853412 (kafka.log.Log)
[2020-06-27 16:22:00,175] INFO Deleted log 
/hadoop-j/kafka/data1/topic_def_c-1/001628853412.log.deleted. 
(kafka.log.LogSegment)
 
3.  Once the disconnect happens, all the producer jobs fail, 
stating "Caused by: org.apache.kafka.common.errors

[GitHub] [kafka] mjsax commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-08 Thread GitBox

mjsax commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-655783477


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10250) Kafka broker shrinks the ISRs and disconnects from other brokers for few seconds

2020-07-08 Thread Rishank Chandra Puram (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154043#comment-17154043
 ] 

Rishank Chandra Puram edited comment on KAFKA-10250 at 7/8/20, 10:09 PM:
-

1. When the issue occurs, and once the affected broker reconnects to the other 
brokers, we're able to produce/consume messages to/from Kafka using a simple 
command line [even though the affected broker is not restarted].
2. The issue occurred suddenly on broker 1029 but could also occur on another 
broker; this behavior is completely random, as we have had issues with two 
different brokers at different times

[2020-06-27 16:14:23,800] INFO [GroupCoordinator 1029]: Member 
consumer-1-3dfeffb9-c8ca-4840-878e-3126e9750015 in group authapiprod02062020 
has failed, removing it from the group 
(kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:23,802] INFO [GroupCoordinator 1029]: Stabilized group 
authapiprod02062020 generation 178 (__consumer_offsets-40) 
(kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:23,805] INFO [GroupCoordinator 1029]: Assignment received 
from leader for group authapiprod02062020 for generation 178 
(kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:25,494] INFO [Partition topic_rejected_test-30 broker=1029] 
Shrinking ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition)

After 30 seconds we see expanding messages:

[2020-06-27 16:14:25,772] INFO [Partition topic_test2-16 broker=1029] Shrinking 
ISR from 1028,1029,1030 to 1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,375] INFO [Partition topic_analysis-33 broker=1029] 
Expanding ISR from 1025,1029 to 1025,1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,387] INFO [Partition topic_rejected-9 broker=1029] 
Expanding ISR from 1028,1029 to 1028,1029,1030 (kafka.cluster.Partition)


We reviewed during the issue: 

- lsof --> 75000 of 20
- No high CPU found in grafana during the issue
- Thread dumps are not showing any stuck threads
- iostat looks fine
- During the issue we checked for --under-replicated-partitions, but once 
broker 1029 starts expanding the ISRs again, no under-replicated partitions are 
seen
- We reviewed the GC log files and the memory looks fine; we currently have 12g 
configured and the live set is about 4g
- The netstat output does not show any abnormal behavior; this is from broker 1029:

50659539242 total packets received
52633502320 requests sent out
88095112 segments retransmited
0 receive buffer errors
0 send buffer errors
379 resets received for embryonic SYN_RECV sockets
629655 TCP sockets finished time wait in fast timer
550847 delayed acks further delayed because of locked socket
11151 times the listen queue of a socket overflowed
11151 SYNs to LISTEN sockets dropped


was (Author: rishankchandra):
1. When the issue occurs, they're able to produce/consume messages to/from 
Kafka using a simple command line.
2. The issue occurs suddenly in broker 1029 but also could occur in other 
broker, this behavior it's completely random, as we had issues with two 
different brokers at different times

[2020-06-27 16:14:23,800] INFO [GroupCoordinator 1029]: Member 
consumer-1-3dfeffb9-c8ca-4840-878e-3126e9750015 in group authapiprod02062020 
has failed, removing it from the group 
(kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:23,802] INFO [GroupCoordinator 1029]: Stabilized group 
authapiprod02062020 generation 178 (__consumer_offsets-40) 
(kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:23,805] INFO [GroupCoordinator 1029]: Assignment received 
from leader for group authapiprod02062020 for generation 178 
(kafka.coordinator.group.GroupCoordinator)
[2020-06-27 16:14:25,494] INFO [Partition topic_rejected_test-30 broker=1029] 
Shrinking ISR from 1025,1029,1030 to 1025,1029 (kafka.cluster.Partition)

After 30 seconds we see expanding messages:

[2020-06-27 16:14:25,772] INFO [Partition topic_test2-16 broker=1029] Shrinking 
ISR from 1028,1029,1030 to 1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,375] INFO [Partition topic_analysis-33 broker=1029] 
Expanding ISR from 1025,1029 to 1025,1029,1030 (kafka.cluster.Partition)
[2020-06-27 16:14:55,387] INFO [Partition topic_rejected-9 broker=1029] 
Expanding ISR from 1028,1029 to 1028,1029,1030 (kafka.cluster.Partition)


We reviewed during the issue: 

- lsof --> 75000 of 20
- No high CPU found in grafana during the issue
- Thread dumps are not showing any stuck threads
- iostat looks fine
- During the issue we check for --under-replicated-partitions but after the 
broker 1029 it's expanding, no under replicated partitions seen
- We reviewed gc log files and the memory looks fine, we currently have 12g 
configured and the liveset is about 4g
- netstat output is not showing any abnormal behavior, this is from broker 1029:

50659539242 total packets received
52

[GitHub] [kafka] rhauch commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config

2020-07-08 Thread GitBox

rhauch commented on a change in pull request #8864:
URL: https://github.com/apache/kafka/pull/8864#discussion_r451856474



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
##
@@ -148,6 +148,7 @@ public KafkaStatusBackingStore(Time time, Converter 
converter) {
 this.statusTopic = statusTopic;
 }
 
+@SuppressWarnings("deprecation")

Review comment:
   > Not sure if we should rewrite this test right away
   
   First of all, this is not test code. It's actually part of the Connect 
runtime.
   
   Second, I'd have to look into what we would do differently and how we would 
set `delivery.timeout.ms` to work with the existing logic that handles retries. 
Since it's not clear, I suggest we keep the current behavior of setting 
`retries=0` as long as that still functions. We're using the 
`ProducerConfig.RETRIES` constant, so that can't be removed without changing 
this code.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Skip processing if task isn't running

2020-07-08 Thread GitBox

vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r451860750



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -766,6 +769,24 @@ void runOnce() {
 return records;
 }
 
+private OffsetResetStrategy getResetStrategy(final TopicPartition 
partition) {
+if 
(builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+return OffsetResetStrategy.EARLIEST;
+} else if 
(builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+return OffsetResetStrategy.LATEST;
+} else {
+if (originalReset == null || (!originalReset.equals("earliest") && 
!originalReset.equals("latest"))) {
+return OffsetResetStrategy.EARLIEST;
+}

Review comment:
   I honestly couldn't figure out what is the default default default reset 
strategy... It seems (from the behavior of the test when we first start up) 
that if there's no strategy set, and no committed offset, then the client 
starts at the beginning, but the ClientConfig has the default policy as 
"latest"... What gives?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -193,6 +197,28 @@ private void closeAndRevive(final Map> taskWith
 log.error("Error suspending corrupted task {} ", task.id(), 
swallow);
 }
 task.closeDirty();
+// Pause so we won't poll any more records for this task until it 
has been re-initialized
+// Note, closeDirty already clears the partitiongroup for the task.
+mainConsumer().pause(task.inputPartitions());
+final Map<TopicPartition, OffsetAndMetadata> committed = 
mainConsumer().committed(task.inputPartitions());
+for (final TopicPartition topicPartition : task.inputPartitions()) 
{
+final OffsetAndMetadata offsetAndMetadata = 
committed.get(topicPartition);
+if (offsetAndMetadata == null) {
+final OffsetResetStrategy strategy = 
resetStrategy.apply(topicPartition);
+switch (strategy) {
+case EARLIEST:
+
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+break;
+case LATEST:
+
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+break;
+default:
+throw new IllegalArgumentException("Unexpected 
reset strategy: " + strategy);
+}
+} else {
+mainConsumer().seek(topicPartition, offsetAndMetadata);
+}
+}

Review comment:
   This might be the worst thing I've ever proposed for AK... I can't 
figure out a better way to just "reset" the offset.
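
For readers outside the Streams internals, the same idea expressed against the 
plain consumer API looks roughly like the sketch below; the topic, partition, 
group id, and the "earliest" fallback are assumptions for illustration only.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class SeekToCommittedExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            final TopicPartition tp = new TopicPartition("input-topic", 0);
            consumer.assign(Collections.singleton(tp));

            // Rewind to the last committed offset, or fall back to the reset
            // policy (earliest here) when nothing has been committed yet.
            final Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(Collections.singleton(tp));
            final OffsetAndMetadata offset = committed.get(tp);
            if (offset == null) {
                consumer.seekToBeginning(Collections.singleton(tp));
            } else {
                consumer.seek(tp, offset.offset());
            }

            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}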





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8994: KAFKA-10247: Skip processing if task isn't running

2020-07-08 Thread GitBox

ableegoldman commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r451862588



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -766,6 +769,24 @@ void runOnce() {
 return records;
 }
 
+private OffsetResetStrategy getResetStrategy(final TopicPartition 
partition) {
+if 
(builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+return OffsetResetStrategy.EARLIEST;
+} else if 
(builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+return OffsetResetStrategy.LATEST;
+} else {
+if (originalReset == null || (!originalReset.equals("earliest") && 
!originalReset.equals("latest"))) {
+return OffsetResetStrategy.EARLIEST;
+}

Review comment:
   Does Streams override the client default..?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8994: KAFKA-10247: Skip processing if task isn't running

2020-07-08 Thread GitBox

vvcephei commented on pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#issuecomment-655792712


   Ok, @abbccdda and @mjsax , I think this is ready for a review now. I'm not 
very happy with the logic to reset to the last committed offset. Can you think 
of a better way to do it?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)

2020-07-08 Thread GitBox

apovzner commented on a change in pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#discussion_r451871529



##
File path: clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
##
@@ -97,7 +97,25 @@ public static RecordingLevel forName(String name) {
 public boolean shouldRecord(final int configId) {
 return configId == DEBUG.id || configId == this.id;
 }
+}
 
+public enum QuotaEnforcementType {
+/**
+ * The quota is not enforced.
+ */
+NONE,
+
+/**
+ * The quota is enforced before updating the sensor. An update
+ * is rejected if the quota is already exhausted.

Review comment:
   nit: somehow the first sentence is confusing because it seems to imply 
that the sensor gets updated in this case as well. Do you think maybe this 
sounds a bit clearer? "The check for quota violation is done prior to recording 
the value. If the quota is already exhausted, the value is not recorded into 
the sensor." Or perhaps some other wording. Up to you. 
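
As a toy illustration of the "check before record" semantics being clarified, 
consider the stand-in class below; it is not Kafka's actual Sensor or Quota 
API, just a sketch of the intended behavior.

{code:java}
// Hypothetical stand-in type for illustration only.
final class StrictQuotaSensor {
    private final double bound;  // quota upper bound
    private double recorded;     // running total of recorded values

    StrictQuotaSensor(final double bound) {
        this.bound = bound;
    }

    void record(final double value) {
        // The quota check happens before the update: if the quota is already
        // exhausted, the value is rejected and never recorded.
        if (recorded >= bound) {
            throw new IllegalStateException("quota exhausted; value not recorded");
        }
        recorded += value;
    }
}
{code}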
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8994: KAFKA-10247: Skip processing if task isn't running

2020-07-08 Thread GitBox

ableegoldman commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r451884449



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -95,7 +96,7 @@
  *  |  | Assigned (3)| <+
  *  |  +-+---+  |
  *  ||  |
- *  ||  |
+ *  ||--+

Review comment:
   What's up with this?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -193,6 +197,28 @@ private void closeAndRevive(final Map> taskWith
 log.error("Error suspending corrupted task {} ", task.id(), 
swallow);
 }
 task.closeDirty();
+// Pause so we won't poll any more records for this task until it 
has been re-initialized
+// Note, closeDirty already clears the partitiongroup for the task.
+mainConsumer().pause(task.inputPartitions());
+final Map<TopicPartition, OffsetAndMetadata> committed = 
mainConsumer().committed(task.inputPartitions());
+for (final TopicPartition topicPartition : task.inputPartitions()) 
{
+final OffsetAndMetadata offsetAndMetadata = 
committed.get(topicPartition);
+if (offsetAndMetadata == null) {
+final OffsetResetStrategy strategy = 
resetStrategy.apply(topicPartition);
+switch (strategy) {
+case EARLIEST:
+
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+break;
+case LATEST:
+
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));

Review comment:
   Should this be `seekToEnd` ?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest

2020-07-08 Thread GitBox

ableegoldman commented on pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#issuecomment-655815316


   Hey @vvcephei , do you think we should ask about cherrypicking this back to 
2.6 to help stabilize the release? I thought failing system tests (or regularly 
failing tests of any kind) automatically block a release because it needs to 
get a green build to proceed. But I'm not familiar with the release/RM 
procedure. Maybe it's alright to just confirm that the test is failing because 
of the test itself?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-08 Thread GitBox

ableegoldman commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-655825204


   Two failures:
   `TransactionsBounceTest.testWithGroupMetadata`
   `SmokeTestDriverIntegrationTest.shouldWorkWithRebalance`
   
   The `shouldWorkWithRebalance` failure is actually pretty concerning, but I 
can't imagine how it could possibly be related to this PR specifically. I'll 
see if I can reproduce it locally...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Skip processing if task isn't running

2020-07-08 Thread GitBox

mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r451895176



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -193,6 +197,28 @@ private void closeAndRevive(final Map> taskWith
 log.error("Error suspending corrupted task {} ", task.id(), 
swallow);
 }
 task.closeDirty();
+// Pause so we won't poll any more records for this task until it 
has been re-initialized
+// Note, closeDirty already clears the partitiongroup for the task.
+mainConsumer().pause(task.inputPartitions());
+final Map<TopicPartition, OffsetAndMetadata> committed = 
mainConsumer().committed(task.inputPartitions());
+for (final TopicPartition topicPartition : task.inputPartitions()) 
{
+final OffsetAndMetadata offsetAndMetadata = 
committed.get(topicPartition);
+if (offsetAndMetadata == null) {
+final OffsetResetStrategy strategy = 
resetStrategy.apply(topicPartition);
+switch (strategy) {
+case EARLIEST:
+
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+break;
+case LATEST:
+
mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+break;
+default:
+throw new IllegalArgumentException("Unexpected 
reset strategy: " + strategy);
+}
+} else {
+mainConsumer().seek(topicPartition, offsetAndMetadata);
+}
+}

Review comment:
   Why do you think it's bad? That is just how the API works... Cf. 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L769-L802
 which does the same thing.
   
   What makes me wonder is whether we can share common code for both cases?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -137,7 +138,7 @@
 STARTING(2, 3, 5),// 1
 PARTITIONS_REVOKED(2, 3, 5),  // 2
 PARTITIONS_ASSIGNED(2, 3, 4, 5),  // 3
-RUNNING(2, 3, 5), // 4
+RUNNING(2, 3, 4, 5),  // 4

Review comment:
   Why do we need this?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -766,6 +769,24 @@ void runOnce() {
 return records;
 }
 
+private OffsetResetStrategy getResetStrategy(final TopicPartition 
partition) {
+if 
(builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+return OffsetResetStrategy.EARLIEST;
+} else if 
(builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+return OffsetResetStrategy.LATEST;
+} else {
+if (originalReset == null || (!originalReset.equals("earliest") && 
!originalReset.equals("latest"))) {
+return OffsetResetStrategy.EARLIEST;
+}

Review comment:
   > Does Streams override the client default..?
   
   Yes. The client default is "latest" but we use "earliest" by default (cf 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L857).
 Of course, users can also change the default via StreamsConfig.
   
   Note that the consumer client can only apply a single strategy to all topics 
it is subscribed to. Hence, if all topics use the same reset policy, we can rely 
on the consumer's configured policy. However, if users specify different reset 
policies in their code via `Consumed` for individual topics, the consumer is 
re-configured to use "none" (cf. 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L362-L366)
 and we do a manual seekToBeginning/seekToEnd according to the user-defined 
strategy for the corresponding topic (cf. 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L762-L764)
 because we need to make a per-topic decision that the consumer cannot make for 
us.
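
A hedged sketch of such per-topic reset policies is shown below; the topic 
names and serdes are made-up assumptions. Mixing policies like this is exactly 
the case where the consumer is configured with "none" and Streams seeks 
manually per topic.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public final class PerTopicResetExample {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // This topic should be read from the beginning when no committed offset exists.
        final KStream<String, String> fromStart = builder.stream(
                "orders",
                Consumed.with(Serdes.String(), Serdes.String())
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));

        // This topic should be read from the end when no committed offset exists.
        final KStream<String, String> fromEnd = builder.stream(
                "clicks",
                Consumed.with(Serdes.String(), Serdes.String())
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));

        fromStart.merge(fromEnd).to("combined",
                Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
{code}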

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -766,6 +769,24 @@ void runOnce() {
 return records;
 }
 
+private OffsetResetStrategy getResetStrategy(final TopicPartition 
partition) {

Review comment:
   Wondering if we should reuse this method within 
`StreamThread#resetInvalidOffsets`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1140,4 +1166,1

[GitHub] [kafka] chia7712 commented on a change in pull request #8981: KAFKA-10235 Fix flaky transactions_test.py

2020-07-08 Thread GitBox

chia7712 commented on a change in pull request #8981:
URL: https://github.com/apache/kafka/pull/8981#discussion_r451898731



##
File path: tests/kafkatest/tests/core/transactions_test.py
##
@@ -47,7 +47,10 @@ def __init__(self, test_context):
 self.num_output_partitions = 3
 self.num_seed_messages = 10
 self.transaction_size = 750
-self.transaction_timeout = 4
+# This is reducing the transaction cleanup interval.
+# In hard_bounce mode, the transaction is broken ungracefully. Hence, it 
produces unstable
+# offsets, which obstruct TransactionalMessageCopier from receiving the 
position of the group.
+self.transaction_timeout = 5000

Review comment:
   > we can adjust the comment to make it clear that the hard bounce is for 
the client.
   
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-08 Thread GitBox

mjsax commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r451899726



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -224,6 +224,9 @@ void initializeStoreOffsetsFromCheckpoint(final boolean 
storeDirIsEmpty) {
 for (final StateStoreMetadata store : stores.values()) {
 if (store.changelogPartition == null) {
 log.info("State store {} is not logged and hence would not 
be restored", store.stateStore.name());
+} else if (!store.stateStore.persistent()) {
+log.info("Initializing to the starting offset for 
changelog {} of in-memory state store {}",
+ store.changelogPartition, 
store.stateStore.name());

Review comment:
   Don't we need to call `store.setOffset(null)` for this case?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-08 Thread GitBox

mjsax commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r451900686



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -224,6 +224,9 @@ void initializeStoreOffsetsFromCheckpoint(final boolean 
storeDirIsEmpty) {
 for (final StateStoreMetadata store : stores.values()) {
 if (store.changelogPartition == null) {
 log.info("State store {} is not logged and hence would not 
be restored", store.stateStore.name());
+} else if (!store.stateStore.persistent()) {
+log.info("Initializing to the starting offset for 
changelog {} of in-memory state store {}",
+ store.changelogPartition, 
store.stateStore.name());

Review comment:
   Just wondering about 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L769-L802
 (can't comment below).
   
   For this case, it should hold that `store.offset() == 
changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition))`
 ?
   
   Or maybe `>=`?
   
   Should we add a sanity check? (Not related to this PR itself actually. -- 
Just wondering.)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-08 Thread GitBox

mjsax commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r451901975



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##
@@ -784,7 +784,7 @@ public void close() {
 }
 
 @Test
-public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws 
IOException {
+public void 
shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws 
IOException {
 final long checkpointOffset = 10L;
 
 final Map offsets = mkMap(

Review comment:
   Not sure if I understand the test: don't we write a checkpoint in L795 
for the persistent store?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-08 Thread GitBox

mjsax commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r451902131



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##
@@ -813,6 +813,62 @@ public void 
shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws IOExce
 }
 }
 
+@Test
+public void 
shouldNotThrowTaskCorruptedWithWithoutInMemoryStoreCheckpointAndNonEmptyDir() 
throws IOException {

Review comment:
   `WithWithout` ?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-08 Thread GitBox

mjsax commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r451903126



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,24 +45,23 @@
 protected ProcessorNode currentNode;
 private long currentSystemTimeMs;
 
-final StateManager stateManager;

Review comment:
   Not sure why we need to move this from the abstract class into the child 
classes?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -103,8 +104,12 @@ public void init(final ProcessorContext context,
 
 @SuppressWarnings("unchecked")
 void initStoreSerde(final ProcessorContext context) {
+final String storeName = name();
+final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName);
 serdes = new StateSerdes<>(
-ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
+ changelogTopic != null ?
+changelogTopic :
+
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),

Review comment:
   It seems we don't need this if-then-else any longer as it's already 
taken care of within `ProcessorContextUtils.changelogFor`? (Similar for other 
classes.)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -73,22 +73,23 @@
 private final Set globalNonPersistentStoresTopics = new 
HashSet<>();
 private final OffsetCheckpoint checkpointFile;
 private final Map checkpointFileCache;
+private final Map storeToChangelogTopic;
 
 public GlobalStateManagerImpl(final LogContext logContext,
   final ProcessorTopology topology,
   final Consumer 
globalConsumer,
   final StateDirectory stateDirectory,
   final StateRestoreListener 
stateRestoreListener,
   final StreamsConfig config) {
+storeToChangelogTopic = topology.storeToChangelogTopic();

Review comment:
   Wondering if we should pass `storeToChangelogTopic()` and 
`.globalStateStores()` into the constructor instead of `ProcessorTopology` ?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax closed pull request #8855: DO NOT MERGE

2020-07-08 Thread GitBox

mjsax closed pull request #8855:
URL: https://github.com/apache/kafka/pull/8855


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata

2020-07-08 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10251:
---

 Summary: Flaky Test 
kafka.api.TransactionsBounceTest.testWithGroupMetadata
 Key: KAFKA-10251
 URL: https://issues.apache.org/jira/browse/KAFKA-10251
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Sophie Blee-Goldman


h3. Stacktrace

org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout 
instead of the expected 200 records at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at 
org.scalatest.Assertions.fail(Assertions.scala:1091) at 
org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at 
kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)

 

 

The logs are pretty much just this on repeat:
{code:java}
[2020-07-08 23:41:04,034] ERROR Error when sending message to topic 
output-topic with key: 9955, value: 9955 with error: 
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
org.apache.kafka.common.KafkaException: Failing batch since transaction was 
aborted at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error 
when sending message to topic output-topic with key: 9959, value: 9959 with 
error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
org.apache.kafka.common.KafkaException: Failing batch since transaction was 
aborted at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
java.lang.Thread.run(Thread.java:748)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata

2020-07-08 Thread Sophie Blee-Goldman (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154110#comment-17154110
 ] 

Sophie Blee-Goldman commented on KAFKA-10251:
-

cc [~bchen225242] – 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3318/testReport/junit/kafka.api/TransactionsBounceTest/testWithGroupMetadata/]

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
> -
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 200 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at 
> kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>  
>  
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic 
> output-topic with key: 9955, value: 9955 with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error 
> when sending message to topic output-topic with key: 9959, value: 9959 with 
> error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang opened a new pull request #8997: MINOR: Improve log4j for per-consumer assignment

2020-07-08 Thread GitBox

guozhangwang opened a new pull request #8997:
URL: https://github.com/apache/kafka/pull/8997


   Add log4j entry summarizing the assignment (prev owned and assigned) at the 
consumer level.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8997: MINOR: Improve log4j for per-consumer assignment

2020-07-08 Thread GitBox

guozhangwang commented on pull request #8997:
URL: https://github.com/apache/kafka/pull/8997#issuecomment-655841394


   @ableegoldman @abbccdda 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8997: MINOR: Improve log4j for per-consumer assignment

2020-07-08 Thread GitBox

guozhangwang commented on a change in pull request #8997:
URL: https://github.com/apache/kafka/pull/8997#discussion_r451912298



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##
@@ -66,32 +72,17 @@ public ClientState() {
 prevActiveTasks = new TreeSet<>();
 prevStandbyTasks = new TreeSet<>();
 consumerToPreviousStatefulTaskIds = new TreeMap<>();
+consumerToPreviousActiveTaskIds = new TreeMap<>();
+consumerToAssignedActiveTaskIds = new TreeMap<>();
+consumerToAssignedStandbyTaskIds = new TreeMap<>();
+consumerToRevokingActiveTaskIds = new TreeMap<>();
 ownedPartitions = new TreeMap<>(TOPIC_PARTITION_COMPARATOR);
 taskOffsetSums = new TreeMap<>();
 taskLagTotals = new TreeMap<>();
 this.capacity = capacity;
 }
 
-private ClientState(final Set activeTasks,

Review comment:
   This is not used anywhere.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -725,8 +725,10 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,

statefulTasks,

assignmentConfigs);
 
-log.info("Assigned tasks to clients as {}{}.",
-Utils.NL, 
clientStates.entrySet().stream().map(Map.Entry::toString).collect(Collectors.joining(Utils.NL)));
+log.info("Assigned tasks to clients as: {}{}.", Utils.NL,

Review comment:
   This is unnecessarily verbose, plus part of it is replaced by line 960 
below, so I trimmed a bit here.
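
   For readers unfamiliar with the formatting idiom in the diff above, here is a small self-contained sketch (not Streams code; the class, map contents, and use of `System.lineSeparator()` in place of `Utils.NL` are stand-ins) of how joining map entries with a newline yields one client state per log line:

   ```java
   import java.util.Map;
   import java.util.TreeMap;
   import java.util.stream.Collectors;

   public class JoinWithNewlines {
       public static void main(String[] args) {
           // Stand-in for the clientStates map in the assignor; keys and values are made up.
           Map<String, String> clientStates = new TreeMap<>();
           clientStates.put("client-1", "[activeTasks: (0_0, 0_1)]");
           clientStates.put("client-2", "[activeTasks: (1_0, 1_1)]");

           // Same idiom as in the diff: one entry per line, joined by the platform newline.
           String summary = clientStates.entrySet().stream()
               .map(Map.Entry::toString)
               .collect(Collectors.joining(System.lineSeparator()));

           System.out.println("Assigned tasks to clients as:" + System.lineSeparator() + summary);
       }
   }
   ```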





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2020-07-08 Thread Sophie Blee-Goldman (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154113#comment-17154113
 ] 

Sophie Blee-Goldman commented on KAFKA-8940:


Failed again:
h3. Error Message

java.lang.AssertionError: verifying tagg fail: key=133
tagg=[ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1594252802631, serialized key size = 3, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = 133, value = 1)] expected=0
taggEvents: [ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1594252802631, serialized key size = 3, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = 133, value = 1)]
verifying suppressed min-suppressed
verifying min-suppressed with 20 keys fail: resultCount=20 expectedCount=10
result=[[5-1004@159425280/159433920], [0-999@159425280/159433920], [8-1007@159416640/159425280], [2-1001@159416640/159425280], [7-1006@159425280/159433920], [1-1000@159425280/159433920], [3-1002@159416640/159425280], [9-1008@159416640/159425280], [4-1003@159416640/159425280], [6-1005@159425280/159433920], [8-1007@159425280/159433920], [5-1004@159416640/159425280], [2-1001@159425280/159433920], [4-1003@159425280/159433920], [6-1005@159416640/159425280], [0-999@159416640/159425280], [7-1006@159416640/159425280], [1-1000@159416640/159425280], [9-1008@159425280/159433920], [3-1002@159425280/159433920]]
expected=[7-1006, 3-1002, 0-999, 1-1000, 4-1003, 2-1001, 5-1004, 8-1007, 6-1005, 9-1008]
verifying suppressed sws-suppressed
verifying min with 10 keys
min fail: key=7-1006 actual=844 expected=7

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> I lost the screen shot unfortunately... it reports the set of expected 
> records does not match the received records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-08 Thread GitBox

ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r451922040



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -224,6 +224,9 @@ void initializeStoreOffsetsFromCheckpoint(final boolean 
storeDirIsEmpty) {
 for (final StateStoreMetadata store : stores.values()) {
 if (store.changelogPartition == null) {
 log.info("State store {} is not logged and hence would not 
be restored", store.stateStore.name());
+} else if (!store.stateStore.persistent()) {
+log.info("Initializing to the starting offset for 
changelog {} of in-memory state store {}",
+ store.changelogPartition, 
store.stateStore.name());

Review comment:
   > Don't we need to call store.setOffset(null) for this case?
   
   Well, either it's a recycled task, in which case no, we don't want to wipe out the existing offset, or it's a new task, in which case it's initialized to `null` anyway.
   
   I'm also not sure what you mean in the second comment. Did you maybe paste the link to the wrong code? (Just guessing, since you linked to that same code earlier in John's PR.)
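
   To make the branching being discussed concrete, here is a simplified, hypothetical model of the checkpoint-initialization decision; it is not the real `ProcessorStateManager`, and all class, field, and method names below are stand-ins.

   ```java
   import java.util.Collection;
   import java.util.Map;

   public class CheckpointInitSketch {

       static class StoreMeta {
           final String name;
           final String changelogPartition; // null means the store is not logged
           final boolean persistent;
           Long offset;                      // null means "restore from the beginning"

           StoreMeta(final String name, final String changelogPartition, final boolean persistent) {
               this.name = name;
               this.changelogPartition = changelogPartition;
               this.persistent = persistent;
           }
       }

       static class TaskCorruptedException extends RuntimeException {
           TaskCorruptedException(final String message) {
               super(message);
           }
       }

       static void initializeFromCheckpoint(final Collection<StoreMeta> stores,
                                            final Map<String, Long> checkpointedOffsets,
                                            final boolean storeDirIsEmpty) {
           for (final StoreMeta store : stores) {
               if (store.changelogPartition == null) {
                   // Not logged, hence nothing will be restored.
                   continue;
               }
               if (!store.persistent) {
                   // In-memory store: a checkpoint file never contains an offset for it, so
                   // leave the offset as-is (null for a new task, retained for a recycled one).
                   continue;
               }
               final Long offset = checkpointedOffsets.get(store.changelogPartition);
               if (offset != null) {
                   store.offset = offset; // resume restoration from the checkpointed position
               } else if (!storeDirIsEmpty) {
                   // Persistent store with local data on disk but no checkpointed offset:
                   // the local state cannot be trusted.
                   throw new TaskCorruptedException("missing checkpoint for " + store.name);
               }
           }
       }
   }
   ```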





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-08 Thread GitBox

ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r451922421



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##
@@ -784,7 +784,7 @@ public void close() {
 }
 
 @Test
-public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws 
IOException {
+public void 
shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws 
IOException {
 final long checkpointOffset = 10L;
 
 final Map offsets = mkMap(

Review comment:
   Yeah, I think that's the point of the test: we wrote a checkpoint, but it was missing the offset for one of the persistent stores, so we should throw a TaskCorruptedException in `initializeStoreOffsetsFromCheckpoint`.
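
   For illustration only, a stand-alone sketch of that scenario (not the real `ProcessorStateManagerTest`; the store names, helper, and exception type below are made up, and JUnit 5 is assumed):

   ```java
   import static org.junit.jupiter.api.Assertions.assertThrows;

   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import org.junit.jupiter.api.Test;

   public class MissingCheckpointOffsetSketchTest {

       static class TaskCorruptedException extends RuntimeException { }

       // Stand-in for initializeStoreOffsetsFromCheckpoint: requires an offset for
       // every persistent store when the state directory is non-empty.
       static void initialize(final Map<String, Long> checkpoint,
                              final Iterable<String> persistentStores,
                              final boolean storeDirIsEmpty) {
           for (final String store : persistentStores) {
               if (!checkpoint.containsKey(store) && !storeDirIsEmpty) {
                   throw new TaskCorruptedException();
               }
           }
       }

       @Test
       public void shouldThrowWhenPersistentStoreOffsetIsMissing() {
           final Map<String, Long> checkpoint = new HashMap<>();
           checkpoint.put("persistent-store-1", 10L);
           // "persistent-store-2" is deliberately missing from the checkpoint.

           assertThrows(TaskCorruptedException.class,
               () -> initialize(checkpoint, List.of("persistent-store-1", "persistent-store-2"), false));
       }
   }
   ```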





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



