[jira] [Commented] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164760#comment-17164760 ]

Boyang Chen commented on KAFKA-10307:

[~vvcephei] Could you take a look? [~feyman] found this in his PR that refactors the stream assignor logic: [https://github.com/apache/kafka/pull/8832#issuecomment-646943356]

> Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
>
> Key: KAFKA-10307
> URL: https://issues.apache.org/jira/browse/KAFKA-10307
> Project: Kafka
> Issue Type: Bug
> Reporter: Boyang Chen
> Priority: Minor
>
> We have spotted a cyclic topology in the foreign-key join test *shouldInnerJoinMultiPartitionQueryable*; we are not sure yet whether this is a bug in the algorithm or only in the test. We used [https://zz85.github.io/kafka-streams-viz/] to visualize it:
> {code:java}
> Sub-topology: 0
>   Source: KTABLE-SOURCE-19 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
>   Source: KTABLE-SOURCE-32 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
>   Source: KSTREAM-SOURCE-01 (topics: [table1])
>     --> KTABLE-SOURCE-02
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: [table1-STATE-STORE-00])
>     --> KTABLE-FK-JOIN-OUTPUT-21
>     <-- KTABLE-SOURCE-19
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: [INNER-store1])
>     --> KTABLE-FK-JOIN-OUTPUT-34
>     <-- KTABLE-SOURCE-32
>   Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
>   Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2])
>     --> KTABLE-TOSTREAM-35
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
>   Processor: KTABLE-SOURCE-02 (stores: [table1-STATE-STORE-00])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
>     <-- KSTREAM-SOURCE-01
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: [])
>     --> KTABLE-SINK-11
>     <-- KTABLE-SOURCE-02
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: [])
>     --> KTABLE-SINK-24
>     <-- KTABLE-FK-JOIN-OUTPUT-21
>   Processor: KTABLE-TOSTREAM-35 (stores: [])
>     --> KSTREAM-SINK-36
>     <-- KTABLE-FK-JOIN-OUTPUT-34
>   Sink: KSTREAM-SINK-36 (topic: output-)
>     <-- KTABLE-TOSTREAM-35
>   Sink: KTABLE-SINK-11 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic)
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
>   Sink: KTABLE-SINK-24 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic)
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
>
> Sub-topology: 1
>   Source: KSTREAM-SOURCE-04 (topics: [table2])
>     --> KTABLE-SOURCE-05
>   Source: KTABLE-SOURCE-12 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15
>     <-- KTABLE-SOURCE-12
>   Processor: KTABLE-SOURCE-05 (stores: [table2-STATE-STORE-03])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
>     <-- KSTREAM-SOURCE-04
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: [table2-STATE-STORE-03])
>     --> KTABLE-SINK-18
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>     --> KTABLE-SINK-18
>     <-- KTABLE-SOURCE-05
>   Sink: KTABLE-SINK-18 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic)
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
>
> Sub-topology: 2
>   Source: KSTREAM-SOURCE-07 (topics: [table3])
>     --> KTABLE-SOURCE-08
>   Sou
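The cycle in this dump runs between sub-topologies, not inside one: sub-topology 0 writes `KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic`, which sub-topology 1 reads, and sub-topology 1 writes `KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic`, which sub-topology 0 reads. A minimal sketch of such a check, assuming each sub-topology is reduced to its source and sink topic names; the `SubTopologyCycles` class is hypothetical and not part of Kafka Streams:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not a Kafka Streams API): an edge A -> B exists when a
// sink topic of sub-topology A is a source topic of sub-topology B.
final class SubTopologyCycles {

    static final class SubTopology {
        final int id;
        final Set<String> sourceTopics;
        final Set<String> sinkTopics;

        SubTopology(int id, Set<String> sourceTopics, Set<String> sinkTopics) {
            this.id = id;
            this.sourceTopics = sourceTopics;
            this.sinkTopics = sinkTopics;
        }
    }

    /** Returns one cycle as a list of ids (first id repeated at the end), or an empty list. */
    static List<Integer> findCycle(List<SubTopology> subTopologies) {
        Map<Integer, List<Integer>> edges = new HashMap<>();
        for (SubTopology from : subTopologies) {
            List<Integer> targets = new ArrayList<>();
            for (SubTopology to : subTopologies) {
                if (!Collections.disjoint(from.sinkTopics, to.sourceTopics)) {
                    targets.add(to.id);
                }
            }
            edges.put(from.id, targets);
        }
        Set<Integer> finished = new HashSet<>();
        for (SubTopology start : subTopologies) {
            List<Integer> cycle = dfs(start.id, edges, new ArrayList<>(), finished);
            if (!cycle.isEmpty()) {
                return cycle;
            }
        }
        return Collections.emptyList();
    }

    // Depth-first search that keeps the current path; reaching a node that is
    // already on the path means a back edge, i.e. a cycle.
    private static List<Integer> dfs(int node, Map<Integer, List<Integer>> edges,
                                     List<Integer> path, Set<Integer> finished) {
        int onPath = path.indexOf(node);
        if (onPath >= 0) {
            List<Integer> cycle = new ArrayList<>(path.subList(onPath, path.size()));
            cycle.add(node);
            return cycle;
        }
        if (finished.contains(node)) {
            return Collections.emptyList();
        }
        path.add(node);
        for (int next : edges.getOrDefault(node, Collections.emptyList())) {
            List<Integer> cycle = dfs(next, edges, path, finished);
            if (!cycle.isEmpty()) {
                return cycle;
            }
        }
        path.remove(path.size() - 1);
        finished.add(node);
        return Collections.emptyList();
    }
}
```

Fed the source and sink topics of sub-topologies 0 and 1 from the dump (in that order), `findCycle` returns `[0, 1, 0]`. Sub-topology 2 is left out because its sink topics are cut off in the message.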
[jira] [Updated] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10307: Component/s: streams
[jira] [Updated] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10307: Affects Version/s: 2.6.0 2.4.0 2.5.0
[jira] [Updated] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10307: Priority: Major (was: Minor)
[GitHub] [kafka] abbccdda commented on pull request #8907: MINOR: code cleanup for `VOut` inconsistent naming
abbccdda commented on pull request #8907: URL: https://github.com/apache/kafka/pull/8907#issuecomment-663817294 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439 @junrao I have rebased this PR to include the fix for `group_mode_transactions_test`. Could you run the system tests again? Except for `streams_eos_test` and the transaction tests, the other tests pass on my local machine.
[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 edited a comment on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439 @junrao I have rebased this PR to include the fix for `group_mode_transactions_test`. Could you run the system tests again? Except for `streams_eos_test`, `streams.streams_eos_test` and the transaction tests, the other tests pass on my local machine.
[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 edited a comment on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439 @junrao I have rebased this PR to include the fix for `group_mode_transactions_test`. Could you run the system tests again? Except for `streams_eos_test`, `streams_standby_replica_test` and the transaction tests, the other tests pass on my local machine.
[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on pull request #9047: URL: https://github.com/apache/kafka/pull/9047#issuecomment-663383364 @vvcephei I updated the PR and had to make quite a few changes to get it into reasonable shape. Instead of throwing a new `RetryableException` that we catch in the outer layer, I just retry in each step directly. This makes reasoning about the timeout easier.
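The pattern described here, retrying inside each step against one task timeout instead of throwing a `RetryableException` to an outer loop, can be sketched as a small helper. This is a hypothetical illustration, not the PR's `retryUntilSuccessOrThrowOnTaskTimeout`; the clock is injected so the deadline logic can be exercised without sleeping:

```java
import java.util.function.LongSupplier;

// Hypothetical helper (not the PR's code): retry one step in place until it
// succeeds or a task-level deadline passes, instead of throwing a
// RetryableException up to an outer retry loop.
final class StepRetry {

    static final class TaskTimeoutException extends RuntimeException {
        TaskTimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static void retryUntilDeadline(Runnable step, long deadlineMs,
                                   LongSupplier clockMs, String failureMessage) {
        while (true) {
            try {
                step.run();
                return;                                   // success: stop retrying
            } catch (RuntimeException transientError) {   // e.g. a transient broker timeout
                if (clockMs.getAsLong() >= deadlineMs) {
                    throw new TaskTimeoutException(failureMessage, transientError);
                }
                // A real implementation would back off here before the next attempt.
            }
        }
    }
}
```

Because every step owns its retry loop, the only state shared between steps is the deadline, which is what makes the timeout easy to reason about.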
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459890602

## File path: streams/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java ##
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals;
-
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Map;
-
-/**
- * A {@link StreamsConfig} that does not log its configuration on construction.
- *
- * This producer cleaner output for unit tests using the {@code test-utils},
- * since logging the config is not really valuable in this context.
- */
-public class QuietStreamsConfig extends StreamsConfig {

Review comment:
Moved this class into `ClientUtils.java`
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459891762

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;

Review comment:
A side improvement: if we `seek` based on the checkpoint, there is no reason to call `position()`, because we already know our offset. Only if we call `seekToBeginning()` do we need to get the current offset from the consumer itself (instead of `position()` we could also call `beginningOffsets`, but `position` is the easier API to use).
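The side improvement boils down to a two-branch rule: after `seek(checkpoint)` the start offset is already known locally, and only after `seekToBeginning()` does the consumer have to be asked via `position()`. A sketch using a hypothetical `PositionedReader` interface as a stand-in for those three consumer calls (the real `Consumer` methods take a `TopicPartition`; a `String` partition name keeps the example self-contained):

```java
// Hypothetical stand-in for the consumer calls used in the diff; the real
// Consumer methods take a TopicPartition rather than a String.
interface PositionedReader {
    void seek(String partition, long offset);
    void seekToBeginning(String partition);
    long position(String partition);   // may need a broker round trip after seekToBeginning
}

final class StartOffset {
    /** Returns the offset restoration starts from, asking the reader only when needed. */
    static long resolve(PositionedReader reader, String partition, Long checkpoint) {
        if (checkpoint != null) {
            reader.seek(partition, checkpoint);
            return checkpoint;                  // already known locally, no position() call
        }
        reader.seekToBeginning(partition);
        return reader.position(partition);      // only this branch has to ask the consumer
    }
}
```

Only the `seekToBeginning()` branch can block on the broker, so only that branch needs the retry wrapper seen in the diff.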
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459892544

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
For `poll()`, even if retrying is disabled, we need to retry to give a fetch request (with a default request timeout of 30 seconds but only a default `pollTime` of 100ms) a fair chance to actually return. At least I believe this makes sense. Also cc @guozhangwang
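The numbers in this comment explain the concern: with a 100 ms `pollTime` and a 30 s default request timeout, an in-flight fetch can span up to about 30,000 / 100 = 300 polls, so giving up after the first empty poll would be premature. A hypothetical sketch of the deadline bookkeeping, assuming the grace period is the larger of the task timeout and the request timeout; the PR's `maybeUpdateDeadlineOrThrow` body is not shown in the diff, so its actual rule may differ:

```java
// Hypothetical sketch of the deadline bookkeeping for empty polls; the grace
// period rule (max of task timeout and request timeout) is an assumption.
final class EmptyPollDeadline {
    static final long NO_DEADLINE = -1L;

    /**
     * Called after a poll() that returned no records. Starts the clock on the
     * first empty poll, keeps the deadline on later ones, and throws once the
     * deadline has passed. A non-empty poll should reset it to NO_DEADLINE.
     */
    static long onEmptyPoll(long deadlineMs, long nowMs, long taskTimeoutMs, long requestTimeoutMs) {
        long graceMs = Math.max(taskTimeoutMs, requestTimeoutMs);
        if (deadlineMs == NO_DEADLINE) {
            return nowMs + graceMs;             // first empty poll starts the clock
        }
        if (nowMs >= deadlineMs) {
            throw new IllegalStateException("No records fetched within " + graceMs + " ms");
        }
        return deadlineMs;                      // still within the grace period
    }
}
```

With `taskTimeoutMs == 0` and a 30,000 ms request timeout, an empty poll at time 0 sets the deadline to 30,000, later empty polls before that keep it, and an empty poll at 30,000 or later throws.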
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459892977

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                         restoreRecords.add(recordConverter.convert(record));
                     }
                 }
-                offset = globalConsumer.position(topicPartition);
+                try {
+                    offset = globalConsumer.position(topicPartition);
+                } catch (final TimeoutException error) {
+                    // the `globalConsumer.position()` call should never block, because we know that we did
+                    // a successful `position()` call above for the requested partition and thus the consumer
+                    // should have a valid local position that it can return immediately

Review comment:
We don't need to call `retryUntilSuccessOrThrowOnTaskTimeout` now, so we actually get cleaner code :)
[GitHub] [kafka] huxihx commented on pull request #9071: KAFKA-10305: Print usage when parsing fails for ConsumerPerformance
huxihx commented on pull request #9071: URL: https://github.com/apache/kafka/pull/9071#issuecomment-663404582 retest this please
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164290#comment-17164290 ]

Jerry Wei commented on KAFKA-10134:

[~guozhang] I'm using logback, and PR https://github.com/apache/kafka/pull/9011 is included as well. Yes, I just restarted the consumer without changing the brokers; both the brokers and the consumers were running on my local laptop, and I did not find any other issue.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.5.0
> Reporter: Sean Guo
> Assignee: Guozhang Wang
> Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
> Attachments: consumer5.log.2020-07-22.log
>
> We want to use the new rebalance protocol to mitigate the stop-the-world effect during a rebalance, because our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance while there is some load (some tasks run longer than 30s), the CPU goes sky-high. It reads ~700% in our metrics, so several threads are probably in a tight loop. We have several consumer threads consuming from different partitions during the rebalance. This is reproducible with both the new CooperativeStickyAssignor and the old eager rebalance protocol. The difference is that with the old eager protocol, the high CPU usage drops once the rebalance is done. With the cooperative one, the consumer threads seem to be stuck on something and cannot finish the rebalance, so the high CPU usage does not drop until we stop our load. Also, a small load without long-running tasks does not cause continuous high CPU usage, as the rebalance can finish in that case.
>
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable [0x7fe119aab000]
>    java.lang.Thread.State: RUNNABLE
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>     at
>
> By debugging into the code, we found that the clients appear to be looping while trying to find the coordinator.
> I also tried the old rebalance protocol with the new version: the issue still exists, but the CPU goes back to normal once the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue. So it seems related to something that changed between 2.4.1 and 2.5.0.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
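For anyone trying to reproduce this, the cooperative protocol mentioned in the report is enabled on the consumer through `partition.assignment.strategy`. A minimal sketch that builds such a configuration with plain string keys, so it compiles without `kafka-clients`; the bootstrap address and group id are placeholders:

```java
import java.util.Properties;

// Consumer settings selecting the cooperative assignor described in the report.
// Plain string keys keep the sketch free of a kafka-clients dependency;
// bootstrapServers and groupId are placeholders supplied by the caller.
final class CooperativeConsumerConfig {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Incremental cooperative rebalancing instead of the eager default.
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }
}
```

Passing these `Properties` to a `KafkaConsumer` selects the cooperative assignor; in 2.5 leaving the property unset keeps the default `RangeAssignor`, which uses the eager protocol.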
[GitHub] [kafka] dajac opened a new pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)
dajac opened a new pull request #9072: URL: https://github.com/apache/kafka/pull/9072

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
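For readers unfamiliar with the term in the PR title: a token bucket admits work while it holds enough tokens and refills at a fixed rate up to a burst limit. The sketch below is a generic, textbook version with time passed in explicitly; it is not the KIP-599 implementation, which may differ in details such as how throttle time is derived:

```java
// Generic token bucket (not the code in the PR): tokens refill at ratePerSec
// up to `burst`; a request is admitted only if it can take `cost` tokens.
// Time is passed in explicitly so the behavior is deterministic.
final class TokenBucket {
    private final double ratePerSec;
    private final double burst;
    private double tokens;
    private long lastMs;

    TokenBucket(double ratePerSec, double burst, long nowMs) {
        this.ratePerSec = ratePerSec;
        this.burst = burst;
        this.tokens = burst;   // start full so an initial burst is allowed
        this.lastMs = nowMs;
    }

    boolean tryAcquire(double cost, long nowMs) {
        refill(nowMs);
        if (tokens >= cost) {
            tokens -= cost;
            return true;
        }
        return false;
    }

    private void refill(long nowMs) {
        long elapsedMs = Math.max(0L, nowMs - lastMs);
        tokens = Math.min(burst, tokens + elapsedMs * ratePerSec / 1000.0);
        lastMs = nowMs;
    }
}
```

With 10 tokens/s and a burst of 5, a full bucket admits 5 units at once, then one more unit every 100 ms; a long idle period refills it only up to 5, which is what bounds bursts compared with a plain windowed rate.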
[GitHub] [kafka] dajac commented on pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)
dajac commented on pull request #8977: URL: https://github.com/apache/kafka/pull/8977#issuecomment-663462152 I have opened a new proper PR: https://github.com/apache/kafka/pull/9072
[jira] [Commented] (KAFKA-6764) ConsoleConsumer behavior inconsistent when specifying --partition with --from-beginning
[ https://issues.apache.org/jira/browse/KAFKA-6764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164370#comment-17164370 ] Pardhu Madipalli commented on KAFKA-6764: - I am observing this behavior with kafka broker with SSL enabled even with a new consumer group also. I produced a few messages. Then I created a new consumer with a new group. Then *kafka-console-consumer --from-beginning* is not displaying any messages. But when I tried with *--partition 0* I could see it reading the messages. Then I killed my broker with ssl and created a new broker without ssl. This time, even if I did not specify *--partition 0*, I can see all the messages as expected. In both the cases only a single broker is running. Topic partitions and replication factor are both 1. > ConsoleConsumer behavior inconsistent when specifying --partition with > --from-beginning > > > Key: KAFKA-6764 > URL: https://issues.apache.org/jira/browse/KAFKA-6764 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Larry McQueary >Assignee: Larry McQueary >Priority: Minor > Labels: newbie > > Per its usage statement, {{kafka-console-consumer.sh}} ignores > {{\-\-from-beginning}} when the specified consumer group has committed > offsets, and sets {{auto.offset.reset}} to {{latest}}. However, if > {{\-\-partition}} is also specified, {{\-\-from-beginning}} is observed in > all cases, whether there are committed offsets or not. > This happens because when {{\-\-from-beginning}} is specified, {{offsetArg}} > is set to {{OffsetRequest.EarliestTime}}. However, {{offsetArg}} is [only > passed to the > constructor|https://github.com/apache/kafka/blob/fedac0cea74fce529ee1c0cefd6af53ecbdd/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L76-L79] > for {{NewShinyConsumer}} when {{\-\-partition}} is also specified. Hence, it > is honored in this case and not the other. 
> This case should either be handled consistently, or the usage statement > should be modified to indicate the actual behavior/usage. -- This message was sent by Atlassian Jira (v8.3.4#803005)
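To make the two code paths described in the ticket concrete, here is a small, hypothetical Java model of how the starting position for one partition ends up being chosen. `StartOffsetModel` and its parameters are illustrative names, not `ConsoleConsumer` internals, and the model assumes the tool's `--offset` default of `latest` applies when `--partition` is given without `--from-beginning`.

```java
import java.util.OptionalLong;

// Hypothetical model of how the console consumer ends up choosing a start offset
// for a single partition. Names are illustrative; this is not ConsoleConsumer's code.
final class StartOffsetModel {
    static long startOffset(boolean partitionSpecified,
                            boolean fromBeginning,
                            OptionalLong committedOffset,
                            long logStartOffset,
                            long logEndOffset) {
        if (partitionSpecified) {
            // --partition: the partition is assigned manually and the tool seeks explicitly,
            // so committed group offsets are never consulted.
            // Assumption: without --from-beginning, --offset defaults to "latest".
            return fromBeginning ? logStartOffset : logEndOffset;
        }
        // Group subscription: a committed offset always wins...
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        // ...and auto.offset.reset (earliest for --from-beginning, else latest)
        // only applies when the group has no committed offset.
        return fromBeginning ? logStartOffset : logEndOffset;
    }
}
```

With a group subscription, `--from-beginning` therefore only matters for a group without committed offsets; with `--partition`, the explicit seek makes the flag look "observed in all cases".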
[jira] [Commented] (KAFKA-9924) Add RocksDB Memory Consumption to RocksDB Metrics
[ https://issues.apache.org/jira/browse/KAFKA-9924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164402#comment-17164402 ] Bruno Cadonna commented on KAFKA-9924: -- The scope of this ticket was extended to not only monitor memory usage but also to expose RocksDB properties in Kafka Streams metrics. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB > Add RocksDB Memory Consumption to RocksDB Metrics > -- > > Key: KAFKA-9924 > URL: https://issues.apache.org/jira/browse/KAFKA-9924 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Labels: needs-kip > > RocksDB's memory consumption should be added to the RocksDB metrics. > RocksDB's memory consumption can be retrieved with the following class: > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/MemoryUtil.java > The memory consumption metrics should be added on client level and should be > recorded on INFO level.
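The original request asked for memory metrics at client level. A minimal sketch of such a roll-up, assuming per-store readings of RocksDB properties (for example `rocksdb.cur-size-all-mem-tables`) have already been collected into a map; `RocksDbMemoryRollup` is a hypothetical helper, not part of Kafka Streams or KIP-607.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: roll per-store RocksDB property readings up to client-level totals.
final class RocksDbMemoryRollup {
    // perStore: store name -> (RocksDB property name -> value in bytes)
    static Map<String, Long> clientLevelTotals(Map<String, Map<String, Long>> perStore) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Long> storeProperties : perStore.values()) {
            for (Map.Entry<String, Long> entry : storeProperties.entrySet()) {
                // Add this store's reading to the running total for the property.
                totals.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        return totals;
    }
}
```

Summing like this only makes sense for per-store resources; a block cache shared by several stores would be counted once per store.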
[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663512173 I started a run of Streams system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4077/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663512735 @mjsax I thought this fix is most urgently needed on 2.6 to unblock the release. I will open a new PR for trunk.
[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663512974 @ableegoldman I agree it should be a blocker.
[GitHub] [kafka] cadonna edited a comment on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna edited a comment on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663513384 @ableegoldman I agree that having the extra protection you propose makes sense. I will open a follow-up PR on trunk.
[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663513384 @ableegoldman I agree that having the extra protection you propose makes sense. I will open a follow-up PR.
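The PR title describes skipping unknown offsets when summing a task's changelog offsets. A minimal Java sketch of that idea, assuming unknown offsets are marked with a sentinel value; the constant `UNKNOWN_OFFSET` (-1 here) and the `String` partition keys are hypothetical simplifications, and the real sentinel in Kafka Streams may differ.

```java
import java.util.Map;

// Hypothetical sketch: total changelog offset for a task, skipping unknown offsets.
final class ChangelogOffsetSum {
    // Assumed sentinel for "offset unknown"; the real constant in Streams may differ.
    static final long UNKNOWN_OFFSET = -1L;

    static long sumOfChangelogOffsets(Map<String, Long> changelogOffsets) {
        long sum = 0L;
        for (long offset : changelogOffsets.values()) {
            if (offset == UNKNOWN_OFFSET) {
                // Adding the sentinel would silently skew the sum (and anything derived from it).
                continue;
            }
            sum += offset;
        }
        return sum;
    }
}
```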
[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663532890 retest this please
[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663534471 ok to test
[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663534611 retest this please
[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663537187 One PR build was started here: https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3515/
[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663537509 Looks like the last message somehow made Jenkins start working again.
[GitHub] [kafka] ijuma edited a comment on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma edited a comment on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663537509 Looks like the last message somehow caused the Jenkins status to be updated in the PR again.
[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
dajac commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r460053049 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); -int remainingRetries = retries; +long currentWallClockMs = time.milliseconds(); +final long deadlineMs = currentWallClockMs + retryTimeoutMs; Review comment: Thinking a bit more about this: with the default, you may end up not honouring the deadline. `createTopics` can take up to 1m, so if you invoke it when less than 1m remains before the deadline, you may overshoot the deadline. It may not be that important, though. If we want to strictly enforce it, we could calculate the maximum timeout for each call, something like `deadline - now`, and set it with `CreateTopicsOptions.timeoutMs`.
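A minimal sketch of the per-call budgeting suggested in the review comment, with an injectable clock so it can be tested. `DeadlineBudget` is a hypothetical helper, not code from the PR.

```java
import java.util.function.LongSupplier;

// Hypothetical helper: cap each call's timeout so an overall deadline is honoured.
final class DeadlineBudget {
    private final LongSupplier clockMs;
    private final long deadlineMs;

    DeadlineBudget(LongSupplier clockMs, long retryTimeoutMs) {
        this.clockMs = clockMs;
        this.deadlineMs = clockMs.getAsLong() + retryTimeoutMs;
    }

    boolean expired() {
        return clockMs.getAsLong() >= deadlineMs;
    }

    // The default per-call timeout, shortened to whatever time is left before the deadline.
    int nextCallTimeoutMs(int defaultCallTimeoutMs) {
        long remainingMs = Math.max(0L, deadlineMs - clockMs.getAsLong());
        return (int) Math.min(defaultCallTimeoutMs, remainingMs);
    }
}
```

Each `createTopics` attempt could then pass `new CreateTopicsOptions().timeoutMs(budget.nextCallTimeoutMs(defaultTimeoutMs))`, where `defaultTimeoutMs` stands for whatever per-call default applies, and the retry loop would stop once `expired()` returns true.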
[GitHub] [kafka] ijuma commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
ijuma commented on pull request #9051: URL: https://github.com/apache/kafka/pull/9051#issuecomment-663547862 @huxihx Note that we usually include the reviewer(s) in the commit message. For example: https://github.com/apache/kafka/commit/0d5c967073e78e2a50ffa5bb860bff9ff43086d5
[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
vvcephei commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663587201
[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
vvcephei commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663587337 Test this please
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-663597341 3 system test failures with trunk. http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2020-07-23--001.1595551051--apache--trunk--0b181fdde/report.html
[GitHub] [kafka] vvcephei commented on pull request #9067: MINOR: Streams integration tests should not call exit
vvcephei commented on pull request #9067: URL: https://github.com/apache/kafka/pull/9067#issuecomment-663599523 Hey @mjsax, I double-checked, and it does seem to be the right fix. I confirmed that no matter whether you run the test with IDEA or Gradle, there's nothing to automatically set the exit procedure, so `Exit.exit` is equivalent to `System.exit` unless a test specifically overrides the exit procedure, as I have done here. Nevertheless, I see no reason not to also take your advice and just replace all calls to `System.exit` with `Exit.exit`. Since `Exit` is provided as a utility in the main Client module, it's available in every context in which Streams is available. Note that this by itself won't fix anything: if a test calls `Exit.exit`, it will fail with the reported error unless it overrides the exit procedure. I did my best to trace all call paths that end in `exit()`, and still `SmokeTestDriverIntegrationTest` is the only one that I found.
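The pattern under discussion is an indirection around `System.exit` that tests can override. The block below is a hypothetical, self-contained re-creation in the spirit of Kafka's `org.apache.kafka.common.utils.Exit`, not its actual source; `OverridableExit` is an illustrative name. Until a test installs its own procedure, `exit` behaves exactly like `System.exit`, which is why switching call sites alone changes nothing.

```java
// Hypothetical re-creation of an overridable-exit utility (not Kafka's actual source).
final class OverridableExit {
    interface Procedure {
        void execute(int statusCode, String message);
    }

    // Default behaviour is identical to calling System.exit directly.
    private static final Procedure DEFAULT = (statusCode, message) -> System.exit(statusCode);
    private static volatile Procedure exitProcedure = DEFAULT;

    static void exit(int statusCode) {
        exit(statusCode, null);
    }

    static void exit(int statusCode, String message) {
        exitProcedure.execute(statusCode, message);
    }

    // Tests install a procedure that records or throws instead of terminating the JVM.
    static void setExitProcedure(Procedure procedure) {
        exitProcedure = procedure;
    }

    static void resetExitProcedure() {
        exitProcedure = DEFAULT;
    }
}
```

A test would install an override before exercising code that may exit, and reset it in a `finally` block so later tests see the default behaviour again.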
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-663618045 @junrao thanks for the reports. Unfortunately, the failing tests are completely different between the two runs :( This PR has been rebased, so the fix for ```group_mode_transactions_test``` is included. ```streams_standby_replica_test``` will be fixed by #9066. I will test the others locally later.
[GitHub] [kafka] vvcephei commented on pull request #9067: MINOR: Streams integration tests should not call exit
vvcephei commented on pull request #9067: URL: https://github.com/apache/kafka/pull/9067#issuecomment-663631102 Hey @mjsax, since Streams accounted for almost all of the `System.exit` calls, and since `Exit.exit` is available almost everywhere in AK, I'm proposing to add a checkstyle rule forbidding direct calls to `System.exit`. The violation will look like this: ``` [ant:checkstyle] [ERROR] /home/confluent/kafka/clients/src/main/java/org/apache/kafka/some/BadClass.java:43: Line matches the illegal pattern ''System.exit': Should not directly call System.exit, but Exit.exit instead.'. [Regexp] ```
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164531#comment-17164531 ] Sophie Blee-Goldman commented on KAFKA-10284: - Nope, it just spun in a loop where it would poll and then call Consumer#position to try to initialize some metadata. Poll presumably just returned empty, and Consumer#position kept throwing TimeoutException. The TimeoutException part seems pretty weird, so maybe it was an unrelated bug (or some other issue, such as a hung socket, which was our primary and only guess). I only thought of it because it definitely happened immediately after an "id does not match expected member.id" error. I actually still have some of the broker- and client-side logs from around this incident, if that might help. But again, I'm not really sure whether it is related to this or not. > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > When a known static member rejoins, we update its corresponding member.id > without triggering a new rebalance. This serves to avoid > unnecessary rebalances for static membership, and also to fence any instance > that still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > subsequent rebalance gets triggered, this new member.id information will get > lost during group coordinator migration, thus resurrecting the zombie member > identity. > Credit for finding the bug goes to [~hachikuji]
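A toy model of the bug described in the ticket: the coordinator updates the static member's member.id only in memory, so a coordinator that takes over later, and rebuilds its state from what was persisted, resurrects the old id. `StaticMemberCoordinator`, the `persisted` map (standing in for group metadata stored in `__consumer_offsets`) and the `persistOnStaticRejoin` switch are all hypothetical; this is not Kafka's `GroupCoordinator` code.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical): live state in memory, durable state in a "log" that a newly
// elected coordinator replays.
final class StaticMemberCoordinator {
    final Map<String, String> inMemory = new HashMap<>(); // group.instance.id -> member.id
    final Map<String, String> persisted;                  // stands in for __consumer_offsets
    private final boolean persistOnStaticRejoin;

    StaticMemberCoordinator(Map<String, String> persisted, boolean persistOnStaticRejoin) {
        this.persisted = persisted;
        this.persistOnStaticRejoin = persistOnStaticRejoin;
        inMemory.putAll(persisted); // a newly elected coordinator only knows what was persisted
    }

    // A completed rebalance writes the current membership out.
    void completeRebalance() {
        persisted.putAll(inMemory);
    }

    // A known static member rejoins: it gets a new member.id, but no rebalance is triggered.
    void staticRejoin(String groupInstanceId, String newMemberId) {
        inMemory.put(groupInstanceId, newMemberId);
        if (persistOnStaticRejoin) {
            persisted.put(groupInstanceId, newMemberId); // the fix: persist the update right away
        }
    }

    // Coordinator migration: the successor rebuilds its state from the persisted log.
    StaticMemberCoordinator migrate() {
        return new StaticMemberCoordinator(persisted, persistOnStaticRejoin);
    }
}
```

The model also shows why the bug needs "no upcoming rebalance": any later `completeRebalance()` would have written the new member.id out anyway.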
[GitHub] [kafka] vvcephei opened a new pull request #9073: MINOR: add task ':streams:testAll'
vvcephei opened a new pull request #9073: URL: https://github.com/apache/kafka/pull/9073 Adds a new task to Streams to run all the tests for all sub-projects. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-10306) GlobalThread might loop forever
[ https://issues.apache.org/jira/browse/KAFKA-10306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-10306: --- Assignee: Matthias J. Sax > GlobalThread might loop forever > --- > > Key: KAFKA-10306 > URL: https://issues.apache.org/jira/browse/KAFKA-10306 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Blocker > Fix For: 2.6.0 > > > We did some code refactoring in the `GlobalStateManagerImpl` that can lead to > an infinite loop if the global consumer throws an `InvalidOffsetException`. > This is a regression bug, and thus I marked this ticket as a blocker for the 2.6 > release. > The issue seems to occur only during the initial bootstrapping of the global > store, not during regular processing.
[jira] [Created] (KAFKA-10306) GlobalThread might loop forever
Matthias J. Sax created KAFKA-10306: --- Summary: GlobalThread might loop forever Key: KAFKA-10306 URL: https://issues.apache.org/jira/browse/KAFKA-10306 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.6.0 Reporter: Matthias J. Sax Fix For: 2.6.0 We did some code refactoring in the `GlobalStateManagerImpl` that can lead to an infinite loop if the global consumer throws an `InvalidOffsetException`. This is a regression bug, and thus I marked this ticket as a blocker for the 2.6 release. The issue seems to occur only during the initial bootstrapping of the global store, not during regular processing.
[jira] [Commented] (KAFKA-8359) Reconsider default for leader imbalance percentage
[ https://issues.apache.org/jira/browse/KAFKA-8359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164538#comment-17164538 ] Romain Hardouin commented on KAFKA-8359: {quote}[...] there is no major downside to set leader imbalance percentage to 0. {quote} Does it mean there are _minor_ downsides? Following the principle of least astonishment, the default value should be 0 because users expect to have balanced clusters when using {{auto.leader.rebalance.enable=true}}. > Reconsider default for leader imbalance percentage > -- > > Key: KAFKA-8359 > URL: https://issues.apache.org/jira/browse/KAFKA-8359 > Project: Kafka > Issue Type: Improvement >Reporter: Dhruvil Shah >Priority: Major > > By default, the leader imbalance ratio is 10%. This means that the controller > won't trigger preferred leader election for a broker unless the ratio of the > number of partitions a broker is the current leader of and the number of > partitions it is the preferred leader of is off by more than 10%. The problem > is when a broker is catching up after a restart, the smallest topics tend to > catch up first and the largest ones later, so the 10% remaining difference > may not be proportional to the broker's load. To keep better balance in the > cluster, we should consider setting > `leader.imbalance.per.broker.percentage=0` by default so that the preferred > leaders are always elected.
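A sketch of the per-broker check described in the ticket, written in Java for illustration: count the partitions for which the broker is the preferred leader but not the current leader, divide by all partitions for which it is the preferred leader, and trigger preferred leader election only when that ratio exceeds `leader.imbalance.per.broker.percentage`. The class and method names are hypothetical; the controller's actual implementation is in Scala.

```java
import java.util.List;

// Hypothetical sketch of the per-broker leader-imbalance check.
final class LeaderImbalance {
    static final class PartitionLeadership {
        final int preferredLeader; // first replica in the assignment
        final int currentLeader;

        PartitionLeadership(int preferredLeader, int currentLeader) {
            this.preferredLeader = preferredLeader;
            this.currentLeader = currentLeader;
        }
    }

    // Fraction of the partitions for which brokerId is the preferred leader
    // that it does not currently lead.
    static double imbalanceRatio(int brokerId, List<PartitionLeadership> partitions) {
        int preferredCount = 0;
        int notLedCount = 0;
        for (PartitionLeadership p : partitions) {
            if (p.preferredLeader != brokerId) {
                continue;
            }
            preferredCount++;
            if (p.currentLeader != brokerId) {
                notLedCount++;
            }
        }
        return preferredCount == 0 ? 0.0 : (double) notLedCount / preferredCount;
    }

    // Election is triggered only when the ratio is strictly above the configured percentage.
    static boolean shouldTriggerPreferredElection(double ratio, int imbalancePercentage) {
        return ratio > imbalancePercentage / 100.0;
    }
}
```

With the default of 10, a broker that leads 91 of its 100 preferred partitions is left alone; with 0, any partition it should lead but does not triggers election, which is the behavior the comment argues users expect.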
[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
ableegoldman commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r460194062 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -243,18 +242,24 @@ public void handleAssignment(final Map> activeTasks, for (final Task task : tasksToClose) { try { -if (task.isActive()) { -// Active tasks are revoked and suspended/committed during #handleRevocation -if (!task.state().equals(State.SUSPENDED)) { -log.error("Active task {} should be suspended prior to attempting to close but was in {}", - task.id(), task.state()); -throw new IllegalStateException("Active task " + task.id() + " should have been suspended"); -} -} else { -task.suspend(); -task.prepareCommit(); -task.postCommit(); +// Always try to first suspend and commit the task before closing it; +// some tasks may already be suspended which should be a no-op. +// +// Also since active tasks should already be suspended / committed and +// standby tasks should have no offsets to commit, we should expect nothing to commit +task.suspend(); + +final Map offsets = task.prepareCommit(); + +if (!offsets.isEmpty()) { +log.error("Task {} should has been committed prior to attempting to close, but it reports non-empty offsets {} to commit", Review comment: I thought we would catch and save the first exception thrown by a rebalance listener callback, and then rethrow after all rebalance callbacks have been invoked? In this case that would mean `handleAssignment` would still get called, and then we would throw an IllegalStateException and bail on the rest of `handleAssignment` for no reason. The IllegalStateException itself is not the problem, since only the first exception (the TaskMigrated) would ultimately be thrown up to `poll`. 
But we should still go through the rest of `handleAssignment` in order to properly clean up the active tasks and manage the standbys (since we don't need to close standbys in case of TaskMigrated).
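The behaviour described in the comment (keep going after a failure and surface only the first exception once everything has run) is a common cleanup pattern. A minimal, generic Java sketch; the `closeAll` helper is hypothetical and is not Streams' `TaskManager` code.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper illustrating "save the first exception, rethrow after all steps ran".
final class CloseAll {
    static <T> void closeAll(List<T> items, Consumer<T> close) {
        RuntimeException firstException = null;
        for (T item : items) {
            try {
                close.accept(item);
            } catch (RuntimeException e) {
                // Remember only the first failure; later failures must not stop the cleanup.
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }
}
```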
[GitHub] [kafka] abbccdda commented on pull request #9073: MINOR: add task ':streams:testAll'
abbccdda commented on pull request #9073: URL: https://github.com/apache/kafka/pull/9073#issuecomment-663652859 One more thought, @vvcephei: it might be useful to add this to the repo `README` for users who are only making Streams-side changes and want to test them locally.
[GitHub] [kafka] heritamas commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
heritamas commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r460197007 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java ## @@ -132,7 +132,7 @@ public String version() { return listConsumerGroupOffsets(group).entrySet().stream() .filter(x -> shouldCheckpointTopic(x.getKey().topic())) .map(x -> checkpoint(group, x.getKey(), x.getValue())) -.filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately +.filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately Review comment: Yes, it does not hurt to leave it, just to be sure.
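For context on the one-character change in the diff: a downstream offset of 0 is a valid translated position (the start of the downstream partition), so filtering with `> 0` would drop it. A minimal sketch, assuming untranslatable offsets are reported as a negative value; the `CheckpointFilter` helper is hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the checkpoint filter change.
final class CheckpointFilter {
    // Assumption: an offset that cannot be translated is reported as a negative value.
    static List<Long> translatable(List<Long> downstreamOffsets) {
        return downstreamOffsets.stream()
                .filter(offset -> offset >= 0) // 0 is a real position and must be kept
                .collect(Collectors.toList());
    }
}
```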
[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
ableegoldman commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r460211291 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -38,13 +41,39 @@ */ final class StateManagerUtil { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; +static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L; Review comment: It would be awesome if we could do it based on the size of the pending data and the configured memtable size. Not sure if that's really feasible; just throwing it out there.
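A sketch of the offset-delta policy that a constant like `OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT` suggests: checkpoint when forced, or when the changelog offsets have advanced by more than the threshold in total since the last checkpoint. The method name `checkpointNeeded` appears in the `AbstractTask` diff of this PR, but the body below is an assumption, and `String` keys stand in for `TopicPartition`. The size-based trigger proposed in the comment would replace the offset sum with an estimate of pending bytes.

```java
import java.util.Map;

// Hypothetical sketch of an offset-delta checkpoint policy.
final class CheckpointPolicy {
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    static boolean checkpointNeeded(boolean enforceCheckpoint,
                                    Map<String, Long> offsetsAtLastCheckpoint,
                                    Map<String, Long> currentOffsets) {
        if (enforceCheckpoint) {
            return true; // the caller forces a checkpoint regardless of progress
        }
        long totalOffsetDelta = 0L;
        for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
            // Partitions missing from the last checkpoint count from offset 0.
            totalOffsetDelta += entry.getValue() - offsetsAtLastCheckpoint.getOrDefault(entry.getKey(), 0L);
        }
        // Only flush and write a new checkpoint once enough new data has accumulated.
        return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }
}
```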
[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
ableegoldman commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r460212084 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -454,6 +456,41 @@ public void flush() { } } +public void flushCache() { +RuntimeException firstException = null; +// attempting to flush the stores +if (!stores.isEmpty()) { +log.debug("Flushing all store caches registered in the state manager: {}", stores); +for (final StateStoreMetadata metadata : stores.values()) { +final StateStore store = metadata.stateStore; + +try { +// buffer should be flushed to send all records to changelog +if (store instanceof TimeOrderedKeyValueBuffer) { Review comment: WDYT about adding a generic `FlushingRequiredStore` marker interface that all caching state stores and the suppression buffer would both implement? It seems weird to handle them separately. We could even make this public and allow users' custom state stores to implement it, but that might be opening a can of worms we will greatly regret 😉
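A sketch of the suggested marker interface. `FlushingRequiredStore` is only a proposal in this review, not an existing Kafka Streams type, and the two toy stores are hypothetical stand-ins for a caching store and the suppression buffer. The point is that the flush loop checks a single interface instead of special-casing `TimeOrderedKeyValueBuffer`.

```java
import java.util.List;

// Hypothetical marker interface proposed in the review; not an existing Kafka Streams type.
interface FlushingRequiredStore {
    void flushCache();
}

// Toy stand-ins (hypothetical) for a caching store and the suppression buffer.
final class ToyCachingStore implements FlushingRequiredStore {
    int flushes = 0;

    @Override
    public void flushCache() {
        flushes++;
    }
}

final class ToySuppressionBuffer implements FlushingRequiredStore {
    int flushes = 0;

    @Override
    public void flushCache() {
        flushes++;
    }
}

final class CacheFlusher {
    // One instanceof check on the marker replaces per-class special cases.
    static int flushCaches(List<?> stores) {
        int flushed = 0;
        for (Object store : stores) {
            if (store instanceof FlushingRequiredStore) {
                ((FlushingRequiredStore) store).flushCache();
                flushed++;
            }
        }
        return flushed;
    }
}
```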
[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
ableegoldman commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r460212719 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ## @@ -49,6 +61,31 @@ this.stateDirectory = stateDirectory; } +protected void initializeCheckpoint() { +// we will delete the local checkpoint file after registering the state stores and loading them into the +// state manager, therefore we should initialize the snapshot as empty to indicate over-write checkpoint needed +offsetSnapshotSinceLastFlush = Collections.emptyMap(); +} + +/** + * The following exceptions maybe thrown from the state manager flushing call + * + * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed + * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed + * or flushing state store get IO errors; such error should cause the thread to die + */ +protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { +final Map offsetSnapshot = stateMgr.changelogOffsets(); +if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) { +// since there's no written offsets we can checkpoint with empty map, +// and the state's current offset would be used to checkpoint +stateMgr.flush(); +stateMgr.checkpoint(Collections.emptyMap()); Review comment: But `ProcessorStateManager` doesn't handle global tasks.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
ableegoldman commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r460214147 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -93,8 +93,8 @@ public boolean isActive() { public void initializeIfNeeded() { if (state() == State.CREATED) { StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext); - // initialize the snapshot with the current offsets as we don't need to commit then until they change Review comment: Sounds good. I think the most important thing is to not have the comment that says `initialize the snapshot with the current offsets...` right above the line that initializes the snapshot as empty 🙁
[jira] [Commented] (KAFKA-10260) Streams could recover stores in a task independently
[ https://issues.apache.org/jira/browse/KAFKA-10260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17164585#comment-17164585 ] Matthias J. Sax commented on KAFKA-10260: - Note: this issue applies to global tasks, too. Cf https://issues.apache.org/jira/browse/KAFKA-10306 > Streams could recover stores in a task independently > > > Key: KAFKA-10260 > URL: https://issues.apache.org/jira/browse/KAFKA-10260 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > > Currently, ProcessorStateManager checks for corrupted tasks by checking, for > each persistent store, if its checkpoint is missing, then the task directory > must also be empty. > This is a little overzealous, since we aren't checking whether the store's > specific directory is nonempty, only if there are any directories for any > stores. So if there are two stores in a task, and one is correctly written > and checkpointed, while the other is neither written nor checkpointed, we > _could_ correctly load the first and recover the second but instead we'll > consider the whole task corrupted and discard the first and recover both. > The fix would be to check, for each persistent store that doesn't have a > checkpoint, that its _specific_ store directory is also missing. Such a store > will be restored from the changelog and we don't need to consider the task > corrupted. > See ProcessorStateManager#initializeStoreOffsetsFromCheckpoint
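A sketch contrasting the two checks described in the ticket, under a simplified model in which each persistent store reports whether it has a checkpoint and whether its own directory contains data. `StoreRecoveryPlanner` and `StoreState` are hypothetical, and the task-level variant approximates the current behavior as "any store directory in the task is non-empty".

```java
import java.util.List;

// Hypothetical, simplified model of the corruption check in KAFKA-10260.
final class StoreRecoveryPlanner {
    static final class StoreState {
        final boolean hasCheckpoint;
        final boolean storeDirHasData;

        StoreState(boolean hasCheckpoint, boolean storeDirHasData) {
            this.hasCheckpoint = hasCheckpoint;
            this.storeDirHasData = storeDirHasData;
        }
    }

    // Current behavior (approximated): a store without a checkpoint marks the whole task
    // corrupted if ANY store directory in the task contains data.
    static boolean taskCorruptedTaskLevel(List<StoreState> stores) {
        boolean anyStoreDirHasData = false;
        for (StoreState s : stores) {
            anyStoreDirHasData |= s.storeDirHasData;
        }
        for (StoreState s : stores) {
            if (!s.hasCheckpoint && anyStoreDirHasData) {
                return true;
            }
        }
        return false;
    }

    // Proposed behavior: only the store's own directory matters. A store with neither a
    // checkpoint nor data is simply restored from its changelog.
    static boolean taskCorruptedPerStore(List<StoreState> stores) {
        for (StoreState s : stores) {
            if (!s.hasCheckpoint && s.storeDirHasData) {
                return true;
            }
        }
        return false;
    }
}
```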
[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663687037 One build passed, one failed due to environmental reasons, and one had a single flaky failure: `org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1`. I ran the full test suite locally and it passed as well. Merging to trunk and 2.6. cc @rhauch
[GitHub] [kafka] ijuma merged pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma merged pull request #9065: URL: https://github.com/apache/kafka/pull/9065
[GitHub] [kafka] lbradstreet commented on pull request #9067: MINOR: Streams integration tests should not call exit
lbradstreet commented on pull request #9067: URL: https://github.com/apache/kafka/pull/9067#issuecomment-663689440 > Hey @mjsax , > > I double-checked, and it does seem to be the right fix. I confirmed that no matter whether you run the test with IDEA or gradle, there's nothing to automatically set the exit procedure, so `Exit.exit` is equivalent to `System.exit` unless a test specifically overrides the exit procedure, as I have done here. > > Nevertheless, I see no reason not to also take your advice and just replace all calls to `System.exit` with `Exit.exit`. Since `Exit` is provided as a utility in the main Client module, it's available in every context in which Streams is available. Note, this by itself won't fix anything. If a test calls `Exit.exit`, it will fail with the reported error unless it overrides the exit procedure. > > I did my best to trace all call paths that end in `exit()`, and still `SmokeTestDriverIntegrationTest` is the only one that I found. Right, you'll need `Exit.setExitProcedure((_, _) => doSomething())` to be set up somewhere to override the `Exit.exit` behavior; otherwise it will still do the same thing as it did before.
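The override pattern described above can be illustrated with a self-contained analogue of the `Exit` utility. The `ExitHook` and `ExitCalledException` classes below are hypothetical. They only mimic the idea of a swappable exit procedure that defaults to `System.exit`. In Kafka itself, the corresponding entry points are `Exit.setExitProcedure(...)` and `Exit.resetExitProcedure()` in `org.apache.kafka.common.utils`.

```java
// Hypothetical stand-in for a swappable exit utility (not Kafka's actual Exit class).
final class ExitHook {
    interface Procedure {
        void execute(int statusCode, String message);
    }

    // By default, "exit" really terminates the JVM.
    private static final Procedure DEFAULT = (code, message) -> System.exit(code);
    private static volatile Procedure procedure = DEFAULT;

    private ExitHook() { }

    static void exit(final int statusCode, final String message) {
        procedure.execute(statusCode, message);
    }

    static void setExitProcedure(final Procedure newProcedure) {
        procedure = newProcedure;
    }

    static void resetExitProcedure() {
        procedure = DEFAULT;
    }
}

// Thrown by a test's exit procedure so that an "exit" fails the test instead of killing the JVM.
final class ExitCalledException extends RuntimeException {
    final int statusCode;

    ExitCalledException(final int statusCode, final String message) {
        super(message);
        this.statusCode = statusCode;
    }
}
```

A test installs the throwing procedure before running the code under test and restores the default in a `finally` block, so that later tests see the normal behavior again.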
[GitHub] [kafka] abbccdda commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
abbccdda commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460165601 ## File path: docs/streams/developer-guide/config-streams.html ## @@ -326,13 +321,18 @@ bootstrap.servers state.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. -600000 milliseconds +600000 milliseconds (10 minutes) state.dir High Directory location for state stores. /tmp/kafka-streams + task.timeout.ms +Medium Review comment: nit: for a doc clean-up, it is helpful to include a screenshot of the updated paragraph. The changes starting at L331 should suffice. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -318,6 +341,72 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, } } +private void retryUntilSuccessOrThrowOnTaskTimeout(final Runnable runnable, + final String errorMessage) { +long deadlineMs = NO_DEADLINE; + +do { +try { +runnable.run(); +return; +} catch (final TimeoutException retryableException) { Review comment: nit: rename to `retriableException`, to be consistent with the exception type defined in AK (`RetriableException`).
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); Review comment: For the above call, was curious why we couldn't seek for all the topic partitions that are missing positions here, instead of doing one by one look-up? ## File path: streams/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java ## @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.internals; - -import org.apache.kafka.streams.StreamsConfig; - -import java.util.Map; - -/** - * A {@link StreamsConfig} that does not log its configuration on construction. 
- * - * This producer cleaner output for unit tests using the {@code test-utils}, - * since logging the config is not really valuable in this context. - */ -public class QuietStreamsConfig extends StreamsConfig { Review comment: What's the purpose for this move? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(glob
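A deadline-based retry loop of the kind `retryUntilSuccessOrThrowOnTaskTimeout` implements can be sketched as follows. This is a simplified, hypothetical version, not the PR's code. `RetriableTimeout` stands in for Kafka's `TimeoutException`, the clock is injected as a `LongSupplier`, and an `IllegalStateException` is used where Streams would throw its own exception type. It also assumes that the wrapped client call applies its own backoff, so the loop does not sleep.

```java
import java.util.function.LongSupplier;

final class DeadlineRetry {
    // Stand-in for a retriable client exception such as Kafka's TimeoutException.
    static final class RetriableTimeout extends RuntimeException {
        RetriableTimeout(final String message) {
            super(message);
        }
    }

    static final long NO_DEADLINE = -1L;

    private DeadlineRetry() { }

    /** Runs the action until it succeeds; gives up once taskTimeoutMs has passed since the first failure. */
    static void retryUntilSuccessOrThrowOnTaskTimeout(final Runnable action,
                                                      final String errorMessage,
                                                      final long taskTimeoutMs,
                                                      final LongSupplier clockMs) {
        long deadlineMs = NO_DEADLINE;
        while (true) {
            try {
                action.run();
                return;
            } catch (final RetriableTimeout retriableException) {
                final long now = clockMs.getAsLong();
                if (deadlineMs == NO_DEADLINE) {
                    deadlineMs = now + taskTimeoutMs; // the first failure starts the clock
                }
                if (now >= deadlineMs) {
                    // With taskTimeoutMs == 0 this fails on the first error.
                    throw new IllegalStateException(errorMessage, retriableException);
                }
                // No explicit sleep: assumes the client call backs off internally.
            }
        }
    }
}
```

Starting the deadline at the first failure, rather than before the first attempt, means that a call which usually succeeds immediately never consumes any of the timeout budget.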
[GitHub] [kafka] ableegoldman commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
ableegoldman commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663701977 @vvcephei I don't think there should be any issue there, since the checkpointed `OFFSET_UNKNOWN` is just ignored on read (or should be) and will never make it into the subscription info. That said, it does make me a bit nervous and I wouldn't count on this always being true, so I'd rather choose a different sentinel to be safe. Maybe we can put together a brief list of all sentinel values used in Streams (and the clients) to keep track and avoid issues like this going forward. I had to spend a long time digging around the client code to figure out whether an offset of -1 was ever possible in non-error cases (the answer seems to be "yes"). We'd have to actually keep it up to date of course, which might be...challenging
[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663702921 @vvcephei Thanks for checking! Given your finding, I will change `UNKNOWN_OFFSET` to `-4L`. Actually, we do not need to change `UNKNOWN_OFFSET` at all, because we now never write it into the offset sums. I just thought it would be better to change it so that we do not end up in a similar situation after a future refactoring. I will follow the same reasoning in this case.
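The idea behind KAFKA-10287 is to skip sentinel offsets when computing a task's offset sum. That idea can be sketched like this. The sentinel value is an assumption for illustration only: `-4L` follows the comment above, but the real constants live in the Streams internals. `OffsetSums` is a hypothetical helper, and partitions are keyed by plain strings to keep the sketch self-contained.

```java
import java.util.Map;

final class OffsetSums {
    // Illustrative sentinel value, taken from the discussion rather than the Streams source.
    static final long UNKNOWN_OFFSET = -4L;

    private OffsetSums() { }

    /** Sums one task's changelog offsets, ignoring partitions whose offset is unknown. */
    static long sumOfChangelogOffsets(final Map<String, Long> changelogOffsets) {
        long sum = 0L;
        for (final long offset : changelogOffsets.values()) {
            if (offset == UNKNOWN_OFFSET) {
                continue; // a sentinel must never leak into the sum
            }
            sum += offset;
        }
        return sum;
    }
}
```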
[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
vvcephei commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663710405
[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
vvcephei commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663710503 Test this please
[GitHub] [kafka] vvcephei opened a new pull request #9074: KAFKA-10287: PROPOSAL: safe offset tracking
vvcephei opened a new pull request #9074: URL: https://github.com/apache/kafka/pull/9074 This PR should be targeted to trunk, but it's not a trivial task. I just wanted to propose this approach. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] vvcephei commented on pull request #9074: KAFKA-10287: PROPOSAL: safe offset tracking
vvcephei commented on pull request #9074: URL: https://github.com/apache/kafka/pull/9074#issuecomment-663719873 Hey @cadonna , I was reflecting on your fix for KAFKA-10287, and it seems like dealing with those sentinel values is quite risky. We've had several bugs in the past, I don't feel certain that we don't have bugs today, and any refactoring could easily introduce bugs in the future. What do you think about taking a structural approach to offset sentinel management, like in this proposal? This wouldn't be for 2.6, but it's targeted at that branch because I built upon your PR.
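One way to read the "structural approach" suggestion is to replace raw `long` sentinels with a small type that makes the unknown case explicit. This is a hypothetical sketch of that idea, not the code in PR #9074. `ChangelogOffset` is an illustrative name.

```java
import java.util.OptionalLong;

// Hypothetical value type: either a known, non-negative offset or explicitly "unknown".
final class ChangelogOffset {
    private static final ChangelogOffset UNKNOWN = new ChangelogOffset(-1L);

    private final long offset; // only meaningful when >= 0; never exposed directly

    private ChangelogOffset(final long offset) {
        this.offset = offset;
    }

    static ChangelogOffset of(final long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("Offsets must be non-negative, got " + offset);
        }
        return new ChangelogOffset(offset);
    }

    static ChangelogOffset unknown() {
        return UNKNOWN;
    }

    boolean isKnown() {
        return offset >= 0;
    }

    /** Callers must handle the unknown case instead of doing arithmetic on a sentinel. */
    OptionalLong value() {
        return isKnown() ? OptionalLong.of(offset) : OptionalLong.empty();
    }
}
```

Because a negative value can never be wrapped as a known offset, code that sums or compares offsets has to go through `value()`. A forgotten sentinel check therefore shows up at compile time or at construction time, instead of as a silently wrong sum.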
[GitHub] [kafka] vvcephei commented on pull request #9073: MINOR: add task ':streams:testAll'
vvcephei commented on pull request #9073: URL: https://github.com/apache/kafka/pull/9073#issuecomment-663721003 Thanks @abbccdda ! There was only one flaky test, and the last change was only to the README, so I'll merge now.
[GitHub] [kafka] vvcephei merged pull request #9073: MINOR: add task ':streams:testAll'
vvcephei merged pull request #9073: URL: https://github.com/apache/kafka/pull/9073
[GitHub] [kafka] vvcephei merged pull request #9070: MINOR: speed up release script
vvcephei merged pull request #9070: URL: https://github.com/apache/kafka/pull/9070
[GitHub] [kafka] vvcephei commented on pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters
vvcephei commented on pull request #9052: URL: https://github.com/apache/kafka/pull/9052#issuecomment-663728382 Thanks @mjsax . Your suggestions sound good to me. I'll circle back on this PR in a bit.
[GitHub] [kafka] mjsax commented on a change in pull request #9073: MINOR: add task ':streams:testAll'
mjsax commented on a change in pull request #9073: URL: https://github.com/apache/kafka/pull/9073#discussion_r460282456 ## File path: build.gradle ## @@ -1266,6 +1266,27 @@ project(':streams') { if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } standardOutput = new File(generatedDocsDir, "streams_config.html").newOutputStream() } + + task testAll( +dependsOn: [ +':streams:test', +':streams:test-utils:test', +':streams:streams-scala:test', +':streams:upgrade-system-tests-0100:test', +':streams:upgrade-system-tests-0101:test', +':streams:upgrade-system-tests-0102:test', +':streams:upgrade-system-tests-0110:test', +':streams:upgrade-system-tests-10:test', +':streams:upgrade-system-tests-11:test', +':streams:upgrade-system-tests-20:test', +':streams:upgrade-system-tests-21:test', +':streams:upgrade-system-tests-22:test', +':streams:upgrade-system-tests-23:test', +':streams:upgrade-system-tests-24:test', +':streams:upgrade-system-tests-25:test', Review comment: I don't think we have actual unit/integration tests in those packages? It also seems a pain to keep it updated (and I am sure we will forget to update it). Should we just skip those?
[GitHub] [kafka] mjsax commented on a change in pull request #9073: MINOR: add task ':streams:testAll'
mjsax commented on a change in pull request #9073: URL: https://github.com/apache/kafka/pull/9073#discussion_r460282610 ## File path: build.gradle ## @@ -1266,6 +1266,27 @@ project(':streams') { if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } standardOutput = new File(generatedDocsDir, "streams_config.html").newOutputStream() } + + task testAll( +dependsOn: [ +':streams:test', +':streams:test-utils:test', +':streams:streams-scala:test', +':streams:upgrade-system-tests-0100:test', +':streams:upgrade-system-tests-0101:test', +':streams:upgrade-system-tests-0102:test', +':streams:upgrade-system-tests-0110:test', +':streams:upgrade-system-tests-10:test', +':streams:upgrade-system-tests-11:test', +':streams:upgrade-system-tests-20:test', +':streams:upgrade-system-tests-21:test', +':streams:upgrade-system-tests-22:test', +':streams:upgrade-system-tests-23:test', +':streams:upgrade-system-tests-24:test', +':streams:upgrade-system-tests-25:test', Review comment: Oh. Already merged. Never mind.
[GitHub] [kafka] mjsax commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit
mjsax commented on a change in pull request #9067: URL: https://github.com/apache/kafka/pull/9067#discussion_r460282952 ## File path: checkstyle/checkstyle.xml ## @@ -103,6 +103,13 @@ + + + + + Review comment: Nice one!
[GitHub] [kafka] mjsax opened a new pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax opened a new pull request #9075: URL: https://github.com/apache/kafka/pull/9075 Fix for `2.6` blocker bug. Call for review @guozhangwang @vvcephei (cc @rhauch)
[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax commented on a change in pull request #9075: URL: https://github.com/apache/kafka/pull/9075#discussion_r460300664 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -291,27 +290,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, long restoreCount = 0L; while (offset < highWatermark) { -try { -final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime); -final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>(); -for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) { -if (record.key() != null) { - restoreRecords.add(recordConverter.convert(record)); -} +final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime); +final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>(); +for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) { +if (record.key() != null) { +restoreRecords.add(recordConverter.convert(record)); } -offset = globalConsumer.position(topicPartition); -stateRestoreAdapter.restoreBatch(restoreRecords); -stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); -restoreCount += restoreRecords.size(); -} catch (final InvalidOffsetException recoverableException) { Review comment: This is the actual bug: we swallow the exception. However, because we don't do any "seek", we just hit the same exception in `poll()` over and over and never recover but loop forever.
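The failure mode can be reproduced with a toy model. `FakeConsumer` below is hypothetical and is not Kafka's consumer API. Like a real consumer whose position is invalid, it keeps failing on every `poll()` until the position is changed with `seek()`. The loop that swallows the exception therefore never makes progress. The sketch caps the attempts only so that it terminates.

```java
// Hypothetical toy model of the bug discussed in KAFKA-10306; not Kafka's consumer API.
final class SwallowedExceptionDemo {
    // Stand-in for org.apache.kafka.clients.consumer.InvalidOffsetException.
    static final class InvalidOffset extends RuntimeException {
        InvalidOffset() {
            super("position is out of range");
        }
    }

    // Fails on every poll() until seek() gives it a valid position again.
    static final class FakeConsumer {
        private boolean positionValid = false;

        void poll() {
            if (!positionValid) {
                throw new InvalidOffset();
            }
        }

        void seek() {
            positionValid = true;
        }
    }

    private SwallowedExceptionDemo() { }

    /** Buggy pattern: catch and retry without seeking. Returns how many polls failed. */
    static int swallowAndRetry(final FakeConsumer consumer, final int maxAttempts) {
        int failures = 0;
        for (int i = 0; i < maxAttempts; i++) { // the real loop had no such bound
            try {
                consumer.poll();
                return failures;
            } catch (final InvalidOffset swallowed) {
                failures++; // nothing changed, so the next poll() fails the same way
            }
        }
        return failures;
    }

    /** Fixed pattern: let the exception propagate so the caller can clean up and fail. */
    static void pollOrFail(final FakeConsumer consumer) {
        consumer.poll();
    }
}
```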
[GitHub] [kafka] vvcephei commented on pull request #9067: MINOR: Streams integration tests should not call exit
vvcephei commented on pull request #9067: URL: https://github.com/apache/kafka/pull/9067#issuecomment-663746926
[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax commented on a change in pull request #9075: URL: https://github.com/apache/kafka/pull/9075#discussion_r460302228 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java ## @@ -31,7 +31,7 @@ void flushState(); -void close() throws IOException; +void close(final boolean wipeStateStore) throws IOException; Review comment: We could also not wipe out the store and let the user do it manually. A manual cleanup is required nowadays anyway, so this is just a small side "improvement". Note that we wipe out the whole global task dir here; in contrast, users could do a manual per-store wipe-out... But as we do the same coarse-grained wipe-out for all tasks, and we already have a ticket for "per store cleanup", I thought it would be ok for now. Let me know what you think.
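Wiping a state directory, as `close(true)` would do for the global task directory, amounts to closing the stores and then deleting the directory recursively. This minimal sketch uses only `java.nio.file`. `StateWiper` and its parameters are hypothetical names, and this is not the PR's implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

final class StateWiper {
    private StateWiper() { }

    /**
     * Hypothetical close hook: close the stores, then optionally delete the whole state
     * directory so that the next start restores everything from the changelog topics.
     */
    static void close(final Runnable closeStores, final Path stateDir, final boolean wipeStateStore)
            throws IOException {
        closeStores.run(); // stores must be closed before their files are removed
        if (wipeStateStore && Files.exists(stateDir)) {
            try (Stream<Path> paths = Files.walk(stateDir)) {
                // Reverse order visits children before their parent directories.
                paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                    try {
                        Files.delete(path);
                    } catch (final IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            }
        }
    }
}
```

The whole-directory wipe matches the coarse-grained behavior described above. A per-store variant would pass a single store's subdirectory instead of the task directory.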
[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax commented on a change in pull request #9075: URL: https://github.com/apache/kafka/pull/9075#discussion_r460302576 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -234,24 +234,18 @@ void initialize() { } void pollAndUpdate() { -try { -final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime); -for (final ConsumerRecord<byte[], byte[]> record : received) { -stateMaintainer.update(record); -} -final long now = time.milliseconds(); -if (now >= lastFlush + flushInterval) { -stateMaintainer.flushState(); -lastFlush = now; -} -} catch (final InvalidOffsetException recoverableException) { Review comment: We just let the original exception bubble up, so that we are able to wipe out the store. This is also just a side "improvement"; we could also just die and let users clean up the state directory manually. However, it seems better to wipe it out directly.
[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax commented on a change in pull request #9075: URL: https://github.com/apache/kafka/pull/9075#discussion_r460303342 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -323,21 +322,6 @@ public void shouldRestoreRecordsUpToHighwatermark() { assertEquals(2, stateRestoreCallback.restored.size()); } -@Test -public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() { Review comment: This test was broken: it throws the `InvalidOffsetException` only once, and thus the second `poll()` succeeds; however, this is not how a real consumer behaves.
[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax commented on a change in pull request #9075: URL: https://github.com/apache/kafka/pull/9075#discussion_r460303491 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java ## @@ -128,10 +130,9 @@ public void shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() { assertFalse(globalStreamThread.stillRunning()); } -@SuppressWarnings("unchecked") Review comment: Just a little side cleanup :)
[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
vvcephei commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460287991 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: Man, this is confusing. I think I see what you're getting at, but it seems pretty strange to have to go to all that trouble to extract the consumer config so that we can apply a shorter timeout on each poll, but then loop around until the originally configured client timeout passes.
Can you explain how the outcome is different than just calling `globalConsumer.poll(requestTimeoutMs)`? It also seems strange to extract a consumer timeout configuration that specifically does not apply to `poll` and apply it to `poll`. This seems like it would violate users' expectations when they set that configuration value. Why wouldn't we instead apply the non-progress timeout (`taskTimeoutMs`), since it seems like that's exactly what it's for? I.e., it seems like `globalConsumer.poll(pollTime + taskTimeoutMs)` might be the most appropriate choice here? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -185,32 +200,16 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta log.info("Restoring state for global store {}", store.name()); final List<TopicPartition> topicPartitions = topicPartitionsForStore(store); -Map<TopicPartition, Long> highWatermarks = null; -int attempts = 0; -while (highWatermarks == null) { -try { -highWatermarks = globalConsumer.endOffsets(topicPartitions); -} catch (final TimeoutException retryableException) { -if (++attempts > retries) { -log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " + -"You can increase the number of retries via configuration parameter `retries`.", -store.name(), -retries, -retryableException); -throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " + -"You can increase the number of retries via configuration parameter `retries`.", store.name(), retries), -retryableException); -} -log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})", -topicPartitions, -retryBackoffMs, -attempts, -retries, -retryableException); -Utils.sleep(retryBackoffMs); -} -} +final Map<TopicPartition, Long> highWatermarks = new HashMap<>(); +re
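The loop under discussion uses `task.timeout.ms` as a non-progress timeout across several short polls. It can be sketched as a small tracker. The deadline starts on the first poll that returns nothing, resets whenever records arrive, and fires once no records have arrived for `taskTimeoutMs`. `ProgressDeadline` is a hypothetical helper with an explicit clock argument; it is not the code of PR #9047. The `poll(pollTime + taskTimeoutMs)` alternative aims for a similar bound with a single blocking call instead of a tracker.

```java
// Hypothetical helper: fails when a restore loop makes no progress for taskTimeoutMs.
final class ProgressDeadline {
    static final long NO_DEADLINE = -1L;

    private final long taskTimeoutMs;
    private long deadlineMs = NO_DEADLINE;

    ProgressDeadline(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /** Call after every poll with the number of records returned and the current time. */
    void onPoll(final int recordCount, final long nowMs) {
        if (recordCount > 0) {
            deadlineMs = NO_DEADLINE; // progress was made, so the timer is reset
            return;
        }
        if (deadlineMs == NO_DEADLINE) {
            deadlineMs = nowMs + taskTimeoutMs; // the first empty poll starts the timer
        }
        if (nowMs >= deadlineMs) {
            throw new IllegalStateException("No records received for " + taskTimeoutMs + " ms");
        }
    }
}
```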
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460304324 ## File path: streams/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java ## @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.internals; - -import org.apache.kafka.streams.StreamsConfig; - -import java.util.Map; - -/** - * A {@link StreamsConfig} that does not log its configuration on construction. - * - * This producer cleaner output for unit tests using the {@code test-utils}, - * since logging the config is not really valuable in this context. - */ -public class QuietStreamsConfig extends StreamsConfig { Review comment: Well, `KafkaStreams` is the "streams client" IMHO :)
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460305033 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -185,32 +200,16 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta log.info("Restoring state for global store {}", store.name()); final List<TopicPartition> topicPartitions = topicPartitionsForStore(store); -Map<TopicPartition, Long> highWatermarks = null; -int attempts = 0; -while (highWatermarks == null) { -try { -highWatermarks = globalConsumer.endOffsets(topicPartitions); -} catch (final TimeoutException retryableException) { -if (++attempts > retries) { -log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " + -"You can increase the number of retries via configuration parameter `retries`.", -store.name(), -retries, -retryableException); -throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " + -"You can increase the number of retries via configuration parameter `retries`.", store.name(), retries), -retryableException); -} -log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})", -topicPartitions, -retryBackoffMs, -attempts, -retries, -retryableException); -Utils.sleep(retryBackoffMs); -} -} +final Map<TopicPartition, Long> highWatermarks = new HashMap<>(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> highWatermarks.putAll(globalConsumer.endOffsets(topicPartitions)), Review comment: Correct, we rely on the client's internal backoff behavior here. Happy to switch to a `Supplier`.
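Switching the helper from a `Runnable` to a `Supplier`, as offered here, removes the need for the `AtomicLong` holder and the `putAll` side effect, because the retried call simply returns its result. The sketch below is hypothetical. `SupplierRetry` and `RetriableTimeout` are illustrative names, and the deadline is counted from the start of the call. Like the PR, it relies on the client's internal backoff rather than sleeping between attempts.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class SupplierRetry {
    // Stand-in for a retriable client exception such as Kafka's TimeoutException.
    static final class RetriableTimeout extends RuntimeException {
        RetriableTimeout(final String message) {
            super(message);
        }
    }

    private SupplierRetry() { }

    /** Returns the call's result, retrying on RetriableTimeout until taskTimeoutMs has elapsed. */
    static <T> T retryUntilSuccessOrThrowOnTaskTimeout(final Supplier<T> call,
                                                       final String errorMessage,
                                                       final long taskTimeoutMs,
                                                       final LongSupplier clockMs) {
        final long deadlineMs = clockMs.getAsLong() + taskTimeoutMs;
        while (true) {
            try {
                return call.get();
            } catch (final RetriableTimeout retriableException) {
                if (clockMs.getAsLong() >= deadlineMs) {
                    throw new IllegalStateException(errorMessage, retriableException);
                }
                // No sleep: assumes the client call applies its own backoff.
            }
        }
    }
}
```

At a call site, the position lookup would then read `final long offset = retryUntilSuccessOrThrowOnTaskTimeout(() -> consumer.position(partition), ...)`, with no mutable holder needed.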
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460305945 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); Review comment: Well, I guess we want to restore partition by partition because the "restore listener" callback is easier to handle this way?
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460307083 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: > Can you explain how the outcome is different than just calling globalConsumer.poll(requestTimeoutMs)? I did consider it, but was not 100% sure whether it would be better. I was also concerned about misusing the request timeout (as you already mentioned), as we have `StreamsConfig.POLL_TIMEOUT_MS`. 
I like the proposal to use `pollTime + taskTimeoutMs` though!
[GitHub] [kafka] ijuma commented on pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log
ijuma commented on pull request #9054: URL: https://github.com/apache/kafka/pull/9054#issuecomment-663753175 ok to test
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460308601 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { +offset = globalConsumer.position(topicPartition); +} catch (final TimeoutException error) { +// the `globalConsumer.position()` call should never block, because we know that we did +// a successful `position()` call above for the requested partition and thus the consumer +// should have a valid local position that it can return immediately Review comment: What John says: We need to track the restore progress to eventually break the outer while loop. We don't use `offset` to `seek()`, so it's not about updating the consumer's position but knowing when we are done.
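To illustrate that `offset` only tracks progress, here is a minimal, self-contained sketch with a hypothetical in-memory `FakePartitionConsumer` (not a Kafka class). `offset` is read back from `position()` after each poll and never passed to a seek; it only decides when the loop has reached the high watermark captured before restoration. The real loop also handles empty polls with a deadline, which is omitted here, so this sketch assumes the high watermark is reachable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory single-partition consumer: poll() returns up to
// maxRecords values from the current position and advances the position.
class FakePartitionConsumer {
    private final List<String> log;
    private final int maxRecords;
    private long position;

    FakePartitionConsumer(final List<String> log, final int maxRecords, final long startOffset) {
        this.log = log;
        this.maxRecords = maxRecords;
        this.position = startOffset; // e.g. a checkpointed offset after seek()
    }

    List<String> poll() {
        final List<String> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < log.size()) {
            batch.add(log.get((int) position));
            position++;
        }
        return batch;
    }

    long position() {
        return position;
    }
}

class RestoreProgress {
    // Restores until the local position reaches highWatermark. The offset is
    // only a progress marker for the loop condition; nothing seeks to it.
    static List<String> restore(final FakePartitionConsumer consumer, final long highWatermark) {
        final List<String> restored = new ArrayList<>();
        long offset = consumer.position();
        while (offset < highWatermark) {
            restored.addAll(consumer.poll());
            offset = consumer.position();
        }
        return restored;
    }
}
```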
[jira] [Comment Edited] (KAFKA-9731) Increased fetch request rate with leader selector due to HW propagation
[ https://issues.apache.org/jira/browse/KAFKA-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163061#comment-17163061 ] Shane edited comment on KAFKA-9731 at 7/24/20, 10:07 PM: - Hey Kafka team, What are the odds of back-porting this to 2.5? I'm seeing the same issues in 2.5. I can add more details if you want to see, but we're seeing the same increase in fetch requests per second. Thanks! was (Author: shanesaww): Hey Apache team, What are the odds of back-porting this to 2.5? I'm seeing the same issues in 2.5. I can add more details if you want to see, but we're seeing the same increase in fetch requests per second. Thanks! > Increased fetch request rate with leader selector due to HW propagation > --- > > Key: KAFKA-9731 > URL: https://issues.apache.org/jira/browse/KAFKA-9731 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.4.0, 2.4.1 >Reporter: Vahid Hashemian >Assignee: Ismael Juma >Priority: Major > Fix For: 2.6.0 > > Attachments: image-2020-03-17-10-19-08-987.png > > > KIP-392 adds high watermark propagation to followers as a means to better > sync up the followers' HW with the leader. The issue we have noticed after trying out > 2.4.0 and 2.4.1 is a spike in fetch request rate in the default selector case > (leader), that does not really require this high watermark propagation: > !image-2020-03-17-10-19-08-987.png|width=811,height=354! > This spike causes an increase in resource allocation (CPU) on the brokers. > An easy solution would be to disable this propagation (at least) for the > default leader selector case to improve the backward compatibility. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460308959 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -318,6 +341,72 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, } } +private void retryUntilSuccessOrThrowOnTaskTimeout(final Runnable runnable, + final String errorMessage) { +long deadlineMs = NO_DEADLINE; + +do { +try { +runnable.run(); +return; +} catch (final TimeoutException retryableException) { Review comment: Not sure if I can follow?
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460308797 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( +deadlineMs, +requestTimeoutMs, +new StreamsException(String.format( +"Global task did not make progress to restore state. Retrying is disabled. 
You can enable it by setting `%s` to a value larger than zero.", +StreamsConfig.TASK_TIMEOUT_MS_CONFIG +)) +); +} else { +deadlineMs = maybeUpdateDeadlineOrThrow(deadlineMs); +} + +continue; +} +deadlineMs = NO_DEADLINE; + final List> restoreRecords = new ArrayList<>(); for (final ConsumerRecord record : records.records(topicPartition)) { if (record.key() != null) { restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { +offset = globalConsumer.position(topicPartition); +} catch (final TimeoutException error) { +// the `globalConsumer.position()` call should never block, because we know that we did +// a successful `position()` call above for the requested partition and thus the consumer +// should have a valid local position that it can return immediately + +// hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException` +throw new IllegalStateException(error); Review comment: As this would expose a bug, it does not seem useful to users -- and for us, we have the comment in the code.
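The `maybeUpdateDeadlineOrThrow` calls in the diff appear to follow a start-then-check deadline pattern: the first poll without progress starts a deadline, later empty polls compare against it, and any progress resets it to `NO_DEADLINE`. The body below is a hypothetical reconstruction of that pattern, not the PR's actual helper (which is not shown in this thread); it takes the current time as a parameter so it can be exercised without a real clock.

```java
class DeadlineTracker {
    // Sentinel: no deadline currently running.
    static final long NO_DEADLINE = -1L;

    // Hypothetical helper: on the first call without progress, start a deadline
    // timeoutMs in the future; on later calls, throw onExpiry once nowMs is past it.
    // Callers reset their deadline to NO_DEADLINE whenever progress is made.
    static long maybeUpdateDeadlineOrThrow(final long deadlineMs,
                                           final long nowMs,
                                           final long timeoutMs,
                                           final RuntimeException onExpiry) {
        if (deadlineMs == NO_DEADLINE) {
            return nowMs + timeoutMs;
        }
        if (nowMs > deadlineMs) {
            throw onExpiry;
        }
        return deadlineMs;
    }
}
```

In the diff, the `taskTimeoutMs == 0L` branch passes `requestTimeoutMs` as the timeout, so even with retries disabled the restore seems to get at least one request timeout's worth of empty polls before failing.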
[GitHub] [kafka] vvcephei commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
vvcephei commented on a change in pull request #9075: URL: https://github.com/apache/kafka/pull/9075#discussion_r460307479 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java ## @@ -114,8 +118,16 @@ public void flushState() { stateMgr.checkpoint(offsets); } -public void close() throws IOException { +public void close(final boolean wipeStateStore) throws IOException { stateMgr.close(); +if (wipeStateStore) { +try { +log.error("Wiping state stores for global task."); +Utils.delete(stateMgr.baseDir()); +} catch (final IOException e) { +log.error("Failed to wiping state stores for global task.", e); Review comment: ```suggestion log.error("Failed to delete global task directory after detecting corruption.", e); ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java ## @@ -114,8 +118,16 @@ public void flushState() { stateMgr.checkpoint(offsets); } -public void close() throws IOException { +public void close(final boolean wipeStateStore) throws IOException { stateMgr.close(); +if (wipeStateStore) { +try { +log.error("Wiping state stores for global task."); Review comment: This doesn't seem to be an error. Maybe info would be better? Also, I think "wipe state stores" might be confusing for a user looking at the log messages with no context. "Deleting the task directory" seems to be a more context-free statement of what we're doing. ```suggestion log.info("Deleting global task directory after detecting corruption."); ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateMaintainer.java ## @@ -31,7 +31,7 @@ void flushState(); -void close() throws IOException; +void close(final boolean wipeStateStore) throws IOException; Review comment: Thanks @mjsax , this sounds perfect to me. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -284,10 +278,21 @@ public void run() { } setState(State.RUNNING); +boolean wipeStateStore = false; try { while (stillRunning()) { stateConsumer.pollAndUpdate(); } +} catch (final InvalidOffsetException recoverableException) { +wipeStateStore = true; +log.error( +"Updating global state failed. You can restart KafkaStreams to recover from this error.", Review comment: ```suggestion "Updating global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.", ``` Just a thought to indicate why just restarting would recover anything.
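`close(wipeStateStore)` in the diff closes the state manager first, then deletes the task directory via Kafka's `Utils.delete(stateMgr.baseDir())`, logging rather than rethrowing a failed delete. A minimal stand-alone sketch of that ordering, using `java.nio.file` for the recursive delete instead of `Utils.delete`. `TaskDirectoryCleaner` and its methods are hypothetical, and `System.out`/`System.err` stand in for the logger, using the info-level wording suggested in the review.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TaskDirectoryCleaner {
    // Recursively deletes dir. Reverse lexicographic order of the walked paths
    // puts every descendant before its parent, so directories are empty when deleted.
    static void deleteRecursively(final Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        final List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (final Path path : paths) {
            Files.delete(path);
        }
    }

    // Mirrors the shape of close(wipeStateStore): close the stores first, then
    // optionally delete the task directory. A failed delete is logged, not rethrown.
    // Returns true only if the directory was deleted.
    static boolean closeAndMaybeWipe(final Runnable closeStores,
                                     final Path baseDir,
                                     final boolean wipeStateStore) {
        closeStores.run();
        if (!wipeStateStore) {
            return false;
        }
        try {
            System.out.println("INFO Deleting global task directory after detecting corruption.");
            deleteRecursively(baseDir);
            return true;
        } catch (final IOException e) {
            System.err.println("ERROR Failed to delete global task directory after detecting corruption: " + e);
            return false;
        }
    }
}
```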
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460309750 ## File path: streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java ## @@ -78,6 +78,7 @@ public void init(final ProcessorContext context, final StateStore root) { new File(context.stateDir() + File.separator + name).mkdir(); } this.initialized = true; +context.register(root, (k, v) -> { }); Review comment: That is what the real code does, too. Otherwise we would need to call `register` manually in the tests, which is annoying.
[GitHub] [kafka] vvcephei commented on pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
vvcephei commented on pull request #9075: URL: https://github.com/apache/kafka/pull/9075#issuecomment-663754855 Otherwise, it LGTM. Feel free to merge whether or not you like my suggestions ;)
[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax commented on a change in pull request #9075: URL: https://github.com/apache/kafka/pull/9075#discussion_r460315914 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ## @@ -331,17 +336,36 @@ private StateConsumer initialize() { logContext, globalConsumer, new GlobalStateUpdateTask( +logContext, topology, globalProcessorContext, stateMgr, -config.defaultDeserializationExceptionHandler(), -logContext +config.defaultDeserializationExceptionHandler() ), time, Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) ); -stateConsumer.initialize(); + +try { +stateConsumer.initialize(); +} catch (final InvalidOffsetException recoverableException) { +log.error( +"Bootstrapping global state failed. You can restart KafkaStreams to recover from this error.", Review comment: ```suggestion "Bootstrapping global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.", ```
[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log
dhruvilshah3 commented on a change in pull request #9054: URL: https://github.com/apache/kafka/pull/9054#discussion_r460317793 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -932,6 +927,7 @@ class LogManager(logDirs: Seq[File], val logsToCheckpoint = logsInDir(logDir) checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty) checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint) +sourceLog.removeLogMetrics() Review comment: This seems a bit odd but looks like an existing issue. We are removing metrics for the `current` log but we leave the metrics for the `future` log as is, which means it will retain the `is-future` tag. Maybe worth filing a JIRA if you think it is a problem too.
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460319742 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: Actually, we need to use all three: `pollTime + requestTimeout + taskTimeoutMs`. If we don't include `requestTimeout` and `taskTimeoutMs` is set to `0` (or, in general, to a value smaller than `requestTimeout`), the first `poll()` would return no data with very high probability and we would fail, which seems undesirable.
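A minimal sketch of the timeout arithmetic proposed in the comment above, assuming all three values are plain millisecond settings: during bootstrapping the global consumer would poll for `pollTime + requestTimeout + taskTimeoutMs`, so that `taskTimeoutMs = 0` still leaves a full request timeout for the first fetch, while (per the follow-up comment in this thread) regular processing keeps `pollTime` unmodified. `GlobalPollTimeouts` and `pollTimeout` are hypothetical names.

```java
import java.time.Duration;

class GlobalPollTimeouts {
    // Hypothetical helper: bootstrap polls get pollTime + requestTimeoutMs + taskTimeoutMs,
    // so even taskTimeoutMs == 0 leaves room for one full request round trip.
    // Regular processing keeps the configured pollTime unchanged.
    static Duration pollTimeout(final boolean bootstrapping,
                                final Duration pollTime,
                                final long requestTimeoutMs,
                                final long taskTimeoutMs) {
        if (!bootstrapping) {
            return pollTime;
        }
        return pollTime.plusMillis(requestTimeoutMs).plusMillis(taskTimeoutMs);
    }
}
```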
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460320303 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( +deadlineMs, +requestTimeoutMs, +new StreamsException(String.format( +"Global task did not make progress to restore state. Retrying is disabled. 
You can enable it by setting `%s` to a value larger than zero.", +StreamsConfig.TASK_TIMEOUT_MS_CONFIG +)) +); +} else { +deadlineMs = maybeUpdateDeadlineOrThrow(deadlineMs); +} + +continue; +} +deadlineMs = NO_DEADLINE; + final List> restoreRecords = new ArrayList<>(); for (final ConsumerRecord record : records.records(topicPartition)) { if (record.key() != null) { restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { +offset = globalConsumer.position(topicPartition); +} catch (final TimeoutException error) { +// the `globalConsumer.position()` call should never block, because we know that we did +// a successful `position()` call above for the requested partition and thus the consumer +// should have a valid local position that it can return immediately + +// hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException` +throw new IllegalStateException(error); Review comment: As requested by John, I'll drop this and just call `retryUntilSuccessOrThrowOnTaskTimeout` here, too.
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460320506 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { +offset = globalConsumer.position(topicPartition); +} catch (final TimeoutException error) { +// the `globalConsumer.position()` call should never block, because we know that we did +// a successful `position()` call above for the requested partition and thus the consumer +// should have a valid local position that it can return immediately Review comment: I guess with the new `retryUntilSuccessOrThrowOnTaskTimeout` the code is still clean if we just use it here, too.
[GitHub] [kafka] lbradstreet commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit
lbradstreet commented on a change in pull request #9067: URL: https://github.com/apache/kafka/pull/9067#discussion_r460322318 ## File path: checkstyle/checkstyle.xml ## @@ -103,6 +103,13 @@ + + + + + Review comment: Thank you for this!
[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on pull request #9047: URL: https://github.com/apache/kafka/pull/9047#issuecomment-663766278 Thanks for the review @abbccdda @vvcephei -- updated the PR.
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460324591 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: Btw, just for completeness: we will still use `pollTime` unmodified in `globalConsumer.poll(...)` during regular processing, so the config still has value. Only in the startup bootstrapping phase would we use the sum of poll/request/task-timeout, to avoid failing unnecessarily. 
Also note that during normal processing, we don't apply `taskTimeoutMs` atm. Not sure if we should, but I would not want to include it in this PR. If the global `poll()` loop does not make progress, it won't block the "main" processing because the `StreamThreads` are already running, and thus its impact is different. Also, during normal processing the global `highWatermark` may move, so we need a different strategy anyway compared to the bootstrap phase.
[GitHub] [kafka] mjsax merged pull request #9061: MINOR: removed incorrect deprecation annotations
mjsax merged pull request #9061: URL: https://github.com/apache/kafka/pull/9061
[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
vvcephei commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460338414 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: Ah, that last point sounds good. I'm not opposed, but it still seems strange to grab a random timeout from the client config and apply it to the `poll` method. It kind of makes me wonder what we're really trying to do here. 
If we want to give `poll` at least 30 seconds to return data, then we can just give it at least 30 seconds, right? No reason to abuse the client configuration. On the other hand, `pollTime` may not even be appropriate for the global thread, right? It seems more like it was meant for `StreamThread`, to make sure we don't block too long and violate the max poll interval. But since the global thread is only assigned and not subscribed, it has no constraint on how long it should block, except for the `taskTimeoutMs` config, right? We clearly need some kind of lower bound on it, but it's not clear that it needs to be configurable. Anyway, just food for thought. Feel free to keep the `requestTimeout` if you really think it's appropriate.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
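A minimal sketch of the two ideas discussed in this review, assuming nothing about the real `GlobalStateManagerImpl` internals: retrying a transiently failing call until a task-level deadline passes (the role `retryUntilSuccessOrThrowOnTaskTimeout` plays in the diff), and one possible reading of the suggestion to give `poll` a fixed lower bound, otherwise blocking for whatever task timeout remains. All names here (`DeadlineRetry`, `TaskTimeoutException`, `Attempt`, `pollTime`, `minPollMs`) are hypothetical, and `java.util.concurrent.TimeoutException` stands in for Kafka's own timeout exception.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch, not the Kafka Streams implementation: retry a call that can
 * time out transiently until a task-level deadline has passed.
 */
public class DeadlineRetry {

    /** Thrown once the task timeout elapses without a successful attempt. */
    public static class TaskTimeoutException extends RuntimeException {
        public TaskTimeoutException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    /** A single attempt; only TimeoutException is treated as retriable. */
    @FunctionalInterface
    public interface Attempt {
        void run() throws TimeoutException;
    }

    private final LongSupplier clockMs; // injectable clock, e.g. System::currentTimeMillis
    private final long taskTimeoutMs;

    public DeadlineRetry(final LongSupplier clockMs, final long taskTimeoutMs) {
        this.clockMs = clockMs;
        this.taskTimeoutMs = taskTimeoutMs;
    }

    public void retryUntilSuccessOrThrow(final Attempt attempt, final String failureMessage) {
        final long deadlineMs = clockMs.getAsLong() + taskTimeoutMs;
        while (true) {
            try {
                attempt.run();
                return; // success: stop retrying
            } catch (final TimeoutException e) {
                // Give up only once the deadline has passed; otherwise try again.
                // A real implementation would also back off between attempts.
                if (clockMs.getAsLong() >= deadlineMs) {
                    throw new TaskTimeoutException(failureMessage, e);
                }
            }
        }
    }

    /**
     * One reading of the review suggestion: block for whatever task timeout remains,
     * but never for less than a fixed, non-configurable floor.
     */
    public static Duration pollTime(final long remainingTaskTimeoutMs, final long minPollMs) {
        return Duration.ofMillis(Math.max(minPollMs, remainingTaskTimeoutMs));
    }
}
```

Injecting the clock keeps the sketch testable without real waiting; in production one would pass `System::currentTimeMillis`.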
[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460338747

##
File path: streams/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
##
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals;
-
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.util.Map;
-
-/**
- * A {@link StreamsConfig} that does not log its configuration on construction.
- *
- * This producer cleaner output for unit tests using the {@code test-utils},
- * since logging the config is not really valuable in this context.
- */
-public class QuietStreamsConfig extends StreamsConfig {

Review comment:
Haha, that's kind of a stretch, but it's not a big deal :)
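The Javadoc of the deleted class describes a config subclass that suppresses logging on construction. A minimal, self-contained sketch of that pattern follows. `QuietConfigSketch`, `LoggingConfig`, `QuietConfig`, and the list-based `LOG` are hypothetical stand-ins for `StreamsConfig` and its logger, and the protected `(props, doLog)` constructor shape is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class QuietConfigSketch {

    /** Hypothetical base config that logs every value when constructed, unless told not to. */
    public static class LoggingConfig {
        // Stand-in for a real logger so the behavior is observable.
        public static final List<String> LOG = new ArrayList<>();

        private final Map<String, Object> values;

        public LoggingConfig(final Map<String, ?> props) {
            this(props, true);
        }

        protected LoggingConfig(final Map<String, ?> props, final boolean doLog) {
            this.values = new HashMap<>(props);
            if (doLog) {
                values.forEach((key, value) -> LOG.add(key + " = " + value));
            }
        }

        public Object get(final String key) {
            return values.get(key);
        }
    }

    /** Same values as LoggingConfig, but nothing is logged; handy for keeping test output quiet. */
    public static class QuietConfig extends LoggingConfig {
        public QuietConfig(final Map<String, ?> props) {
            super(props, false);
        }
    }
}
```

Passing the flag through a protected constructor keeps the logging decision in the base class while letting a test-only subclass opt out.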