[jira] [Commented] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2020-07-24 Thread Boyang Chen (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164760#comment-17164760
 ] 

Boyang Chen commented on KAFKA-10307:
-

[~vvcephei] Could you take a look? [~feyman] found this out in his PR to 
refactor the stream assignor logic: 

[https://github.com/apache/kafka/pull/8832#issuecomment-646943356]

 

> Topology cycles in 
> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> -
>
> Key: KAFKA-10307
> URL: https://issues.apache.org/jira/browse/KAFKA-10307
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Minor
>
> We have spotted a cycle in the topology built for the foreign-key join test 
> *shouldInnerJoinMultiPartitionQueryable*; it is not yet clear whether the bug 
> is in the join algorithm or only in the test. We used 
> [https://zz85.github.io/kafka-streams-viz/] to visualize:
> {code:java}
> Sub-topology: 0
> Source: KTABLE-SOURCE-19 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Source: KTABLE-SOURCE-32 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Source: KSTREAM-SOURCE-01 (topics: [table1])
>   --> KTABLE-SOURCE-02
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-OUTPUT-21
>   <-- KTABLE-SOURCE-19
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: [INNER-store1])
>   --> KTABLE-FK-JOIN-OUTPUT-34
>   <-- KTABLE-SOURCE-32
> Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2])
>   --> KTABLE-TOSTREAM-35
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Processor: KTABLE-SOURCE-02 (stores: [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
>   <-- KSTREAM-SOURCE-01
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: [])
>   --> KTABLE-SINK-11
>   <-- KTABLE-SOURCE-02
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: [])
>   --> KTABLE-SINK-24
>   <-- KTABLE-FK-JOIN-OUTPUT-21
> Processor: KTABLE-TOSTREAM-35 (stores: [])
>   --> KSTREAM-SINK-36
>   <-- KTABLE-FK-JOIN-OUTPUT-34
> Sink: KSTREAM-SINK-36 (topic: output-)
>   <-- KTABLE-TOSTREAM-35
> Sink: KTABLE-SINK-11 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
> Sink: KTABLE-SINK-24 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
>
> Sub-topology: 1
> Source: KSTREAM-SOURCE-04 (topics: [table2])
>   --> KTABLE-SOURCE-05
> Source: KTABLE-SOURCE-12 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15
>   <-- KTABLE-SOURCE-12
> Processor: KTABLE-SOURCE-05 (stores: [table2-STATE-STORE-03])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
>   <-- KSTREAM-SOURCE-04
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: [table2-STATE-STORE-03])
>   --> KTABLE-SINK-18
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>   --> KTABLE-SINK-18
>   <-- KTABLE-SOURCE-05
> Sink: KTABLE-SINK-18 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
>
> Sub-topology: 2
> Source: KSTREAM-SOURCE-07 (topics: [table3])
>   --> KTABLE-SOURCE-08
> Sou
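For readers who want to check the suspicion mechanically: the sub-topologies above are connected through internal topics (sub-topology 0 writes the SUBSCRIPTION-REGISTRATION topic that sub-topology 1 reads, and sub-topology 1 writes the SUBSCRIPTION-RESPONSE topic that sub-topology 0 reads back), which is exactly the edge pattern a standard DFS cycle check would flag. A minimal, self-contained sketch of that check, with illustrative node names rather than real Kafka identifiers:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Standard three-color DFS cycle detection over a directed graph of
// sub-topologies. This is not Kafka code; it just demonstrates the
// technique one would use to verify whether a printed topology such as
// the dump above contains a cycle.
class TopologyCycleCheck {

    static boolean hasCycle(Map<String, List<String>> graph) {
        Set<String> visiting = new HashSet<>(); // gray: on the current DFS path
        Set<String> done = new HashSet<>();     // black: fully explored
        for (String node : graph.keySet()) {
            if (dfs(node, graph, visiting, done)) {
                return true;
            }
        }
        return false;
    }

    static boolean dfs(String node, Map<String, List<String>> graph,
                       Set<String> visiting, Set<String> done) {
        if (visiting.contains(node)) {
            return true; // back edge to a node on the current path -> cycle
        }
        if (done.contains(node)) {
            return false;
        }
        visiting.add(node);
        for (String next : graph.getOrDefault(node, List.of())) {
            if (dfs(next, graph, visiting, done)) {
                return true;
            }
        }
        visiting.remove(node);
        done.add(node);
        return false;
    }

    public static void main(String[] args) {
        // Sub-topologies linked through internal topics, as in the dump above:
        // 0 feeds 1 via the registration topic, 1 feeds 0 via the response topic.
        Map<String, List<String>> graph = new HashMap<>();
        graph.put("subtopology-0", List.of("subtopology-1"));
        graph.put("subtopology-1", List.of("subtopology-0"));
        System.out.println(hasCycle(graph)); // prints: true
    }
}
```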

[jira] [Updated] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2020-07-24 Thread Boyang Chen (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10307:

Component/s: streams


[jira] [Updated] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2020-07-24 Thread Boyang Chen (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10307:

Affects Version/s: 2.4.0, 2.5.0, 2.6.0


[jira] [Updated] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2020-07-24 Thread Boyang Chen (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10307:

Priority: Major  (was: Minor)


[GitHub] [kafka] abbccdda commented on pull request #8907: MINOR: code cleanup for `VOut` inconsistent naming

2020-07-24 Thread GitBox

abbccdda commented on pull request #8907:
URL: https://github.com/apache/kafka/pull/8907#issuecomment-663817294


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-24 Thread GitBox

chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439


   @junrao 
   
   I have rebased this PR to include the fix for ```group_mode_transactions_test```. 
Could you run the system tests again? Except for ```streams_eos_test``` and the 
transaction tests, the other tests pass on my local machine.







[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-24 Thread GitBox

chia7712 edited a comment on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439


   @junrao 
   
   I have rebased this PR to include the fix for ```group_mode_transactions_test```. 
Could you run the system tests again? Except for ```streams_eos_test```, 
```streams.streams_eos_test```, and the transaction tests, the other tests pass 
on my local machine.







[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-24 Thread GitBox

chia7712 edited a comment on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439


   @junrao 
   
   I have rebased this PR to include the fix for ```group_mode_transactions_test```. 
Could you run the system tests again? Except for ```streams_eos_test```, 
```streams_standby_replica_test```, and the transaction tests, the other tests 
pass on my local machine.







[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#issuecomment-663383364


   @vvcephei I updated the PR and needed to make quite a few changes to get it 
into reasonable shape. Instead of throwing a new `RetryableException` that we 
catch in the outer layer, I just retry in each step directly. This makes 
reasoning about the timeout easier.







[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459890602



##
File path: 
streams/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
##
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals;
-
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Map;
-
-/**
- * A {@link StreamsConfig} that does not log its configuration on construction.
- *
- * This producer cleaner output for unit tests using the {@code test-utils},
- * since logging the config is not really valuable in this context.
- */
-public class QuietStreamsConfig extends StreamsConfig {

Review comment:
   Moved this class into `ClientUtils.java`









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459891762



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                           final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
Review comment:
   Some side improvement: if we `seek` based on the checkpoint, there is no 
reason to call `position()`, because we already know what our offset is. Only if 
we `seekToBeginning()` do we need to get the current offset from the consumer 
itself (instead of `position()` we could also call `beginningOffsets()`, but 
`position()` is the easier API to use).









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459892544



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                           final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
 
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(
Review comment:
   For `poll()`, even if retrying is disabled, we need to retry to give a fetch 
request (with a default request timeout of 30 seconds but only a default 
`pollTime` of 100ms) a fair chance to actually return.
   
   At least I believe this makes sense. Also \cc @guozhangwang 
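The retry-until-deadline idea discussed in this review thread can be sketched independently of Kafka. The helper below is an illustrative stand-in, not the actual `retryUntilSuccessOrThrowOnTaskTimeout` implementation; `NO_DEADLINE`, the timeout value, and the exception types are all hypothetical:

```java
import java.util.function.Supplier;

// Hedged sketch: retry a transiently failing action, starting a deadline
// clock on the first failure, and throw once the deadline is exceeded.
class RetryUntilDeadline {

    static final long NO_DEADLINE = -1L;

    static <T> T retryUntilDeadline(Supplier<T> action, long taskTimeoutMs) {
        long deadlineMs = NO_DEADLINE;
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException transientError) {
                final long nowMs = System.currentTimeMillis();
                if (deadlineMs == NO_DEADLINE) {
                    // first failure: start the clock rather than failing immediately
                    deadlineMs = nowMs + taskTimeoutMs;
                } else if (nowMs >= deadlineMs) {
                    throw new IllegalStateException("task timeout exceeded", transientError);
                }
            }
        }
    }

    public static void main(String[] args) {
        final int[] attempts = {0};
        // Fails twice with a "transient" error, then succeeds on the third try.
        String result = retryUntilDeadline(() -> {
            if (++attempts[0] < 3) {
                throw new RuntimeException("transient");
            }
            return "ok";
        }, 1_000L);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```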









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459892977



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                     restoreRecords.add(recordConverter.convert(record));
                 }
             }
-            offset = globalConsumer.position(topicPartition);
+            try {
+                offset = globalConsumer.position(topicPartition);
+            } catch (final TimeoutException error) {
+                // the `globalConsumer.position()` call should never block, because we know that we did
+                // a successful `position()` call above for the requested partition and thus the consumer
+                // should have a valid local position that it can return immediately

Review comment:
   We don't need to call `retryUntilSuccessOrThrowOnTaskTimeout` now, so we 
actually get cleaner code :)









[GitHub] [kafka] huxihx commented on pull request #9071: KAFKA-10305: Print usage when parsing fails for ConsumerPerformance

2020-07-24 Thread GitBox

huxihx commented on pull request #9071:
URL: https://github.com/apache/kafka/pull/9071#issuecomment-663404582


   retest this please







[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-24 Thread Jerry Wei (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164290#comment-17164290
 ] 

Jerry Wei commented on KAFKA-10134:
---

[~guozhang] I'm using logback, and the PR https://github.com/apache/kafka/pull/9011 
is included as well. Yeah, I just restarted the consumer without any broker 
change; both brokers and consumers were running on my local laptop, and I have 
not found any other issue.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
> Attachments: consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during rebalances, as our tasks are long-running tasks.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some tasks are long running, >30s), the CPU usage goes 
> sky-high. It reads ~700% in our metrics, so several threads must be in a tight 
> loop. We have several consumer threads consuming from different partitions 
> during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The difference 
> is that with the old eager rebalance protocol the high CPU usage drops after 
> the rebalance is done, but with the cooperative one the consumer threads seem 
> to be stuck on something and cannot finish the rebalance, so the high CPU 
> usage does not drop until we stop the load. A small load without long-running 
> tasks also does not cause continuous high CPU usage, since the rebalance can 
> finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> on finding the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue, so it 
> appears related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac opened a new pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)

2020-07-24 Thread GitBox

dajac opened a new pull request #9072:
URL: https://github.com/apache/kafka/pull/9072


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
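For context on the PR title: a token bucket admits a configurable burst of work and then throttles until the bucket refills at the quota rate. The sketch below is a generic illustration of that behavior under stated assumptions; the field names, parameters, and throttle formula are hypothetical and are not the broker's actual quota implementation from this PR:

```java
// Hedged sketch of token-bucket throttling (KIP-599 context). The bucket
// refills at `quotaPerSec` tokens per second up to `quotaPerSec * burstSec`
// tokens; recording work may drive the balance negative, and the resulting
// debt translates into a throttle time.
class TokenBucket {

    private final double quotaPerSec; // refill rate: tokens per second
    private final double burstSec;    // how many seconds of quota may accumulate
    private double tokens;
    private long lastMs;

    TokenBucket(double quotaPerSec, double burstSec, long nowMs) {
        this.quotaPerSec = quotaPerSec;
        this.burstSec = burstSec;
        this.tokens = quotaPerSec * burstSec; // start with a full bucket
        this.lastMs = nowMs;
    }

    // Record `amount` units of work at time `nowMs`; returns the throttle
    // time in ms (0 while the bucket still has credit).
    long record(double amount, long nowMs) {
        // Refill for the elapsed time, capped at the burst capacity.
        tokens = Math.min(quotaPerSec * burstSec,
                          tokens + quotaPerSec * (nowMs - lastMs) / 1000.0);
        lastMs = nowMs;
        tokens -= amount; // may go negative: debt to repay before new work
        if (tokens >= 0) {
            return 0L;
        }
        return (long) (-tokens / quotaPerSec * 1000.0);
    }

    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(10.0, 1.0, 0L); // 10 tokens/s, 1s burst
        System.out.println(bucket.record(5.0, 0L));  // within the burst -> 0
        System.out.println(bucket.record(10.0, 0L)); // 5 tokens of debt -> 500 ms
    }
}
```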
   







[GitHub] [kafka] dajac commented on pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)

2020-07-24 Thread GitBox

dajac commented on pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#issuecomment-663462152


   I have opened a new proper PR: https://github.com/apache/kafka/pull/9072







[jira] [Commented] (KAFKA-6764) ConsoleConsumer behavior inconsistent when specifying --partition with --from-beginning

2020-07-24 Thread Pardhu Madipalli (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-6764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164370#comment-17164370
 ] 

Pardhu Madipalli commented on KAFKA-6764:
-

I am observing this behavior with an SSL-enabled Kafka broker, even with a new 
consumer group. I produced a few messages, then created a new consumer with a 
new group. *kafka-console-consumer --from-beginning* did not display any 
messages, but when I added *--partition 0* it did read the messages.

I then killed the SSL broker and created a new broker without SSL. This time, 
even without specifying *--partition 0*, I can see all the messages as 
expected.

In both cases only a single broker was running, with one topic partition and a 
replication factor of 1.

> ConsoleConsumer behavior inconsistent when specifying --partition with 
> --from-beginning 
> 
>
> Key: KAFKA-6764
> URL: https://issues.apache.org/jira/browse/KAFKA-6764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Larry McQueary
>Assignee: Larry McQueary
>Priority: Minor
>  Labels: newbie
>
> Per its usage statement, {{kafka-console-consumer.sh}} ignores 
> {{\-\-from-beginning}} when the specified consumer group has committed 
> offsets, and sets {{auto.offset.reset}} to {{latest}}. However, if 
> {{\-\-partition}} is also specified, {{\-\-from-beginning}} is observed in 
> all cases, whether there are committed offsets or not.
> This happens because when {{\-\-from-beginning}} is specified, {{offsetArg}} 
> is set to {{OffsetRequest.EarliestTime}}. However, {{offsetArg}} is [only 
> passed to the 
> constructor|https://github.com/apache/kafka/blob/fedac0cea74fce529ee1c0cefd6af53ecbdd/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L76-L79]
>  for {{NewShinyConsumer}} when {{\-\-partition}} is also specified. Hence, it 
> is honored in this case and not the other.
> This case should either be handled consistently, or the usage statement 
> should be modified to indicate the actual behavior/usage. 





[jira] [Commented] (KAFKA-9924) Add RocksDB Memory Consumption to RocksDB Metrics

2020-07-24 Thread Bruno Cadonna (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-9924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164402#comment-17164402
 ] 

Bruno Cadonna commented on KAFKA-9924:
--

The scope of this ticket was extended to not only monitor memory usage but also 
to expose RocksDB properties in Kafka Streams metrics. See 

https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB


> Add RocksDB Memory Consumption to RocksDB Metrics 
> --
>
> Key: KAFKA-9924
> URL: https://issues.apache.org/jira/browse/KAFKA-9924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: needs-kip
>
> RocksDB's memory consumption should be added to the RocksDB metrics.
> RocksDB's memory consumption can be retrieved with the following class:
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/MemoryUtil.java
> The memory consumption metrics should be added on client level and should be 
> recorded on INFO level.





[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

cadonna commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663512173


   I started a run of Streams system tests:
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4077/







[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

cadonna commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663512735


   @mjsax I thought this fix is most urgently needed on 2.6 to unblock the 
release. I will open a new PR for trunk.







[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

cadonna commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663512974


   @ableegoldman I agree it should be a blocker.







[GitHub] [kafka] cadonna edited a comment on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

cadonna edited a comment on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663513384


   @ableegoldman I agree that having the extra protection you propose makes 
sense. I will open a follow-up PR on trunk.







[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

cadonna commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663513384


   @ableegoldman I agree that having the extra protection you propose makes 
sense. I will open a follow-up PR.







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-24 Thread GitBox

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663532890


   retest this please







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-24 Thread GitBox

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663534471


   ok to test







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-24 Thread GitBox

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663534611


   retest this please







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-24 Thread GitBox

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663537187


   One PR build was started here:
   
   https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3515/







[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-24 Thread GitBox

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663537509


   Looks like the last message somehow made Jenkins start working again.







[GitHub] [kafka] ijuma edited a comment on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-24 Thread GitBox

ijuma edited a comment on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663537509


   Looks like the last message somehow made the Jenkins status update in the 
PR again.







[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-24 Thread GitBox

dajac commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r460053049



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
 log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-int remainingRetries = retries;
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;

Review comment:
   Thinking a bit more about this, with the default you may end up not 
honouring the deadline. `createTopics` can take up to 1m, so if you invoke one 
when less than 1m is remaining before the deadline, you may not honour the 
deadline. It may not be that important, though.
   
   If we want to strictly enforce it, we could calculate the maximum timeout 
for each call, something like deadline - now, and set it with 
`CreateTopicsOptions.timeoutMs`.
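
   A minimal sketch of the per-call clamping idea (illustrative only; the 
class and method names here are hypothetical, not the actual 
`InternalTopicManager` code). Each admin call would get at most the time left 
before the overall deadline, which could then be passed to 
`CreateTopicsOptions.timeoutMs`:

   ```java
// Sketch of deadline-aware per-call timeouts. Hypothetical helper;
// only CreateTopicsOptions.timeoutMs is a real Admin API.
public class DeadlineTimeout {

    // Clamp a per-call timeout to whatever remains before deadlineMs.
    static int remainingTimeoutMs(long deadlineMs, long nowMs, int defaultTimeoutMs) {
        long remaining = deadlineMs - nowMs;
        if (remaining <= 0) {
            return 0; // deadline already passed; the caller should give up
        }
        return (int) Math.min(remaining, defaultTimeoutMs);
    }

    public static void main(String[] args) {
        long deadline = 120_000L;
        // plenty of time left: use the default per-call timeout
        System.out.println(remainingTimeoutMs(deadline, 0L, 60_000));       // 60000
        // less than the default left: shrink the call timeout
        System.out.println(remainingTimeoutMs(deadline, 90_000L, 60_000));  // 30000
        // deadline already passed
        System.out.println(remainingTimeoutMs(deadline, 130_000L, 60_000)); // 0
    }
}
   ```

   In the real code this value would feed something like 
`new CreateTopicsOptions().timeoutMs(...)` so that no single `createTopics` 
call can overrun the deadline.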









[GitHub] [kafka] ijuma commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-24 Thread GitBox

ijuma commented on pull request #9051:
URL: https://github.com/apache/kafka/pull/9051#issuecomment-663547862


   @huxihx Note that we usually include the reviewer(s) in the commit message. 
For example:
   
   
https://github.com/apache/kafka/commit/0d5c967073e78e2a50ffa5bb860bff9ff43086d5







[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

vvcephei commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663587201











[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

vvcephei commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663587337


   Test this please







[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-24 Thread GitBox

junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-663597341


   3 system test failures with trunk. 
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2020-07-23--001.1595551051--apache--trunk--0b181fdde/report.html







[GitHub] [kafka] vvcephei commented on pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-24 Thread GitBox

vvcephei commented on pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#issuecomment-663599523


   Hey @mjsax ,
   
   I double-checked, and it does seem to be the right fix. I confirmed that no 
matter whether you run the test with IDEA or gradle, there's nothing to 
automatically set the exit procedure, so `Exit.exit` is equivalent to 
`System.exit` unless a test specifically overrides the exit procedure, as I 
have done here.
   
   Nevertheless, I see no reason not to also take your advice and just replace 
all calls to `System.exit` with `Exit.exit`. Since `Exit` is provided as a 
utility in the main Client module, it's available in every context in which 
Streams is available. Note, this by itself won't fix anything. If a test calls 
`Exit.exit`, it will fail with the reported error unless it overrides the exit 
procedure.
   
   I did my best to trace all call paths that end in `exit()`, and still 
`SmokeTestDriverIntegrationTest` is the only one that I found.







[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-24 Thread GitBox

chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-663618045


   @junrao thanks for the reports.
   
   Unfortunately, the failed tests are totally different in both results :(
   
   This PR has been rebased so the fix for ```group_mode_transactions_test``` 
is included.
   
   ```streams_standby_replica_test``` will get fixed by #9066.
   
   I will test the others on my local later.







[GitHub] [kafka] vvcephei commented on pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-24 Thread GitBox

vvcephei commented on pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#issuecomment-663631102


   Hey @mjsax ,
   
   Since Streams accounted for almost all of the `System.exit` calls, and since 
`Exit.exit` is available almost everywhere in AK, I'm proposing to add a 
checkstyle rule forbidding it. The violation will look like this:
   
   ```
   [ant:checkstyle] [ERROR] 
/home/confluent/kafka/clients/src/main/java/org/apache/kafka/some/BadClass.java:43:
 Line matches the illegal pattern ''System.exit': Should not directly call 
System.exit, but Exit.exit instead.'. [Regexp]
   ```
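
   For reference, a rule producing a violation of that shape could look 
roughly like the following Checkstyle `Regexp` module (a sketch, not the 
actual rule in Kafka's `checkstyle.xml`, which may be configured differently):

   ```xml
<!-- Hypothetical sketch of a Checkstyle rule forbidding System.exit. -->
<module name="Regexp">
  <!-- Flag any line matching the pattern as illegal. -->
  <property name="format" value="System\.exit"/>
  <property name="illegalPattern" value="true"/>
  <property name="message"
            value="Should not directly call System.exit, but Exit.exit instead."/>
</module>
   ```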







[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-24 Thread Sophie Blee-Goldman (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164531#comment-17164531
 ] 

Sophie Blee-Goldman commented on KAFKA-10284:
-

Nope, it just spun in a loop where it would poll and then call 
Consumer#position to try and initialize some metadata. Poll just returned empty 
I guess and Consumer#position continued to throw TimeoutException. 

The TimeoutException part seems pretty weird, so maybe it was an unrelated bug 
(or some other issue like a hung socket being the primary/only guess we had). I 
just thought of it because it definitely happened immediately after an "id does 
not match expected member.id" error.

I actually still have some of the broker and client side logs from around this 
incident, if that might help. But again, I'm not really sure if it could be 
related to this or not

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.6.1
>
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 





[GitHub] [kafka] vvcephei opened a new pull request #9073: MINOR: add task ':streams:testAll'

2020-07-24 Thread GitBox

vvcephei opened a new pull request #9073:
URL: https://github.com/apache/kafka/pull/9073


   Adds a new task to Streams to run all the tests for all sub-projects.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Assigned] (KAFKA-10306) GlobalThread might loop forever

2020-07-24 Thread Matthias J. Sax (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10306:
---

Assignee: Matthias J. Sax

> GlobalThread might loop forever
> ---
>
> Key: KAFKA-10306
> URL: https://issues.apache.org/jira/browse/KAFKA-10306
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 2.6.0
>
>
> We did some code refactoring in the `GlobalStateManagerImpl` that can lead to 
> an infinite loop if the global consumer throws an `InvalidOffsetException`. 
> This is a regression bug, and thus I marked this ticket as a blocker for the 
> 2.6 release.
> The issue seems to occur only during the initial bootstrapping of the global 
> store, but not during regular processing.





[jira] [Created] (KAFKA-10306) GlobalThread might loop forever

2020-07-24 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-10306:
---

 Summary: GlobalThread might loop forever
 Key: KAFKA-10306
 URL: https://issues.apache.org/jira/browse/KAFKA-10306
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Matthias J. Sax
 Fix For: 2.6.0


We did some code refactoring in the `GlobalStateManagerImpl` that can lead to an 
infinite loop if the global consumer throws an `InvalidOffsetException`. This 
is a regression bug, and thus I marked this ticket as a blocker for the 2.6 
release.

The issue seems to occur only during the initial bootstrapping of the global 
store, but not during regular processing.





[jira] [Commented] (KAFKA-8359) Reconsider default for leader imbalance percentage

2020-07-24 Thread Romain Hardouin (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-8359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164538#comment-17164538
 ] 

Romain Hardouin commented on KAFKA-8359:


{quote}[...] there is no major downside to set leader imbalance percentage to 0.
{quote}
Does it mean there are _minor_ downsides?

Following the principle of least astonishment, the default value should be 0 
because users expect to have balanced clusters when using 
{{auto.leader.rebalance.enable=true}} 

 

> Reconsider default for leader imbalance percentage
> --
>
> Key: KAFKA-8359
> URL: https://issues.apache.org/jira/browse/KAFKA-8359
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dhruvil Shah
>Priority: Major
>
> By default, the leader imbalance ratio is 10%. This means that the controller 
> won't trigger preferred leader election for a broker unless the ratio of the 
> number of partitions a broker is the current leader of and the number of 
> partitions it is the preferred leader of is off by more than 10%. The problem 
> is when a broker is catching up after a restart, the smallest topics tend to 
> catch up first and the largest ones later, so the 10% remaining difference 
> may not be proportional to the broker's load. To keep better balance in the 
> cluster, we should consider setting 
> `leader.imbalance.per.broker.percentage=0` by default so that the preferred 
> leaders are always elected.
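
The per-broker check described above can be sketched as follows (an 
illustrative reading of the config, not the actual controller code; the 
method and parameter names are hypothetical):

```java
// Sketch of the leader.imbalance.per.broker.percentage check: for each
// broker, compare the fraction of partitions whose preferred leader is this
// broker but whose current leader is a different broker against the threshold.
public class LeaderImbalance {

    static boolean shouldTriggerElection(int preferredButNotLeader,
                                         int preferredTotal,
                                         double imbalancePercentage) {
        if (preferredTotal == 0) {
            return false; // broker is preferred leader for nothing
        }
        double ratio = (double) preferredButNotLeader / preferredTotal;
        return ratio > imbalancePercentage / 100.0;
    }

    public static void main(String[] args) {
        // 10% default: 1 misplaced leader out of 20 (5%) does not trigger election
        System.out.println(shouldTriggerElection(1, 20, 10.0)); // false
        // 0% threshold: any misplaced leader triggers election
        System.out.println(shouldTriggerElection(1, 20, 0.0));  // true
    }
}
```

Under a 0% threshold, the controller would move leadership back as soon as a 
restarted broker's replicas catch up, which is the behavior the ticket argues 
users expect.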





[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-24 Thread GitBox

ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460194062



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -243,18 +242,24 @@ public void handleAssignment(final Map> activeTasks,
 
 for (final Task task : tasksToClose) {
 try {
-if (task.isActive()) {
-// Active tasks are revoked and suspended/committed during 
#handleRevocation
-if (!task.state().equals(State.SUSPENDED)) {
-log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-  task.id(), task.state());
-throw new IllegalStateException("Active task " + 
task.id() + " should have been suspended");
-}
-} else {
-task.suspend();
-task.prepareCommit();
-task.postCommit();
+// Always try to first suspend and commit the task before 
closing it;
+// some tasks may already be suspended which should be a no-op.
+//
+// Also since active tasks should already be suspended / 
committed and
+// standby tasks should have no offsets to commit, we should 
expect nothing to commit
+task.suspend();
+
+final Map offsets = 
task.prepareCommit();
+
+if (!offsets.isEmpty()) {
+log.error("Task {} should has been committed prior to 
attempting to close, but it reports non-empty offsets {} to commit",

Review comment:
   I thought we would catch and save the first exception thrown by a 
rebalance listener callback, and then rethrow after all rebalance callbacks 
have been invoked? In this case that would mean `handleAssignment` would still 
get called, and then we would throw an IllegalStateException and bail on the 
rest of `handleAssignment` for no reason.
   
   The IllegalStateException itself is not the problem, since only the first 
exception (the TaskMigrated) would ultimately be thrown up to `poll`. But we 
should still go through the rest of `handleAssignment` in order to properly 
clean up the active tasks and manage the standbys (since we don't need to close 
standbys in case of TaskMigrated)
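
   The "save the first exception, rethrow after all callbacks have run" 
pattern referenced above can be sketched like this (illustrative only; the 
names are not from the Kafka code):

   ```java
// Run every callback even if an earlier one throws; surface only the
// first failure once all of them have been invoked.
import java.util.List;

public class FirstExceptionPattern {

    static void runAll(List<Runnable> callbacks) {
        RuntimeException firstException = null;
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (RuntimeException e) {
                if (firstException == null) {
                    firstException = e; // remember only the first failure
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    public static void main(String[] args) {
        int[] invoked = {0};
        try {
            runAll(List.of(
                () -> { invoked[0]++; throw new IllegalStateException("first"); },
                () -> invoked[0]++, // still runs despite the earlier failure
                () -> { invoked[0]++; throw new IllegalStateException("second"); }
            ));
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // first
        }
        System.out.println(invoked[0]); // 3
    }
}
   ```

   This is why, in the scenario discussed, `handleAssignment` would still be 
invoked even after an earlier callback threw.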





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9073: MINOR: add task ':streams:testAll'

2020-07-24 Thread GitBox

abbccdda commented on pull request #9073:
URL: https://github.com/apache/kafka/pull/9073#issuecomment-663652859


   One more thought: it might be useful to add this to the repo `README`, for 
users who are only making Streams-side changes and want to test them locally 
@vvcephei.







[GitHub] [kafka] heritamas commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-07-24 Thread GitBox

heritamas commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r460197007



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -132,7 +132,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-.filter(x -> x.downstreamOffset() > 0)  // ignore offsets we 
cannot translate accurately
+.filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we 
cannot translate accurately

Review comment:
   Yes, it does not hurt to leave it, just to be sure.









[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-24 Thread GitBox

ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460211291



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
 static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

Review comment:
   It would be awesome if we could do it based on the size of the pending 
data and the configured memtable size. Not sure if that's really feasible, just 
throwing it out there
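
   For context, an offset-delta heuristic like the constant above could work 
roughly as follows (an illustrative sketch, not the actual `StateManagerUtil` 
implementation): checkpoint when forced, or once enough records have been 
written to the changelogs since the last checkpoint.

   ```java
// Sketch of an offset-delta checkpoint heuristic; names are illustrative.
import java.util.Map;

public class CheckpointHeuristic {
    static final long OFFSET_DELTA_THRESHOLD = 10_000L;

    static boolean checkpointNeeded(boolean enforce,
                                    Map<String, Long> lastCheckpointed,
                                    Map<String, Long> current) {
        if (enforce) {
            return true;
        }
        // Sum how far each changelog has advanced since the last checkpoint.
        long totalDelta = 0L;
        for (Map.Entry<String, Long> entry : current.entrySet()) {
            long previous = lastCheckpointed.getOrDefault(entry.getKey(), 0L);
            totalDelta += entry.getValue() - previous;
        }
        return totalDelta > OFFSET_DELTA_THRESHOLD;
    }

    public static void main(String[] args) {
        Map<String, Long> last = Map.of("changelog-0", 5_000L);
        // only 3,000 new records: below the threshold, skip the checkpoint
        System.out.println(checkpointNeeded(false, last, Map.of("changelog-0", 8_000L)));  // false
        // 15,000 new records: above the threshold, write a checkpoint
        System.out.println(checkpointNeeded(false, last, Map.of("changelog-0", 20_000L))); // true
    }
}
   ```

   A size-based variant, as suggested, would replace the record-count delta 
with an estimate of pending bytes relative to the configured memtable size.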









[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-24 Thread GitBox

ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460212084



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -454,6 +456,41 @@ public void flush() {
 }
 }
 
+public void flushCache() {
+RuntimeException firstException = null;
+// attempting to flush the stores
+if (!stores.isEmpty()) {
+log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+for (final StateStoreMetadata metadata : stores.values()) {
+final StateStore store = metadata.stateStore;
+
+try {
+// buffer should be flushed to send all records to 
changelog
+if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
   WDYT about adding a generic `FlushingRequiredStore` marker interface 
that all caching state stores and the suppression buffer would implement? It 
seems weird to handle them separately.
   
   We could even make this public and allow user custom state stores to 
implement this, but that might be opening a can of worms we will greatly regret 
😉 
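
   The marker-interface idea can be sketched like this (hypothetical; 
`RequiresFlush` and the store classes below are illustrative, not real Kafka 
Streams types):

   ```java
// Stores that must be flushed on commit implement a marker interface, so the
// state manager can treat caching stores and the suppression buffer uniformly.
public class MarkerInterfaceSketch {

    interface StateStore { String name(); }

    // Marker interface: no methods, only signals flush-on-commit semantics.
    interface RequiresFlush { }

    static class SuppressionBuffer implements StateStore, RequiresFlush {
        public String name() { return "suppression-buffer"; }
    }

    static class PlainStore implements StateStore {
        public String name() { return "plain-store"; }
    }

    static boolean needsFlush(StateStore store) {
        return store instanceof RequiresFlush; // replaces per-concrete-type checks
    }

    public static void main(String[] args) {
        System.out.println(needsFlush(new SuppressionBuffer())); // true
        System.out.println(needsFlush(new PlainStore()));        // false
    }
}
   ```

   The single `instanceof` check on the marker replaces the separate 
`TimeOrderedKeyValueBuffer` special case in the diff above.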









[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-24 Thread GitBox

ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460212719



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -49,6 +61,31 @@
 this.stateDirectory = stateDirectory;
 }
 
+protected void initializeCheckpoint() {
+// we will delete the local checkpoint file after registering the 
state stores and loading them into the
+// state manager, therefore we should initialize the snapshot as empty 
to indicate over-write checkpoint needed
+offsetSnapshotSinceLastFlush = Collections.emptyMap();
+}
+
+/**
+ * The following exceptions may be thrown from the state manager flushing call:
+ *
+ * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed
+ * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
+ *                          or flushing the state store hit IO errors; such errors should cause the thread to die
+ */
+protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
+    final Map<TopicPartition, Long> offsetSnapshot = stateMgr.changelogOffsets();
+    if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) {
+        // since there are no written offsets we can checkpoint with an empty map,
+        // and the state's current offset would be used to checkpoint
+        stateMgr.flush();
+        stateMgr.checkpoint(Collections.emptyMap());

Review comment:
   But `ProcessorStateManager` doesn't handle global tasks 









[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-24 Thread GitBox

ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460214147



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -93,8 +93,8 @@ public boolean isActive() {
     public void initializeIfNeeded() {
         if (state() == State.CREATED) {
             StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);
-
             // initialize the snapshot with the current offsets as we don't need to commit then until they change

Review comment:
   Sounds good. I think the most important thing is to not have the comment 
that says `initialize the snapshot with the current offsets...` right above the 
line that initializes the snapshot as empty 🙁 









[jira] [Commented] (KAFKA-10260) Streams could recover stores in a task independently

2020-07-24 Thread Matthias J. Sax (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164585#comment-17164585
 ] 

Matthias J. Sax commented on KAFKA-10260:
-

Note: this issue applies for global tasks, too. Cf 
https://issues.apache.org/jira/browse/KAFKA-10306

> Streams could recover stores in a task independently
> 
>
> Key: KAFKA-10260
> URL: https://issues.apache.org/jira/browse/KAFKA-10260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Currently, ProcessorStateManager checks for corrupted tasks by verifying, for 
> each persistent store, that if its checkpoint is missing, the task directory 
> is also empty.
> This is a little overzealous, since we aren't checking whether the store's 
> specific directory is nonempty, only whether there are any directories for any 
> stores. So if there are two stores in a task, and one is correctly written 
> and checkpointed, while the other is neither written nor checkpointed, we 
> _could_ correctly load the first and recover the second, but instead we 
> consider the whole task corrupted, discard the first, and recover both.
> The fix would be to check, for each persistent store that doesn't have a 
> checkpoint, that its _specific_ store directory is also missing. Such a store 
> will be restored from the changelog and we don't need to consider the task 
> corrupted.
> See ProcessorStateManager#initializeStoreOffsetsFromCheckpoint
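The proposed per-store check can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual `ProcessorStateManager` logic: the method name `isCorrupted` and the directory layout are hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch of the proposed fix: a store without a checkpoint is
// only "corrupted" if its OWN directory already contains data; an absent
// directory just means "restore fully from the changelog".
public class StoreCorruptionCheck {

    static boolean isCorrupted(final Path storeDir, final boolean hasCheckpoint) {
        if (hasCheckpoint) {
            return false; // checkpointed stores can be loaded as-is
        }
        if (!Files.exists(storeDir)) {
            return false; // nothing on disk: clean restore from changelog
        }
        try (DirectoryStream<Path> contents = Files.newDirectoryStream(storeDir)) {
            return contents.iterator().hasNext(); // on-disk data without a checkpoint
        } catch (final IOException e) {
            return true; // unreadable store dir: treat as corrupted
        }
    }

    public static void main(final String[] args) throws IOException {
        final Path taskDir = Files.createTempDirectory("task-0_0");
        final Path writtenStore = Files.createDirectory(taskDir.resolve("store-a"));
        Files.createFile(writtenStore.resolve("data.sst"));
        final Path freshStore = taskDir.resolve("store-b"); // never written

        // store-a is checkpointed and loadable; store-b is simply restorable.
        System.out.println(isCorrupted(writtenStore, true));  // prints false
        System.out.println(isCorrupted(freshStore, false));   // prints false
        // Only "data on disk, no checkpoint" would return true.
        System.out.println(isCorrupted(writtenStore, false)); // prints true
    }
}
```

Checking each store's own directory instead of the whole task directory is exactly the narrowing the ticket asks for.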



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-24 Thread GitBox

ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663687037


   One build passed, one failed due to environmental reasons, one had a single 
flaky failure:
   
   
`org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1`
   
   I ran the full test suite locally and it passed as well. Merging to trunk 
and 2.6. cc @rhauch 







[GitHub] [kafka] ijuma merged pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-24 Thread GitBox

ijuma merged pull request #9065:
URL: https://github.com/apache/kafka/pull/9065


   







[GitHub] [kafka] lbradstreet commented on pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-24 Thread GitBox

lbradstreet commented on pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#issuecomment-663689440


   > Hey @mjsax ,
   > 
   > I double-checked, and it does seem to be the right fix. I confirmed that 
no matter whether you run the test with IDEA or gradle, there's nothing to 
automatically set the exit procedure, so `Exit.exit` is equivalent to 
`System.exit` unless a test specifically overrides the exit procedure, as I 
have done here.
   > 
   > Nevertheless, I see no reason not to also take your advice and just 
replace all calls to `System.exit` with `Exit.exit`. Since `Exit` is provided 
as a utility in the main Client module, it's available in every context in 
which Streams is available. Note, this by itself won't fix anything. If a test 
calls `Exit.exit`, it will fail with the reported error unless it overrides the 
exit procedure.
   > 
   > I did my best to trace all call paths that end in `exit()`, and still 
`SmokeTestDriverIntegrationTest` is the only one that I found.
   
   Right, you'll need ```Exit.setExitProcedure((_, _) => doSomething())``` to 
be set up somewhere to override the `Exit.exit` behavior, otherwise it will still 
do the same thing as it did before.
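A minimal model of this exit-procedure pattern is sketched below. This is a simplified stand-in, not the real `org.apache.kafka.common.utils.Exit` class; the names `runCapturingExit` and the `BiConsumer` shape are illustrative.

```java
import java.util.function.BiConsumer;

// Simplified model of an Exit utility: by default it really exits the JVM,
// but a test can install a procedure that merely records the exit code.
public class ExitSketch {
    private static BiConsumer<Integer, String> procedure =
        (code, msg) -> System.exit(code); // default: really exit

    static void setExitProcedure(final BiConsumer<Integer, String> p) { procedure = p; }
    static void resetExitProcedure() { procedure = (code, msg) -> System.exit(code); }
    static void exit(final int code, final String message) { procedure.accept(code, message); }

    // What a test does: capture the exit code instead of killing the JVM.
    static int runCapturingExit(final Runnable body) {
        final int[] captured = {0};
        setExitProcedure((code, msg) -> captured[0] = code);
        try {
            body.run();
        } finally {
            resetExitProcedure(); // never leak the override into other tests
        }
        return captured[0];
    }

    public static void main(final String[] args) {
        final int code = runCapturingExit(() -> exit(1, "smoke test driver failed"));
        System.out.println(code); // prints 1, and the JVM is still alive
    }
}
```

Without the override, any code path reaching `exit` would terminate the whole test JVM, which is exactly the failure mode the PR fixes.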







[GitHub] [kafka] abbccdda commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

abbccdda commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460165601



##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -326,13 +321,18 @@ bootstrap.servers
   state.cleanup.delay.ms
     Low
     The amount of time in milliseconds to wait before deleting state when a partition has migrated.
-    600000 milliseconds
+    600000 milliseconds (10 minutes)

   state.dir
     High
     Directory location for state stores.
     /tmp/kafka-streams

+  task.timeout.ms
+    Medium

Review comment:
   nit: for a doc clean-up, it is helpful to include a screenshot of the 
updated paragraph. The changes starting at L331 should suffice.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -318,6 +341,72 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 }
 }
 
+private void retryUntilSuccessOrThrowOnTaskTimeout(final Runnable runnable,
+                                                   final String errorMessage) {
+    long deadlineMs = NO_DEADLINE;
+
+    do {
+        try {
+            runnable.run();
+            return;
+        } catch (final TimeoutException retryableException) {

Review comment:
   `retriableException` to be consistent with the defined exception type in 
AK.
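The retry-until-deadline shape under discussion can be sketched like this. The names (`taskTimeoutMs`, `NO_DEADLINE`, the method name itself) mirror the thread, but the code is an illustration, not the actual `GlobalStateManagerImpl` implementation.

```java
import java.util.concurrent.TimeoutException;

// Illustrative sketch: retry a transiently failing call, starting a
// non-progress deadline on the first TimeoutException and surfacing a fatal
// error once taskTimeoutMs has elapsed without success.
public class RetrySketch {
    static final long NO_DEADLINE = -1L;

    interface ThrowingRunnable { void run() throws TimeoutException; }

    static void retryUntilSuccessOrThrowOnTaskTimeout(final ThrowingRunnable runnable,
                                                      final long taskTimeoutMs,
                                                      final String errorMessage) {
        long deadlineMs = NO_DEADLINE;
        while (true) {
            try {
                runnable.run();
                return;
            } catch (final TimeoutException retriableException) {
                final long nowMs = System.currentTimeMillis();
                if (deadlineMs == NO_DEADLINE) {
                    deadlineMs = nowMs + taskTimeoutMs; // first failure starts the clock
                } else if (nowMs >= deadlineMs) {
                    // deadline exceeded without progress: surface as fatal
                    throw new RuntimeException(errorMessage, retriableException);
                }
                // real code would back off between attempts instead of spinning
            }
        }
    }

    public static void main(final String[] args) {
        final int[] attempts = {0};
        // Succeeds on the third attempt, well inside the deadline.
        retryUntilSuccessOrThrowOnTaskTimeout(() -> {
            if (++attempts[0] < 3) {
                throw new TimeoutException("broker unavailable");
            }
        }, 1_000L, "Failed to get position");
        System.out.println(attempts[0]); // prints 3
    }
}
```

With `taskTimeoutMs == 0` the second consecutive failure already throws, which matches the "fail fast when no timeout is configured" semantics discussed in the thread.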

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                                   final RecordConverter recordConverter) {
     for (final TopicPartition topicPartition : topicPartitions) {
         globalConsumer.assign(Collections.singletonList(topicPartition));
+        long offset;
         final Long checkpoint = checkpointFileCache.get(topicPartition);
         if (checkpoint != null) {
             globalConsumer.seek(topicPartition, checkpoint);
+            offset = checkpoint;
         } else {
             globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+            final AtomicLong position = new AtomicLong();

Review comment:
   For the above call, was curious why we couldn't seek for all the topic 
partitions that are missing positions here, instead of doing one by one look-up?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
##
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals;
-
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Map;
-
-/**
- * A {@link StreamsConfig} that does not log its configuration on construction.
- *
- * This produces cleaner output for unit tests using the {@code test-utils},
- * since logging the config is not really valuable in this context.
- */
-public class QuietStreamsConfig extends StreamsConfig {

Review comment:
   What's the purpose for this move?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                                   final RecordConverter recordConverter) {
     for (final TopicPartition topicPartition : topicPartitions) {
         globalConsumer.assign(Collections.singletonList(topicPartition));
+        long offset;
         final Long checkpoint = checkpointFileCache.get(topicPartition);
         if (checkpoint != null) {
             globalConsumer.seek(topicPartition, checkpoint);
+            offset = checkpoint;
         } else {
             globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+            final AtomicLong position = new AtomicLong();
+            retryUntilSuccessOrThrowOnTaskTimeout(
+                () -> position.set(glob

[GitHub] [kafka] ableegoldman commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

ableegoldman commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663701977


   @vvcephei I don't think there should be any issue there, since the 
checkpointed  `OFFSET_UNKNOWN` is just ignored on read (or should be) and will 
never make it into the subscription info.
   
   That said, it does make me a bit nervous and I wouldn't count on this always 
being true, so I'd rather choose a different sentinel to be safe. 
   
   Maybe we can put together a brief list of all sentinel values used in 
Streams (and the clients) to keep track and avoid issues like this going 
forward. I had to spend a long time digging around the client code to figure 
out whether an offset of -1 was ever possible in non-error cases (the answer 
seems to be "yes"). We'd have to actually keep it up to date of course, which 
might be...challenging
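The "skip unknown offsets when computing the sum" fix can be sketched as follows. The sentinel constants and the method name are illustrative (the `-4L` value is the one cadonna proposes below), not the actual Streams code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of KAFKA-10287: when summing changelog offsets for the
// subscription info, skip sentinel values instead of letting them corrupt
// the total. The constants below are stand-ins for the real sentinels.
public class OffsetSumSketch {
    static final long OFFSET_UNKNOWN = -4L; // hypothetical, per the discussion below
    static final long LATEST_OFFSET = -2L;  // another negative sentinel, for illustration

    static long sumOfChangelogOffsets(final Map<String, Long> changelogOffsets) {
        long sum = 0L;
        for (final Map.Entry<String, Long> entry : changelogOffsets.entrySet()) {
            final long offset = entry.getValue();
            if (offset < 0) {
                continue; // sentinel, not a real position: do not add it
            }
            sum += offset;
        }
        return sum;
    }

    public static void main(final String[] args) {
        final Map<String, Long> offsets = new HashMap<>();
        offsets.put("store-a-changelog", 100L);
        offsets.put("store-b-changelog", OFFSET_UNKNOWN);
        offsets.put("store-c-changelog", 42L);
        System.out.println(sumOfChangelogOffsets(offsets)); // prints 142
    }
}
```

Guarding on `offset < 0` rather than on one specific constant is one way to stay safe if another negative sentinel is introduced later, which is the risk the comments above worry about.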







[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

cadonna commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663702921


   @vvcephei Thanks for checking! Given your finding, I will change 
`UNKNOWN_OFFSET` to `-4L`. Actually, we do not need to change `UNKNOWN_OFFSET` at 
all, because we now never write it into the offset sums. I just thought it 
would be better to change it so that we do not end up in a similar situation after a 
future refactoring. I will follow the same reasoning in this case.   







[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

vvcephei commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663710405











[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-24 Thread GitBox

vvcephei commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663710503


   Test this please







[GitHub] [kafka] vvcephei opened a new pull request #9074: KAFKA-10287: PROPOSAL: safe offset tracking

2020-07-24 Thread GitBox

vvcephei opened a new pull request #9074:
URL: https://github.com/apache/kafka/pull/9074


   This PR should be targeted to trunk, but it's not a trivial task. I just 
wanted to propose this approach.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] vvcephei commented on pull request #9074: KAFKA-10287: PROPOSAL: safe offset tracking

2020-07-24 Thread GitBox

vvcephei commented on pull request #9074:
URL: https://github.com/apache/kafka/pull/9074#issuecomment-663719873


   Hey @cadonna , I was reflecting on your fix for KAFKA-10287, and it seems 
like dealing with those sentinel values is quite risky. We've had several bugs 
in the past, I don't feel certain that we don't have bugs today, and any 
refactoring could easily introduce bugs in the future.
   
   What do you think about taking a structural approach to offset sentinel 
management, like in this proposal?
   
   This wouldn't be for 2.6, but it's targeted at that branch because I built 
upon your PR.







[GitHub] [kafka] vvcephei commented on pull request #9073: MINOR: add task ':streams:testAll'

2020-07-24 Thread GitBox

vvcephei commented on pull request #9073:
URL: https://github.com/apache/kafka/pull/9073#issuecomment-663721003


   Thanks @abbccdda ! There was only one flaky test, and the last change was 
only to the README, so I'll merge now.







[GitHub] [kafka] vvcephei merged pull request #9073: MINOR: add task ':streams:testAll'

2020-07-24 Thread GitBox

vvcephei merged pull request #9073:
URL: https://github.com/apache/kafka/pull/9073


   







[GitHub] [kafka] vvcephei merged pull request #9070: MINOR: speed up release script

2020-07-24 Thread GitBox

vvcephei merged pull request #9070:
URL: https://github.com/apache/kafka/pull/9070


   







[GitHub] [kafka] vvcephei commented on pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters

2020-07-24 Thread GitBox

vvcephei commented on pull request #9052:
URL: https://github.com/apache/kafka/pull/9052#issuecomment-663728382


   Thanks @mjsax . Your suggestions sound good to me. I'll circle back on this 
PR in a bit.







[GitHub] [kafka] mjsax commented on a change in pull request #9073: MINOR: add task ':streams:testAll'

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9073:
URL: https://github.com/apache/kafka/pull/9073#discussion_r460282456



##
File path: build.gradle
##
@@ -1266,6 +1266,27 @@ project(':streams') {
 if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
 standardOutput = new File(generatedDocsDir, 
"streams_config.html").newOutputStream()
   }
+
+  task testAll(
+dependsOn: [
+':streams:test',
+':streams:test-utils:test',
+':streams:streams-scala:test',
+':streams:upgrade-system-tests-0100:test',
+':streams:upgrade-system-tests-0101:test',
+':streams:upgrade-system-tests-0102:test',
+':streams:upgrade-system-tests-0110:test',
+':streams:upgrade-system-tests-10:test',
+':streams:upgrade-system-tests-11:test',
+':streams:upgrade-system-tests-20:test',
+':streams:upgrade-system-tests-21:test',
+':streams:upgrade-system-tests-22:test',
+':streams:upgrade-system-tests-23:test',
+':streams:upgrade-system-tests-24:test',
+':streams:upgrade-system-tests-25:test',

Review comment:
   I don't think we have actual unit/integration tests in those packages? 
Also it seems a pain to keep this list updated (and I am sure we will forget to 
update it). Should we just skip those?









[GitHub] [kafka] mjsax commented on a change in pull request #9073: MINOR: add task ':streams:testAll'

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9073:
URL: https://github.com/apache/kafka/pull/9073#discussion_r460282610



##
File path: build.gradle
##
@@ -1266,6 +1266,27 @@ project(':streams') {
 if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
 standardOutput = new File(generatedDocsDir, 
"streams_config.html").newOutputStream()
   }
+
+  task testAll(
+dependsOn: [
+':streams:test',
+':streams:test-utils:test',
+':streams:streams-scala:test',
+':streams:upgrade-system-tests-0100:test',
+':streams:upgrade-system-tests-0101:test',
+':streams:upgrade-system-tests-0102:test',
+':streams:upgrade-system-tests-0110:test',
+':streams:upgrade-system-tests-10:test',
+':streams:upgrade-system-tests-11:test',
+':streams:upgrade-system-tests-20:test',
+':streams:upgrade-system-tests-21:test',
+':streams:upgrade-system-tests-22:test',
+':streams:upgrade-system-tests-23:test',
+':streams:upgrade-system-tests-24:test',
+':streams:upgrade-system-tests-25:test',

Review comment:
   Oh. Already merged. Never mind.









[GitHub] [kafka] mjsax commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#discussion_r460282952



##
File path: checkstyle/checkstyle.xml
##
@@ -103,6 +103,13 @@
(the XML of the added checkstyle rule was stripped by the mail archive's tag filter)

Review comment:
   Nice one!









[GitHub] [kafka] mjsax opened a new pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-24 Thread GitBox

mjsax opened a new pull request #9075:
URL: https://github.com/apache/kafka/pull/9075


   Fix for `2.6` blocker bug.
   
   Call for review @guozhangwang @vvcephei (\cc @rhauch)







[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460300664



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -291,27 +290,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
             long restoreCount = 0L;
 
             while (offset < highWatermark) {
-                try {
-                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
-                    final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>();
-                    for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
-                        if (record.key() != null) {
-                            restoreRecords.add(recordConverter.convert(record));
-                        }
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>();
+                for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
+                    if (record.key() != null) {
+                        restoreRecords.add(recordConverter.convert(record));
                     }
-                offset = globalConsumer.position(topicPartition);
-                stateRestoreAdapter.restoreBatch(restoreRecords);
-                stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
-                restoreCount += restoreRecords.size();
-            } catch (final InvalidOffsetException recoverableException) {

Review comment:
   This is the actual bug: we swallow the exception. However, because we 
don't do any "seek", we just hit the same exception in `poll()` over and over 
and never recover but loop forever.









[GitHub] [kafka] vvcephei commented on pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-24 Thread GitBox

vvcephei commented on pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#issuecomment-663746926











[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460302228



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
##
@@ -31,7 +31,7 @@
 
 void flushState();
 
-void close() throws IOException;
+void close(final boolean wipeStateStore) throws IOException;

Review comment:
   We could also not wipe out the store and let the user do it manually. A 
manual cleanup is actually required nowadays, thus this is actually a small side 
"improvement".
   
   Note that we wipe out the whole global task dir here -- in contrast, users 
could do a manual per-store wipe out... But as we do the same coarse-grained 
wipe out for all tasks and we already have a ticket for "per store cleanup", I 
thought it would be ok for now.
   
   Let me know what you think.









[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460302576



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -234,24 +234,18 @@ void initialize() {
 }
 
     void pollAndUpdate() {
-        try {
-            final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime);
-            for (final ConsumerRecord<byte[], byte[]> record : received) {
-                stateMaintainer.update(record);
-            }
-            final long now = time.milliseconds();
-            if (now >= lastFlush + flushInterval) {
-                stateMaintainer.flushState();
-                lastFlush = now;
-            }
-        } catch (final InvalidOffsetException recoverableException) {

Review comment:
   We just let the original exception bubble up, to be able to wipe out the 
store. -- This is also just a side "improvement"; we could also just die and 
let users cleanup the state directory manually. However, it seems better to 
wipe it out directly.









[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460303342



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -323,21 +322,6 @@ public void shouldRestoreRecordsUpToHighwatermark() {
 assertEquals(2, stateRestoreCallback.restored.size());
 }
 
-@Test
-public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() {

Review comment:
   This test was broken: it threw the `InvalidOffsetException` only once, 
and thus the second `poll()` succeeds -- however, this is not how a real 
consumer behaves. 









[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460303491



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##
@@ -128,10 +130,9 @@ public void 
shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() {
 assertFalse(globalStreamThread.stillRunning());
 }
 
-@SuppressWarnings("unchecked")

Review comment:
   Just a little side cleanup :) 









[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460287991



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                                   final RecordConverter recordConverter) {
     for (final TopicPartition topicPartition : topicPartitions) {
         globalConsumer.assign(Collections.singletonList(topicPartition));
+        long offset;
         final Long checkpoint = checkpointFileCache.get(topicPartition);
         if (checkpoint != null) {
             globalConsumer.seek(topicPartition, checkpoint);
+            offset = checkpoint;
         } else {
             globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+            final AtomicLong position = new AtomicLong();
+            retryUntilSuccessOrThrowOnTaskTimeout(
+                () -> position.set(globalConsumer.position(topicPartition)),
+                String.format(
+                    "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                    topicPartition
+                )
+            );
+            offset = position.get();
         }
 
-        long offset = globalConsumer.position(topicPartition);
         final Long highWatermark = highWatermarks.get(topicPartition);
         final RecordBatchingStateRestoreCallback stateRestoreAdapter =
             StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
         stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
         long restoreCount = 0L;
 
+        long deadlineMs = NO_DEADLINE;
         while (offset < highWatermark) {
             try {
                 final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                if (records.isEmpty()) {
+                    if (taskTimeoutMs == 0L) {
+                        deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   Man, this is confusing. I think I see what you're getting at, but it 
seems pretty strange to have to go to all that trouble to extract the consumer 
config so that we can apply a shorter timeout on each poll, but then loop 
around until the originally configured client timeout passes.
   
   Can you explain how the outcome is different than just calling 
`globalConsumer.poll(requestTimeoutMs)`?
   
   It also seems strange to extract a consumer timeout configuration that 
specifically does not apply to `poll` and apply it to `poll`. This seems like 
it would violate users' expectations when they set that configuration value.
   
   Why wouldn't we instead apply the non-progress timeout (`taskTimeoutMs`), 
since it seems like that's exactly what it's for? I.e., it seems like 
`globalConsumer.poll(pollTime + taskTimeoutMs)` might be the most appropriate 
choice here?
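   For context, the pattern under discussion — looping short polls against one overall deadline rather than issuing a single long blocking poll — can be sketched as follows. Names and signatures are illustrative, not from the PR; one general motivation for the loop form is that conditions (shutdown, progress) can be checked between attempts:

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Illustrative sketch (not the Streams code): loop short polls under one
// overall deadline instead of issuing a single long blocking poll.
public final class DeadlinePollLoop {

    // Returns true once a poll attempt yields records, false if the deadline
    // passes or shutdown is requested. Checking conditions *between* short
    // polls is what a single long poll cannot do.
    static boolean pollUntil(final BooleanSupplier fetchedRecords,
                             final BooleanSupplier shuttingDown,
                             final long deadlineMs,
                             final LongSupplier clockMs) {
        while (clockMs.getAsLong() < deadlineMs) {
            if (shuttingDown.getAsBoolean()) {
                return false;   // react promptly to shutdown
            }
            if (fetchedRecords.getAsBoolean()) {
                return true;    // progress was made
            }
        }
        return false;           // overall deadline exceeded
    }
}
```

   A single `poll(longTimeout)` would block for the whole duration regardless of such conditions.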

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -185,32 +200,16 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 
 log.info("Restoring state for global store {}", store.name());
 final List topicPartitions = 
topicPartitionsForStore(store);
-Map highWatermarks = null;
 
-int attempts = 0;
-while (highWatermarks == null) {
-try {
-highWatermarks = globalConsumer.endOffsets(topicPartitions);
-} catch (final TimeoutException retryableException) {
-if (++attempts > retries) {
-log.error("Failed to get end offsets for topic partitions 
of global store {} after {} retry attempts. " +
-"You can increase the number of retries via 
configuration parameter `retries`.",
-store.name(),
-retries,
-retryableException);
-throw new StreamsException(String.format("Failed to get 
end offsets for topic partitions of global store %s after %d retry attempts. " +
-"You can increase the number of retries via 
configuration parameter `retries`.", store.name(), retries),
-retryableException);
-}
-log.debug("Failed to get end offsets for partitions {}, 
backing off for {} ms to retry (attempt {} of {})",
-topicPartitions,
-retryBackoffMs,
-attempts,
-retries,
-retryableException);
-Utils.sleep(retryBackoffMs);
-}
-}
+final Map highWatermarks = new HashMap<>();
+re

[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460304324



##
File path: 
streams/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
##
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals;
-
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Map;
-
-/**
- * A {@link StreamsConfig} that does not log its configuration on construction.
- *
- * This producer cleaner output for unit tests using the {@code test-utils},
- * since logging the config is not really valuable in this context.
- */
-public class QuietStreamsConfig extends StreamsConfig {

Review comment:
   Well, `KafkaStreams` is the "streams client" IMHO :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460305033



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -185,32 +200,16 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 
 log.info("Restoring state for global store {}", store.name());
 final List topicPartitions = 
topicPartitionsForStore(store);
-Map highWatermarks = null;
 
-int attempts = 0;
-while (highWatermarks == null) {
-try {
-highWatermarks = globalConsumer.endOffsets(topicPartitions);
-} catch (final TimeoutException retryableException) {
-if (++attempts > retries) {
-log.error("Failed to get end offsets for topic partitions 
of global store {} after {} retry attempts. " +
-"You can increase the number of retries via 
configuration parameter `retries`.",
-store.name(),
-retries,
-retryableException);
-throw new StreamsException(String.format("Failed to get 
end offsets for topic partitions of global store %s after %d retry attempts. " +
-"You can increase the number of retries via 
configuration parameter `retries`.", store.name(), retries),
-retryableException);
-}
-log.debug("Failed to get end offsets for partitions {}, 
backing off for {} ms to retry (attempt {} of {})",
-topicPartitions,
-retryBackoffMs,
-attempts,
-retries,
-retryableException);
-Utils.sleep(retryBackoffMs);
-}
-}
+final Map highWatermarks = new HashMap<>();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
highWatermarks.putAll(globalConsumer.endOffsets(topicPartitions)),

Review comment:
   Correct, we rely on the clients internal backoff behavior here. Happy to 
switch to a `Supplier`









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460305945



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();

Review comment:
   Well, I guess we want to restore partition by partition because the 
"restore listener" callback is easier to handle this way?









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460307083



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   > Can you explain how the outcome is different than just calling 
globalConsumer.poll(requestTimeoutMs)?
   
   I did consider it, but was not 100% sure if this might be better. I was also 
concerned about misusing the request timeout (as you already mentioned), since we 
have `StreamsConfig.POLL_TIMEOUT_MS`.
   
   I like the proposal to use `pollTime + taskTimeoutMs` though!
   
   









[GitHub] [kafka] ijuma commented on pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log

2020-07-24 Thread GitBox

ijuma commented on pull request #9054:
URL: https://github.com/apache/kafka/pull/9054#issuecomment-663753175


   ok to test







[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460308601



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {
+offset = globalConsumer.position(topicPartition);
+} catch (final TimeoutException error) {
+// the `globalConsumer.position()` call should never 
block, because we know that we did
+// a successful `position()` call above for the 
requested partition and thus the consumer
+// should have a valid local position that it can 
return immediately

Review comment:
   What John says: We need to track the restore progress to eventually 
break the outer while loop. We don't use `offset` to `seek()`, so it's not 
about updating the consumer's position but knowing when we are done.
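   The termination idea — `offset` is only a progress counter compared against the end offset captured before restoration — can be reduced to a small sketch (hypothetical names; the batch positions stand in for the consumer's `position()` after each poll):

```java
// Minimal sketch of the restore-loop termination logic: `offset` only records
// how far the restore has progressed; the loop ends once it reaches the high
// watermark captured before restoration started.
public final class RestoreProgress {
    static long restoredUpTo(long offset, final long highWatermark, final long[] positionAfterBatch) {
        int batch = 0;
        while (offset < highWatermark && batch < positionAfterBatch.length) {
            // ... apply the records of this batch to the state store ...
            offset = positionAfterBatch[batch++];   // progress marker; never used to seek()
        }
        return offset;
    }
}
```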
   
   









[jira] [Comment Edited] (KAFKA-9731) Increased fetch request rate with leader selector due to HW propagation

2020-07-24 Thread Shane (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163061#comment-17163061
 ] 

Shane edited comment on KAFKA-9731 at 7/24/20, 10:07 PM:
-

Hey Kafka team, 
 What are the odds of back-porting this to 2.5? I'm seeing the same issues in 
2.5.
 I can add more details if you want, but we're seeing the same increase 
in fetch requests per second. 

Thanks!


was (Author: shanesaww):
Hey Apache team, 
What are the odds of back-porting this to 2.5.  I'm seeing the same issues in 
2.5.
I can add more details if you want to see, but we're seeing the same increase 
in fetch requests per second. 

Thanks!

> Increased fetch request rate with leader selector due to HW propagation
> ---
>
> Key: KAFKA-9731
> URL: https://issues.apache.org/jira/browse/KAFKA-9731
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.0, 2.4.1
>Reporter: Vahid Hashemian
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.6.0
>
> Attachments: image-2020-03-17-10-19-08-987.png
>
>
> KIP-392 adds high watermark propagation to followers as a means to better 
> sync up followers HW with leader. The issue we have noticed after trying out 
> 2.4.0 and 2.4.1 is a spike in fetch request rate in the default selector case 
> (leader), that does not really require this high watermark propagation:
> !image-2020-03-17-10-19-08-987.png|width=811,height=354!
> This spike causes an increase in resource allocation (CPU) on the brokers.
> An easy solution would be to disable this propagation (at least) for the 
> default leader selector case to improve the backward compatibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460308959



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -318,6 +341,72 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 }
 }
 
+private void retryUntilSuccessOrThrowOnTaskTimeout(final Runnable runnable,
+   final String 
errorMessage) {
+long deadlineMs = NO_DEADLINE;
+
+do {
+try {
+runnable.run();
+return;
+} catch (final TimeoutException retryableException) {

Review comment:
   Not sure if I can follow?
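   The helper quoted in the diff above can be sketched in a self-contained form. A local unchecked exception stands in for the Kafka client's `TimeoutException`, and the clock is injected for testability; these details are assumptions, not the PR's exact code:

```java
import java.util.function.LongSupplier;

// Self-contained sketch of a retry-until-task-timeout helper.
public final class RetrySketch {
    static final long NO_DEADLINE = -1L;

    // Local stand-in for the Kafka client's unchecked TimeoutException.
    static class TransientTimeout extends RuntimeException { }

    // Retry until the action succeeds; the first failure starts the clock,
    // and once taskTimeoutMs has elapsed the error is rethrown as fatal.
    static void retryUntilSuccessOrThrowOnTaskTimeout(final Runnable action,
                                                      final long taskTimeoutMs,
                                                      final LongSupplier clockMs) {
        long deadlineMs = NO_DEADLINE;
        while (true) {
            try {
                action.run();
                return;
            } catch (final TransientTimeout retryable) {
                final long nowMs = clockMs.getAsLong();
                if (deadlineMs == NO_DEADLINE) {
                    deadlineMs = nowMs + taskTimeoutMs;  // first failure starts the clock
                } else if (nowMs >= deadlineMs) {
                    throw new IllegalStateException("task timeout exceeded", retryable);
                }
                // no explicit sleep here: the discussion above notes that the
                // client's own internal backoff applies between attempts
            }
        }
    }
}
```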









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460308797



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(
+deadlineMs,
+requestTimeoutMs,
+new StreamsException(String.format(
+"Global task did not make progress to 
restore state. Retrying is disabled. You can enable it by setting `%s` to a 
value larger than zero.",
+StreamsConfig.TASK_TIMEOUT_MS_CONFIG
+))
+);
+} else {
+deadlineMs = 
maybeUpdateDeadlineOrThrow(deadlineMs);
+}
+
+continue;
+}
+deadlineMs = NO_DEADLINE;
+
+final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>();
 for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
 if (record.key() != null) {
 
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {
+offset = globalConsumer.position(topicPartition);
+} catch (final TimeoutException error) {
+// the `globalConsumer.position()` call should never 
block, because we know that we did
+// a successful `position()` call above for the 
requested partition and thus the consumer
+// should have a valid local position that it can 
return immediately
+
+// hence, a `TimeoutException` indicates a bug and 
thus we rethrow it as fatal `IllegalStateException`
+throw new IllegalStateException(error);

Review comment:
   As this would expose a bug, it does not seem useful to users; for 
us, we have the comment in the code.









[GitHub] [kafka] vvcephei commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-24 Thread GitBox

vvcephei commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460307479



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##
@@ -114,8 +118,16 @@ public void flushState() {
 stateMgr.checkpoint(offsets);
 }
 
-public void close() throws IOException {
+public void close(final boolean wipeStateStore) throws IOException {
 stateMgr.close();
+if (wipeStateStore) {
+try {
+log.error("Wiping state stores for global task.");
+Utils.delete(stateMgr.baseDir());
+} catch (final IOException e) {
+log.error("Failed to wiping state stores for global task.", e);

Review comment:
   ```suggestion
   log.error("Failed to delete global task directory after 
detecting corruption.", e);
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##
@@ -114,8 +118,16 @@ public void flushState() {
 stateMgr.checkpoint(offsets);
 }
 
-public void close() throws IOException {
+public void close(final boolean wipeStateStore) throws IOException {
 stateMgr.close();
+if (wipeStateStore) {
+try {
+log.error("Wiping state stores for global task.");

Review comment:
   This doesn't seem to be an error. Maybe info would be better? Also, I 
think "wipe state stores" might be confusing for a user looking at the log 
messages with no context. "Deleting the task directory" seems to be a more 
context-free statement of what we're doing.
   
   ```suggestion
   log.info("Deleting global task directory after detecting 
corruption.");
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java
##
@@ -31,7 +31,7 @@
 
 void flushState();
 
-void close() throws IOException;
+void close(final boolean wipeStateStore) throws IOException;

Review comment:
   Thanks @mjsax , this sounds perfect to me.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -284,10 +278,21 @@ public void run() {
 }
 setState(State.RUNNING);
 
+boolean wipeStateStore = false;
 try {
 while (stillRunning()) {
 stateConsumer.pollAndUpdate();
 }
+} catch (final InvalidOffsetException recoverableException) {
+wipeStateStore = true;
+log.error(
+"Updating global state failed. You can restart KafkaStreams to 
recover from this error.",

Review comment:
   ```suggestion
   "Updating global state failed due to inconsistent local 
state. Will attempt to clean up the local state. You can restart KafkaStreams 
to recover from this error.",
   ```
   
   Just a thought to indicate why just restarting would recover anything.
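   The recovery path being reviewed — delete the global task directory so a restart re-bootstraps the state from the changelog — boils down to a recursive delete. The real code calls Kafka's `Utils.delete(stateMgr.baseDir())`; this is a JDK-only sketch of the same idea (names are mine):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// JDK-only sketch of wiping a task directory after detecting corruption;
// children must be deleted before their parent directories.
public final class WipeStateDir {
    static void wipe(final Path taskDir) throws IOException {
        if (!Files.exists(taskDir)) {
            return;     // nothing to clean up
        }
        try (Stream<Path> paths = Files.walk(taskDir)) {
            final Iterable<Path> deepestFirst = paths.sorted(Comparator.reverseOrder())::iterator;
            for (final Path p : deepestFirst) {
                Files.delete(p);
            }
        }
    }
}
```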









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460309750



##
File path: streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
##
@@ -78,6 +78,7 @@ public void init(final ProcessorContext context, final 
StateStore root) {
 new File(context.stateDir() + File.separator + name).mkdir();
 }
 this.initialized = true;
+context.register(root, (k, v) -> { });

Review comment:
   That is what the real code does, too. Otherwise we would need to call 
`register` manually in the tests, which is annoying.









[GitHub] [kafka] vvcephei commented on pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-24 Thread GitBox

vvcephei commented on pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#issuecomment-663754855


   Otherwise, it LGTM. Feel free to merge whether or not you like my 
suggestions ;) 







[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460315914



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -331,17 +336,36 @@ private StateConsumer initialize() {
 logContext,
 globalConsumer,
 new GlobalStateUpdateTask(
+logContext,
 topology,
 globalProcessorContext,
 stateMgr,
-config.defaultDeserializationExceptionHandler(),
-logContext
+config.defaultDeserializationExceptionHandler()
 ),
 time,
 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
 config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
 );
-stateConsumer.initialize();
+
+try {
+stateConsumer.initialize();
+} catch (final InvalidOffsetException recoverableException) {
+log.error(
+"Bootstrapping global state failed. You can restart 
KafkaStreams to recover from this error.",

Review comment:
   ```suggestion
   "Bootstrapping global state failed due to inconsistent 
local state. Will attempt to clean up the local state. You can restart 
KafkaStreams to recover from this error.",
   ```









[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log

2020-07-24 Thread GitBox

dhruvilshah3 commented on a change in pull request #9054:
URL: https://github.com/apache/kafka/pull/9054#discussion_r460317793



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -932,6 +927,7 @@ class LogManager(logDirs: Seq[File],
 val logsToCheckpoint = logsInDir(logDir)
 checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, 
logsToCheckpoint, ArrayBuffer.empty)
 checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
+sourceLog.removeLogMetrics()

Review comment:
   This seems a bit odd but looks like an existing issue. We are removing 
metrics for the `current` log but we leave the metrics for the `future` log as 
is, which means it will retain the `is-future` tag. Maybe worth filing a JIRA 
if you think it is a problem too.









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460319742



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   Actually, we need to use all three: `pollTime + requestTimeout + 
taskTimeoutMs`
   
   If we don't include `requestTimeout` and `taskTimeoutMs` is set to `0` (or, in 
general, to a smaller value than `requestTimeout`), the first `poll()` would very 
likely return no data and we would fail, which seems undesirable.
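   The point can be captured in a tiny sketch of the deadline update. The helper name mirrors the diff, but this signature is invented: the clock only starts when the first poll comes back empty, and including `requestTimeoutMs` guarantees at least one full fetch round trip even when `taskTimeoutMs` is `0`:

```java
// Sketch of the deadline arithmetic discussed above (illustrative signature).
public final class DeadlineSketch {
    static final long NO_DEADLINE = -1L;

    // Start the clock on the first empty poll; afterwards fail once it runs
    // out. Adding requestTimeoutMs covers at least one fetch round trip.
    static long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs,
                                           final long nowMs,
                                           final long pollTimeMs,
                                           final long requestTimeoutMs,
                                           final long taskTimeoutMs) {
        if (currentDeadlineMs == NO_DEADLINE) {
            return nowMs + pollTimeMs + requestTimeoutMs + taskTimeoutMs;
        }
        if (nowMs >= currentDeadlineMs) {
            throw new IllegalStateException("global task did not make progress within the task timeout");
        }
        return currentDeadlineMs;
    }
}
```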









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460320303



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(
+deadlineMs,
+requestTimeoutMs,
+new StreamsException(String.format(
+"Global task did not make progress to 
restore state. Retrying is disabled. You can enable it by setting `%s` to a 
value larger than zero.",
+StreamsConfig.TASK_TIMEOUT_MS_CONFIG
+))
+);
+} else {
+deadlineMs = 
maybeUpdateDeadlineOrThrow(deadlineMs);
+}
+
+continue;
+}
+deadlineMs = NO_DEADLINE;
+
 final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>();
 for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
 if (record.key() != null) {
 
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {
+offset = globalConsumer.position(topicPartition);
+} catch (final TimeoutException error) {
+// the `globalConsumer.position()` call should never 
block, because we know that we did
+// a successful `position()` call above for the 
requested partition and thus the consumer
+// should have a valid local position that it can 
return immediately
+
+// hence, a `TimeoutException` indicates a bug and 
thus we rethrow it as fatal `IllegalStateException`
+throw new IllegalStateException(error);

Review comment:
   As requested by John, I'll drop this and just call 
`retryUntilSuccessOrThrowOnTaskTimeout` here, too.
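   The `retryUntilSuccessOrThrowOnTaskTimeout` helper itself isn't shown in the quoted hunk. As a rough, hypothetical sketch of the retry-until-deadline idea under discussion (the helper name and `NO_DEADLINE` follow the PR's naming; the fake clock, signature, and everything else are illustrative assumptions, not the actual Kafka code):

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of a retry-until-deadline helper in the spirit of
// `retryUntilSuccessOrThrowOnTaskTimeout`; the fake clock and the exact
// signature are illustrative assumptions, not the actual Kafka code.
public class RetrySketch {
    static final long NO_DEADLINE = -1L;

    // Run `action` until it succeeds, or until `taskTimeoutMs` has elapsed
    // since the first failure; then rethrow the last error.
    static void retryUntilSuccessOrTimeout(final Runnable action,
                                           final long taskTimeoutMs,
                                           final LongSupplier clockMs) {
        long deadlineMs = NO_DEADLINE;
        while (true) {
            try {
                action.run();
                return;
            } catch (final RuntimeException error) {
                final long nowMs = clockMs.getAsLong();
                if (deadlineMs == NO_DEADLINE) {
                    deadlineMs = nowMs + taskTimeoutMs; // start the clock on the first failure
                } else if (nowMs >= deadlineMs) {
                    throw error; // deadline exceeded: surface the last failure
                }
            }
        }
    }

    public static void main(final String[] args) {
        final long[] nowMs = {0L};
        final int[] attempts = {0};
        retryUntilSuccessOrTimeout(
            () -> {
                attempts[0]++;
                nowMs[0] += 10L; // advance the fake clock on every attempt
                if (attempts[0] < 3) {
                    throw new RuntimeException("transient");
                }
            },
            100L,
            () -> nowMs[0]
        );
        System.out.println("attempts=" + attempts[0]);
    }
}
```

   Note that with `taskTimeoutMs == 0` the deadline equals the time of the first failure, so the second failed attempt already rethrows, matching the "Retrying is disabled" wording in the hunk above.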





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460320506



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {
+offset = globalConsumer.position(topicPartition);
+} catch (final TimeoutException error) {
+// the `globalConsumer.position()` call should never block, because we know that we did
+// a successful `position()` call above for the requested partition and thus the consumer
+// should have a valid local position that it can return immediately

Review comment:
   I guess with the new `retryUntilSuccessOrThrowOnTaskTimeout` the code is still clean if we just use it here.

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {
+offset = globalConsumer.position(topicPartition);
+} catch (final TimeoutException error) {
+// the `globalConsumer.position()` call should never block, because we know that we did
+// a successful `position()` call above for the requested partition and thus the consumer
+// should have a valid local position that it can return immediately

Review comment:
   I guess with the new `retryUntilSuccessOrThrowOnTaskTimeout` the code is still clean if we just use it here, too.









[GitHub] [kafka] lbradstreet commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-24 Thread GitBox

lbradstreet commented on a change in pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#discussion_r460322318



##
File path: checkstyle/checkstyle.xml
##
@@ -103,6 +103,13 @@
   
 
 
+
+  
+  
+  
+  

Review comment:
   Thank you for this!









[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#issuecomment-663766278


   Thanks for the review @abbccdda @vvcephei -- updated the PR.







[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460324591



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   Btw: just for completeness: we will still use `pollTime` unmodified in `globalConsumer.poll(...)` during regular processing, so the config still has value. Only in the startup bootstrapping phase would we use the sum of poll/request/task-timeout to avoid failing unnecessarily.
   
   Also note that during normal processing, we don't apply `taskTimeoutMs` atm. Not sure if we should, but I would not want to include it in this PR. -- It won't block the "main" processing if the global `poll()` loop does not make progress, because the `StreamThreads` are already running, and thus its impact is different -- also, during normal processing the global `highWatermark` may move, and thus we need a different strategy anyway compared to the bootstrap phase.
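   To make the bootstrap-phase arithmetic concrete, here is a small, hypothetical sketch (`maybeUpdateDeadlineOrThrow` and `NO_DEADLINE` follow the PR's naming; the concrete timeout values and the method shape are illustrative assumptions, not the actual Kafka code):

```java
// Hypothetical sketch of the bootstrap deadline handling discussed above;
// the timeout values and method shape are illustrative, not the PR's code.
public class BootstrapDeadlineSketch {
    static final long NO_DEADLINE = -1L;

    // On an empty poll: start the deadline clock if it is not running yet,
    // and fail once the deadline has passed.
    static long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs,
                                           final long nowMs,
                                           final long maxWaitMs) {
        if (currentDeadlineMs == NO_DEADLINE) {
            return nowMs + maxWaitMs; // first empty poll: start the clock
        }
        if (nowMs >= currentDeadlineMs) {
            throw new IllegalStateException("global task did not make progress before the deadline");
        }
        return currentDeadlineMs; // keep waiting against the existing deadline
    }

    public static void main(final String[] args) {
        // Illustrative values standing in for poll.ms, request.timeout.ms,
        // and task.timeout.ms; not necessarily the real defaults.
        final long pollTimeMs = 100L;
        final long requestTimeoutMs = 30_000L;
        final long taskTimeoutMs = 300_000L;
        // Bootstrap phase: tolerate empty polls up to the sum of the three
        // timeouts, instead of failing after a single short poll.
        final long maxWaitMs = pollTimeMs + requestTimeoutMs + taskTimeoutMs;
        final long deadlineMs = maybeUpdateDeadlineOrThrow(NO_DEADLINE, 0L, maxWaitMs);
        System.out.println("deadlineMs=" + deadlineMs);
    }
}
```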









[GitHub] [kafka] mjsax merged pull request #9061: MINOR: removed incorrect deprecation annotations

2020-07-24 Thread GitBox

mjsax merged pull request #9061:
URL: https://github.com/apache/kafka/pull/9061


   







[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460338414



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   Ah, that last point sounds good.
   
   I'm not opposed, but it still seems strange to grab a random timeout from the client config and apply it to the `poll` method. It kind of makes me wonder what we're really trying to do here. If we want to give poll at least 30 seconds to return data, then we can just give it at least 30 seconds, right? No reason to abuse the client configuration.
   
   On the other hand, the pollTime may not even be appropriate for the global thread, right? It seems more like it was meant for StreamThread to make sure we don't block too long and violate the max poll interval. But since the global thread is only assigned and not subscribed, it has no constraint on how long it should block, except for the taskTimeoutMs config, right?
   
   Clearly, we need to set some kind of lower bound on it, though, but it's not clear that it needs to be configurable. Anyway, just food for thought. Feel free to keep the requestTimeout if you really think it's appropriate.
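   For what it's worth, the non-configurable lower bound floated here could be as simple as this hypothetical sketch (the 30-second floor and the helper name are illustrative assumptions, not anything in the PR):

```java
import java.time.Duration;

// Hypothetical sketch of the alternative discussed above: give the global
// thread's bootstrap poll a fixed lower bound instead of reusing a client
// config; the 30-second floor is an illustrative assumption.
public class BoundedPollTimeSketch {
    static final Duration MIN_BOOTSTRAP_POLL = Duration.ofSeconds(30);

    // The configured pollTime targets the StreamThread loop; during global
    // bootstrap we never block for less than the fixed floor.
    static Duration bootstrapPollTime(final Duration configuredPollTime) {
        return configuredPollTime.compareTo(MIN_BOOTSTRAP_POLL) < 0
            ? MIN_BOOTSTRAP_POLL
            : configuredPollTime;
    }

    public static void main(final String[] args) {
        // A short configured poll is raised to the floor; a longer one passes through.
        System.out.println(bootstrapPollTime(Duration.ofMillis(100)).toMillis());
        System.out.println(bootstrapPollTime(Duration.ofMinutes(1)).toMillis());
    }
}
```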









[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-24 Thread GitBox

vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460338747



##
File path: streams/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
##
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals;
-
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Map;
-
-/**
- * A {@link StreamsConfig} that does not log its configuration on construction.
- *
- * This produces cleaner output for unit tests using the {@code test-utils},
- * since logging the config is not really valuable in this context.
- */
-public class QuietStreamsConfig extends StreamsConfig {

Review comment:
   Haha, that's kind of a stretch, but it's not a big deal :) 








