[jira] [Created] (KAFKA-10252) MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores
Gordon Wang created KAFKA-10252:
-----------------------------------

             Summary: MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores
                 Key: KAFKA-10252
                 URL: https://issues.apache.org/jira/browse/KAFKA-10252
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.4.1
            Reporter: Gordon Wang

When creating a GlobalKTable for getting a ReadOnlyKeyValueStore like below:
{code}
GlobalKTable globalTable = streamsBuilder.globalTable(topic,
    Consumed.with(keySerde, valueSerde),
    Materialized.as(Stores.inMemoryKeyValueStore(topic)));
{code}
I got a StreamsException like below:
{code}
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for topic applicationId-sourceTopicName-changelog
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
{code}
But as seen in the GlobalKTable Javadoc, the changelog stream shall not be created, and in fact it was not created. This leads to our custom serde searching for the schema (we are using Confluent Platform and an Avro-based schema registry for the job) using the wrong topic name (it should be just sourceTopicName rather than applicationId-sourceTopicName-changelog).

After digging into the code, I found that the *initStoreSerde* method in *MeteredTimestampedKeyValueStore* assumes the topic backing the store is always the store changelog topic when initializing the Serdes for the state store. I think for GlobalKTables (ones having a GlobalProcessorContextImpl ProcessorContext) we should use the original topic name directly here.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
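For readers following along, here is a minimal sketch of the topic-selection behavior the report argues for. Only the changelog naming pattern (applicationId-storeName-changelog) and the global-context distinction come from the report; the class, method, and parameter names below are illustrative assumptions, not the actual Streams internals or the attached patch.
{code:java}
// Illustrative sketch only -- not the real MeteredTimestampedKeyValueStore code.
final class StoreSerdeTopicResolver {

    // Mirrors the changelog naming the reporter hit: "appId-storeName-changelog".
    static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // The proposed behavior: a store initialized with a global context (a
    // GlobalProcessorContextImpl) should resolve its serdes against the source
    // topic, while a regular store keeps using the changelog topic name.
    // 'isGlobalContext' and 'sourceTopic' are hypothetical inputs.
    static String serdeTopic(boolean isGlobalContext, String sourceTopic,
                             String applicationId, String storeName) {
        return isGlobalContext
            ? sourceTopic                                    // e.g. "sourceTopicName"
            : storeChangelogTopic(applicationId, storeName); // the current behavior
    }
}
{code}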
[GitHub] [kafka] dajac opened a new pull request #8998: MINOR; KafkaAdminClient#describeLogDirs should not fail all the futures when only one call fails
dajac opened a new pull request #8998:
URL: https://github.com/apache/kafka/pull/8998

Following https://github.com/apache/kafka/pull/8985, I have found another case which incorrectly realised all futures when a failure occurs.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
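For context, a sketch of the future-handling pattern at issue. This is an illustration under assumed names, not the actual KafkaAdminClient internals; it only assumes `KafkaFutureImpl` from the Kafka clients jar, the future type the admin client completes.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.internals.KafkaFutureImpl;

// describeLogDirs tracks one future per broker. A failed call for one broker
// should complete only that broker's future, not all of them.
public class PerBrokerFutures {
    public static void main(String[] args) throws Exception {
        Map<Integer, KafkaFutureImpl<String>> futures = new HashMap<>();
        futures.put(0, new KafkaFutureImpl<>());
        futures.put(1, new KafkaFutureImpl<>());

        // The bug shape: one failure completed every future exceptionally.
        // futures.values().forEach(f -> f.completeExceptionally(error));

        // The intended behavior: fail only the future tied to the failed call...
        futures.get(1).completeExceptionally(new RuntimeException("call to broker 1 failed"));
        // ...so the other broker's result is still delivered.
        futures.get(0).complete("log dirs for broker 0");
        System.out.println(futures.get(0).get()); // prints: log dirs for broker 0
    }
}
```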
[GitHub] [kafka] dajac commented on pull request #8998: MINOR; KafkaAdminClient#describeLogDirs should not fail all the futures when only one call fails
dajac commented on pull request #8998: URL: https://github.com/apache/kafka/pull/8998#issuecomment-655961172 cc @cmccabe @ijuma This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10252) MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores
[ https://issues.apache.org/jira/browse/KAFKA-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154293#comment-17154293 ]

Gordon Wang commented on KAFKA-10252:
-------------------------------------

Updated patch for the fix.

> MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10252
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10252
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1
>            Reporter: Gordon Wang
>            Priority: Major
>         Attachments: 0001-KAFKA-10252-Use-source-topic-name-for-MeteredTimesta.patch
>
> When creating a GlobalKTable for getting a ReadOnlyKeyValueStore like below:
> {code}
> GlobalKTable globalTable = streamsBuilder.globalTable(topic,
>     Consumed.with(keySerde, valueSerde),
>     Materialized.as(Stores.inMemoryKeyValueStore(topic)));
> {code}
> I got a StreamsException like below:
> {code}
> org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for topic applicationId-sourceTopicName-changelog
> Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
> {code}
> But as seen in the GlobalKTable Javadoc, the changelog stream shall not be created, and in fact it was not created. This leads to our custom serde searching for the schema (we are using Confluent Platform and an Avro-based schema registry for the job) using the wrong topic name (it should be just sourceTopicName rather than applicationId-sourceTopicName-changelog).
> After digging into the code, I found that the *initStoreSerde* method in *MeteredTimestampedKeyValueStore* assumes the topic backing the store is always the store changelog topic when initializing the Serdes for the state store. I think for GlobalKTables (ones having a GlobalProcessorContextImpl ProcessorContext) we should use the original topic name directly here.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] dajac opened a new pull request #8999: MINOR; Return timed out connections as a List instead of a Set
dajac opened a new pull request #8999:
URL: https://github.com/apache/kafka/pull/8999

As pointed out by @ijuma in https://github.com/apache/kafka/pull/8990#discussion_r451059876, using a `Set` is not necessary, as the caller only cares about getting the list of timed-out connections/nodes. It does nothing else but iterate over the list.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
rajinisivaram commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452040424

## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
     }

     /**
-     * Return the Set of nodes whose connection setup has timed out.
+     * Return the List of nodes whose connection setup has timed out.
      * @param now the current time in ms
      */
-    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    public List<String> nodesWithConnectionSetupTimeout(long now) {
         return connectingNodes.stream()
             .filter(id -> isConnectionSetupTimeout(id, now))
-            .collect(Collectors.toSet());
+            .collect(Collectors.toCollection(LinkedList::new));

Review comment:
@dajac Won't this create a list regardless of whether there are timed out entries? Let's see whether this addresses @ijuma 's concern on the other PR.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
dajac commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452052951

## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
     }

     /**
-     * Return the Set of nodes whose connection setup has timed out.
+     * Return the List of nodes whose connection setup has timed out.
      * @param now the current time in ms
      */
-    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    public List<String> nodesWithConnectionSetupTimeout(long now) {
         return connectingNodes.stream()
             .filter(id -> isConnectionSetupTimeout(id, now))
-            .collect(Collectors.toSet());
+            .collect(Collectors.toCollection(LinkedList::new));

Review comment:
@rajinisivaram Yes, it does, as an empty list is returned if there are no timed out entries. This was true with the Set as well, btw.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
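As an aside on the allocation question above, here is a hypothetical variant that only allocates once a timed-out node is found. It is a sketch, not the code under review; `isConnectionSetupTimeout` is stubbed to stand in for the real per-connection check.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class ConnectionTimeouts {
    // Stub standing in for ClusterConnectionStates.isConnectionSetupTimeout(id, now).
    static boolean isConnectionSetupTimeout(String id, long now) {
        return false; // placeholder: the real check compares elapsed time to a timeout
    }

    // Defers allocation until the first timed-out node is seen; returns a
    // shared immutable empty list otherwise.
    static List<String> nodesWithConnectionSetupTimeout(Collection<String> connectingNodes, long now) {
        List<String> result = null;
        for (String id : connectingNodes) {
            if (isConnectionSetupTimeout(id, now)) {
                if (result == null)
                    result = new ArrayList<>();
                result.add(id);
            }
        }
        return result == null ? Collections.emptyList() : result;
    }
}
```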
[GitHub] [kafka] omkreddy commented on pull request #8966: KAFKA-10220: add null check for configurationKey
omkreddy commented on pull request #8966:
URL: https://github.com/apache/kafka/pull/8966#issuecomment-656000189

@tombentley Thanks for the explanation. I will go ahead and merge the PR.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] omkreddy closed pull request #8966: KAFKA-10220: add null check for configurationKey
omkreddy closed pull request #8966: URL: https://github.com/apache/kafka/pull/8966 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #8966: KAFKA-10220: add null check for configurationKey
showuon commented on pull request #8966:
URL: https://github.com/apache/kafka/pull/8966#issuecomment-656008310

@omkreddy, sorry, I saw the PR was closed instead of merged. Could you please check again? Thanks.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Created] (KAFKA-10253) Kafka Connect gets into a rebalance loop
Konstantin Lalafaryan created KAFKA-10253:
---------------------------------------------

             Summary: Kafka Connect gets into a rebalance loop
                 Key: KAFKA-10253
                 URL: https://issues.apache.org/jira/browse/KAFKA-10253
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.5.0
            Reporter: Konstantin Lalafaryan

Hello everyone!

We are running a kafka-connect cluster (3 workers) and very often it gets into a rebalance loop.

{code:java}
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655831 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655831 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655832 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655832 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655833 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655833 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Work
{code}
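A rough sketch of why the log above repeats: the error=1 in each assignment corresponds to a config mismatch, which makes every member re-sync and rejoin. The names below are simplified assumptions for illustration; the real check lives in IncrementalCooperativeAssignor.
{code:java}
// Simplified illustration, not the actual Connect assignor code.
final class AssignorSketch {
    static final short NO_ERROR = 0;
    static final short CONFIG_MISMATCH = 1; // the Assignment{error=1, ...} in the log

    // If the elected leader has read less of the config topic than the most
    // up-to-date member, it returns an empty assignment carrying error=1 so
    // members re-sync and rejoin. A leader that can never catch up repeats
    // this outcome on every rejoin: a rebalance loop.
    static short assignmentError(long leaderConfigOffset, long maxMemberConfigOffset) {
        return leaderConfigOffset < maxMemberConfigOffset ? CONFIG_MISMATCH : NO_ERROR;
    }
}
{code}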
[jira] [Updated] (KAFKA-10253) Kafka Connect gets into a rebalance loop
[ https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantin Lalafaryan updated KAFKA-10253:
------------------------------------------
    Description:
Hello everyone!

We are running a kafka-connect cluster (3 workers) and very often it gets into a rebalance loop.

{code:java}
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655831 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655831 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655832 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655832 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655833 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655833 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:2
{code}
[jira] [Updated] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop
[ https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantin Lalafaryan updated KAFKA-10253:
------------------------------------------
    Description:
Hello everyone!

We are running a kafka-connect cluster (3 workers) and very often it gets into an infinite rebalance loop.

{code:java}
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655831 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655831 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655832 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655832 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655833 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655833 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-
{code}
[jira] [Updated] (KAFKA-10253) Kafka Connect gets into a rebalance loop
[ https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantin Lalafaryan updated KAFKA-10253:
------------------------------------------
    Description:
Hello everyone!

We are running a kafka-connect cluster (3 workers) and very often it gets into an infinite rebalance loop.

{code:java}
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655831 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655831 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655832 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655832 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655833 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655833 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-
{code}
[GitHub] [kafka] omkreddy edited a comment on pull request #8966: KAFKA-10220: add null check for configurationKey
omkreddy edited a comment on pull request #8966: URL: https://github.com/apache/kafka/pull/8966#issuecomment-656014523 @showuon I normally use `kafka-merge-pr.py` script for merging PRs. This will do a direct push from local. In this case, github marks it as closed. You can check the commit on trunk. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on pull request #8966: KAFKA-10220: add null check for configurationKey
omkreddy commented on pull request #8966: URL: https://github.com/apache/kafka/pull/8966#issuecomment-656014523 @showuon I normally use `kafka-merge-pr.py` script for merging PRs. This will do a direct push from local. In this case, github marks it as closed. You can check the commit of trunk. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop
[ https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantin Lalafaryan updated KAFKA-10253:
------------------------------------------
    Description:
Hello everyone!

We are running a kafka-connect cluster (3 workers) and very often it gets into an infinite rebalance loop.

{code:java}
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655831 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655831 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655832 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655832 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655833 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655833 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09
{code}
[jira] [Updated] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop
[ https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantin Lalafaryan updated KAFKA-10253: -- Summary: Kafka Connect gets into an infinite rebalance loop (was: Kafka Connect gets into a rebalance loop) > Kafka Connect gets into an infinite rebalance loop > -- > > Key: KAFKA-10253 > URL: https://issues.apache.org/jira/browse/KAFKA-10253 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0 >Reporter: Konstantin Lalafaryan >Priority: Blocker > > Hello everyone! > > We are running kafka-connect cluster (3 workers) and very often it gets into > an infinite rebalance loop. > > {code:java} > 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,731 INFO [Worker > clientId=connect-1, groupId= kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,733 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Was selected to perform > assignments, but do not have latest config found in sync request. Returning > an empty configuration to trigger re-sync. > (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,735 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Successfully joined group with > generation 305655831 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,735 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Joined group at generation > 305655831 with protocol version 2 and got assignment: Assignment{error=1, > leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', > leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,735 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,735 INFO [Worker > clientId=connect-1, groupId= kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,736 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Was selected to perform > assignments, but do not have latest config found in sync request. Returning > an empty configuration to trigger re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,739 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Successfully joined group with > generation 305655832 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,739 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Joined group at generation > 305655832 with protocol version 2 and got assignment: Assignment{error=1, > leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', > leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,739 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,739 INFO [Worker > clientId=connect-1, groupId= kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,740 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Was selected to perform > assignments, but do not have latest config found in sync request. Returning > an empty configuration to trigger re-sync. > (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,742 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Successfully joined group with > generation 305655833 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,742 INFO [Worker > clientId=connect-1, groupId= kafka-connect] Joined gro
[jira] [Updated] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop
[ https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantin Lalafaryan updated KAFKA-10253: -- Component/s: KafkaConnect > Kafka Connect gets into an infinite rebalance loop > -- > > Key: KAFKA-10253 > URL: https://issues.apache.org/jira/browse/KAFKA-10253 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Konstantin Lalafaryan >Priority: Blocker > > Hello everyone! > > We are running kafka-connect cluster (3 workers) and very often it gets into > an infinite rebalance loop. > > {code:java} > 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= > kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Was selected to perform assignments, but do not have latest > config found in sync request. Returning an empty configuration to trigger > re-sync. > (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Successfully joined group with generation 305655831 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Joined group at generation 305655831 with protocol version 2 > and got assignment: Assignment{error=1, > leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', > leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Was selected to perform assignments, but do not have latest > config found in sync request. Returning an empty configuration to trigger > re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Successfully joined group with generation 305655832 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Joined group at generation 305655832 with protocol version 2 > and got assignment: Assignment{error=1, > leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', > leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Was selected to perform assignments, but do not have latest > config found in sync request. Returning an empty configuration to trigger > re-sync. > (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Successfully joined group with generation 305655833 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Joined group at generation 305655833 wi
[GitHub] [kafka] showuon commented on pull request #8966: KAFKA-10220: add null check for configurationKey
showuon commented on pull request #8966: URL: https://github.com/apache/kafka/pull/8966#issuecomment-656019261 Cool! Thanks, @omkreddy ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy closed pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand
omkreddy closed pull request #8808: URL: https://github.com/apache/kafka/pull/8808 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10109) kafka-acls.sh/AclCommand opens multiple AdminClients
[ https://issues.apache.org/jira/browse/KAFKA-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-10109.
-------------------------------
    Fix Version/s: 2.7.0
       Resolution: Fixed

Issue resolved by pull request 8808
[https://github.com/apache/kafka/pull/8808]

> kafka-acls.sh/AclCommand opens multiple AdminClients
> ----------------------------------------------------
>
>                 Key: KAFKA-10109
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10109
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>            Reporter: Tom Bentley
>            Assignee: Tom Bentley
>            Priority: Minor
>             Fix For: 2.7.0
>
> {{AclCommand.AclCommandService}} uses {{withAdminClient(opts: AclCommandOptions)(f: Admin => Unit)}} to abstract the execution of an action using an {{AdminClient}} instance. Unfortunately, the use of this method in implementing {{addAcls()}} and {{removeAcls()}} calls {{listAcls()}}, which causes the creation of a second {{AdminClient}} instance. When the {{--command-config}} option has been used to specify a {{client.id}} for the Admin client, the second instance fails to register an MBean, resulting in a warning being logged.
> {code}
> ./bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config config/broker_connection.conf.reproducing --add --allow-principal User:alice --operation Describe --topic 'test' --resource-pattern-type prefixed
> Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=PREFIXED)`:
>  	(principal=User:alice, host=*, operation=DESCRIBE, permissionType=ALLOW)
> [2020-06-03 18:43:12,190] WARN Error registering AppInfo mbean (org.apache.kafka.common.utils.AppInfoParser)
> javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=administrator_data
> 	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> 	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> 	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
> 	at org.apache.kafka.clients.admin.KafkaAdminClient.<init>(KafkaAdminClient.java:500)
> 	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444)
> 	at org.apache.kafka.clients.admin.Admin.create(Admin.java:59)
> 	at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
> 	at kafka.admin.AclCommand$AdminClientService.withAdminClient(AclCommand.scala:105)
> 	at kafka.admin.AclCommand$AdminClientService.listAcls(AclCommand.scala:146)
> 	at kafka.admin.AclCommand$AdminClientService.$anonfun$addAcls$1(AclCommand.scala:123)
> 	at kafka.admin.AclCommand$AdminClientService.$anonfun$addAcls$1$adapted(AclCommand.scala:116)
> 	at kafka.admin.AclCommand$AdminClientService.withAdminClient(AclCommand.scala:108)
> 	at kafka.admin.AclCommand$AdminClientService.addAcls(AclCommand.scala:116)
> 	at kafka.admin.AclCommand$.main(AclCommand.scala:78)
> 	at kafka.admin.AclCommand.main(AclCommand.scala)
> Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=PREFIXED)`:
>  	(principal=User:alice, host=*, operation=DESCRIBE, permissionType=ALLOW)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
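The shape of the bug, reduced to a runnable Java sketch. The real code is Scala in AclCommand.scala; `FakeAdmin` is a stand-in for the AdminClient whose constructor registers the MBean.
{code:java}
import java.util.function.Consumer;

public class AclCommandSketch {
    static final class FakeAdmin implements AutoCloseable {
        FakeAdmin() { System.out.println("AdminClient created (MBean registered)"); }
        @Override public void close() { System.out.println("AdminClient closed"); }
    }

    // Mirrors withAdminClient(opts)(f): every call creates a fresh client.
    static void withAdminClient(Consumer<FakeAdmin> action) {
        try (FakeAdmin admin = new FakeAdmin()) {
            action.accept(admin);
        }
    }

    static void listAcls() {
        withAdminClient(admin -> System.out.println("listing ACLs"));
    }

    public static void main(String[] args) {
        // addAcls(): opens one client, then calls listAcls(), which opens a second
        // one while the first is still alive -- two clients with the same client.id,
        // hence the InstanceAlreadyExistsException warning in the report.
        withAdminClient(admin -> {
            System.out.println("adding ACLs");
            listAcls(); // nested creation: the bug fixed by PR 8808
        });
    }
}
{code}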
[GitHub] [kafka] omkreddy commented on a change in pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest
omkreddy commented on a change in pull request #8992:
URL: https://github.com/apache/kafka/pull/8992#discussion_r452092180

## File path: core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
##
@@ -74,48 +75,58 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   @Test
   def testProducerWithAuthenticationFailure(): Unit = {
     val producer = createProducer()
-    verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 1))
-    verifyAuthenticationException(producer.partitionsFor(topic))
+    try {
+      verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 1))
+      verifyAuthenticationException(producer.partitionsFor(topic))

-    createClientCredential()
-    verifyWithRetry(sendOneRecord(producer))
+      createClientCredential()
+      verifyWithRetry(sendOneRecord(producer))
+    } finally producer.close()

Review comment:
@akatona84 Thanks for the PR. Normally, producer/consumer instances created in a test are closed in [IntegrationTestHarness.tearDown](https://github.com/confluentinc/ce-kafka/blob/master/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala#L152). Many core tests extend the `IntegrationTestHarness` class. Do we know which tests failed?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (KAFKA-10252) MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores
[ https://issues.apache.org/jira/browse/KAFKA-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154406#comment-17154406 ]

Bruno Cadonna commented on KAFKA-10252:
---------------------------------------

Thank you for the bug report, [~bulbfreeman]. I think this is a duplicate of https://issues.apache.org/jira/browse/KAFKA-10179. There is also an open PR for fixing the bug linked in that ticket.

> MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10252
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10252
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1
>            Reporter: Gordon Wang
>            Priority: Major
>         Attachments: 0001-KAFKA-10252-Use-source-topic-name-for-MeteredTimesta.patch
>
> When creating a GlobalKTable for getting a ReadOnlyKeyValueStore like below:
> {code}
> GlobalKTable globalTable = streamsBuilder.globalTable(topic,
>     Consumed.with(keySerde, valueSerde),
>     Materialized.as(Stores.inMemoryKeyValueStore(topic)));
> {code}
> I got a StreamsException like below:
> {code}
> org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for topic applicationId-sourceTopicName-changelog
> Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
> {code}
> But as seen in the GlobalKTable Javadoc, the changelog stream shall not be created, and in fact it was not created. This leads to our custom serde searching for the schema (we are using Confluent Platform and an Avro-based schema registry for the job) using the wrong topic name (it should be just sourceTopicName rather than applicationId-sourceTopicName-changelog).
> After digging into the code, I found that the *initStoreSerde* method in *MeteredTimestampedKeyValueStore* assumes the topic backing the store is always the store changelog topic when initializing the Serdes for the state store. I think for GlobalKTables (ones having a GlobalProcessorContextImpl ProcessorContext) we should use the original topic name directly here.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] soarez opened a new pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers
soarez opened a new pull request #9000: URL: https://github.com/apache/kafka/pull/9000 Following up on #8752 which seems to have gone stale. @mjsax can you continue the review? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akatona84 commented on a change in pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest
akatona84 commented on a change in pull request #8992:
URL: https://github.com/apache/kafka/pull/8992#discussion_r452108786

## File path: core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
##
@@ -74,48 +75,58 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   @Test
   def testProducerWithAuthenticationFailure(): Unit = {
     val producer = createProducer()
-    verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 1))
-    verifyAuthenticationException(producer.partitionsFor(topic))
+    try {
+      verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 1))
+      verifyAuthenticationException(producer.partitionsFor(topic))

-    createClientCredential()
-    verifyWithRetry(sendOneRecord(producer))
+      createClientCredential()
+      verifyWithRetry(sendOneRecord(producer))
+    } finally producer.close()

Review comment:
Oh right, it was suspicious that I did not see a consumer thread in the failed .classMethod cases :) Thanks. testConsumerGroupServiceWithAuthenticationSuccess failed and left admin client threads behind for later tests:
```
java.lang.AssertionError: Found unexpected threads during @AfterClass, allThreads=Set(Test worker, metrics-meter-tick-thread-2, main, Signal Dispatcher, Reference Handler, Finalizer, /0:0:0:0:0:0:0:1:55620 to /0:0:0:0:0:0:0:1:33045 workers Thread 2, kafka-admin-client-thread | adminclient-45, /0:0:0:0:0:0:0:1:55620 to /0:0:0:0:0:0:0:1:33045 workers Thread 3, metrics-meter-tick-thread-1), unexpected=Set(kafka-admin-client-thread | adminclient-45)
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] akatona84 commented on pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest
akatona84 commented on pull request #8992: URL: https://github.com/apache/kafka/pull/8992#issuecomment-656038173 Thanks, @omkreddy, for explaining how the consumers and producers are closed. I updated my PR so it only closes things where necessary. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akatona84 edited a comment on pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest
akatona84 edited a comment on pull request #8992: URL: https://github.com/apache/kafka/pull/8992#issuecomment-656038173 Thanks, @omkreddy, for explaining how the consumers and producers are closed (I had missed that). I updated my PR so it only closes things where necessary. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akatona84 commented on a change in pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest
akatona84 commented on a change in pull request #8992: URL: https://github.com/apache/kafka/pull/8992#discussion_r452108786 ## File path: core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala ## @@ -74,48 +75,58 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with @Test def testProducerWithAuthenticationFailure(): Unit = { val producer = createProducer() -verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 1)) -verifyAuthenticationException(producer.partitionsFor(topic)) +try { + verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 1)) + verifyAuthenticationException(producer.partitionsFor(topic)) -createClientCredential() -verifyWithRetry(sendOneRecord(producer)) + createClientCredential() + verifyWithRetry(sendOneRecord(producer)) +} finally producer.close() Review comment: Oh, right, it was suspicious that I did not see a consumer thread in the failed .classMethod cases :) Thanks. testConsumerGroupServiceWithAuthenticationSuccess failed and left admin client threads behind for later tests: > java.lang.AssertionError: Found unexpected threads during `@AfterClass`, allThreads=Set(Test worker, metrics-meter-tick-thread-2, main, Signal Dispatcher, Reference Handler, Finalizer, /0:0:0:0:0:0:0:1:55620 to /0:0:0:0:0:0:0:1:33045 workers Thread 2, kafka-admin-client-thread | adminclient-45, /0:0:0:0:0:0:0:1:55620 to /0:0:0:0:0:0:0:1:33045 workers Thread 3, metrics-meter-tick-thread-1), unexpected=Set(kafka-admin-client-thread | adminclient-45) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest
omkreddy commented on pull request #8992: URL: https://github.com/apache/kafka/pull/8992#issuecomment-656045969 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
cadonna commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r452119998 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,24 +45,23 @@ protected ProcessorNode currentNode; private long currentSystemTimeMs; -final StateManager stateManager; Review comment: This is basically to avoid the explicit cast from `StateManager` to `ProcessorStateManager` in `ProcessorContextImpl`. `ProcessorStateManager` has the method `registeredChangelogPartitionFor()`, which only exists in `ProcessorStateManager` because it is only used in `ProcessorContextImpl`. Hence, I moved the state manager field to the respective children and introduced the abstract method `stateManager()` in `AbstractProcessorContext`. Method `stateManager()` returns a `ProcessorStateManager` in `ProcessorContextImpl` but a `StateManager` in all other places. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
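The refactoring described boils down to Java's covariant return types; a simplified, compilable sketch (names shortened, bodies stubbed):

```java
// Simplified sketch of the refactoring: each subclass owns its most specific
// state manager, and the abstract getter narrows the return type, so
// ProcessorContextImpl callers reach registeredChangelogPartitionFor() cast-free.
interface StateManager { }

class ProcessorStateManager implements StateManager {
    int registeredChangelogPartitionFor(String storeName) { return 0; } // stubbed
}

abstract class AbstractProcessorContext {
    // no shared field; each subclass holds its own state manager
    abstract StateManager stateManager();
}

class ProcessorContextImpl extends AbstractProcessorContext {
    private final ProcessorStateManager stateManager = new ProcessorStateManager();

    @Override
    ProcessorStateManager stateManager() { // covariant return type
        return stateManager;
    }
}
```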
[GitHub] [kafka] kowshik opened a new pull request #9001: KAFKA-10028: Implement KIP-584 write path
kowshik opened a new pull request #9001: URL: https://github.com/apache/kafka/pull/9001 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10028) Implement write path for feature versioning scheme
[ https://issues.apache.org/jira/browse/KAFKA-10028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam reassigned KAFKA-10028: Assignee: Kowshik Prakasam > Implement write path for feature versioning scheme > -- > > Key: KAFKA-10028 > URL: https://issues.apache.org/jira/browse/KAFKA-10028 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > > The goal is to implement the various classes and integration for the write path of > the feature versioning system > ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]). > This is preceded by the read path implementation (KAFKA-10027). The write > path implementation involves developing the new controller API, > UpdateFeatures, which enables transactional application of a set of > cluster-wide feature updates to the ZK {{'/features'}} node, along with the > required ACL permissions. > > Details about the write path are explained [in this > part|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-ChangestoKafkaController] > of the KIP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] omkreddy merged pull request #3480: MINOR: Define the term tombstone, since it's used elsewhere in the docs
omkreddy merged pull request #3480: URL: https://github.com/apache/kafka/pull/3480 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #8878: MINOR: Generator config-specific HTML ids
mimaison commented on pull request #8878: URL: https://github.com/apache/kafka/pull/8878#issuecomment-656062630 @omkreddy @ijuma What's your opinion on this change? Currently a number of config links collide and this is addressing it. However this will also break existing links. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r452140714 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion -val mergedResponseMap = if (version == 0) +val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) -sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) +sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] -val partitionTimestamps = offsetRequest.partitionTimestamps.asScala -val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - -val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) -} - -val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { -val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) -(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { -// NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages -// are typically transient and there is no value in logging the entire stack trace for the same -case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( -correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) -case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } -} -responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { -val correlationId = request.header.correlationId -val clientId = request.header.clientId -val offsetRequest = request.body[ListOffsetRequest] - -val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - -val unauthorizedResponseStatus = 
unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -ListOffsetResponse.UNKNOWN_TIMESTAMP, -ListOffsetResponse.UNKNOWN_OFFSET, -Optional.empty()) -} - -val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { -debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + -s"failed because the partition is duplicated in the request.") -(topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - -def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( -e, -ListOffsetResponse.UNKNOWN_TIMESTAMP, -
[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
cadonna commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r452148874 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -103,8 +104,12 @@ public void init(final ProcessorContext context, @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { +final String storeName = name(); +final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( -ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + changelogTopic != null ? +changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), Review comment: I think we still need the if-then-else. `ProcessorContextUtils.changelogFor()` checks that the processor context is of type `InternalProcessorContext` to be able to call `changelogFor()` on it. It does not consider the case where no changelog exists (i.e., `changelogFor()` returns `null`). The `null` check is done here. In both cases, the fallback is `ProcessorStateManager.storeChangelogTopic()`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
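Put together, the two-step fallback being defended reads roughly like this (a paraphrase of the diff under discussion, assuming the PR's `ProcessorContextUtils`; not the exact patch):

```java
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;

final class SerdeTopicFallback {
    // Paraphrase of the fallback chain discussed above: changelogFor() only
    // answers for an InternalProcessorContext, and a null answer (no changelog
    // registered) still needs a usable topic name for the serdes.
    static String serdeTopicFor(ProcessorContext context, String storeName) {
        String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
        return changelogTopic != null
            ? changelogTopic
            : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
    }
}
```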
[jira] [Updated] (KAFKA-10254) 100% CPU usage by KafkaConsumer poll when the broker is unreachable
[ https://issues.apache.org/jira/browse/KAFKA-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaotong.wang updated KAFKA-10254: -- Description: Steps: 1. start the Kafka broker 2. start a Kafka consumer, subscribe to some topics with a KafkaConsumer instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, and set auto.commit.enabled=false 3. use iptables to block the Kafka broker IP in the client VM, or shut down the Kafka brokers 4. CPU goes to 100% *Why?* Left version: 2.3.1, right version: 2.5.0. For the 2.3.1 KafkaConsumer, when the Kafka brokers go down, updateAssignmentMetadataIfNeeded will block for x ms and return empty records: !image-2020-07-09-19-24-20-604.png|width=993,height=176! For 2.5.0: private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { *long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());* I checked the source of the Kafka client: the poll timeout is changed to 0 ms when the heartbeat times out, so poll is called without any blocking, and this causes the CPU to go to 100% was: Steps: 1. start the Kafka broker 2. start a Kafka consumer, subscribe to some topics with a KafkaConsumer instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, and set auto.commit.enabled=false 3. use iptables to block the Kafka broker IP in the client VM, or shut down the Kafka brokers 4. CPU goes to 100% *Why?* Left version: 2.3.1, right version: 2.5.0. For the 2.3.1 KafkaConsumer, when the Kafka brokers go down, updateAssignmentMetadataIfNeeded will block for x ms and return empty records: !image-2020-07-09-19-24-20-604.png! For 2.5.0: private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { *long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());* I checked the source of the Kafka client: the poll timeout is changed to 0 ms when the heartbeat times out, so poll is called without any blocking, and this causes the CPU to go to 100% > 100% CPU usage by KafkaConsumer poll when the broker is unreachable > > Key: KAFKA-10254 > URL: https://issues.apache.org/jira/browse/KAFKA-10254 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: xiaotong.wang >Priority: Critical > Attachments: image-2020-07-09-19-24-20-604.png > > > Steps: > 1. start the Kafka broker > 2. start a Kafka consumer, subscribe to some topics with a KafkaConsumer > instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, > and set auto.commit.enabled=false > 3. use iptables to block the Kafka broker IP in the client VM, or shut down the Kafka brokers > 4. CPU goes to 100% > > *Why?* > > Left version: 2.3.1, right version: 2.5.0 > > For the 2.3.1 KafkaConsumer, when the Kafka brokers go > down, updateAssignmentMetadataIfNeeded will block for x ms and return empty > records: > !image-2020-07-09-19-24-20-604.png|width=993,height=176! > > For 2.5.0: > private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer > timer) { > *long pollTimeout = coordinator == null ? timer.remainingMs() :* > *Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), > timer.remainingMs());* > I checked the source of the Kafka client: the poll timeout is changed to 0 ms > when the heartbeat times out, so poll is called without any blocking, and this > causes the CPU to go to 100% > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10254) 100% CPU usage by KafkaConsumer poll when the broker is unreachable
xiaotong.wang created KAFKA-10254: - Summary: 100% CPU usage by KafkaConsumer poll when the broker is unreachable Key: KAFKA-10254 URL: https://issues.apache.org/jira/browse/KAFKA-10254 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.5.0 Reporter: xiaotong.wang Attachments: image-2020-07-09-19-24-20-604.png Steps: 1. start the Kafka broker 2. start a Kafka consumer, subscribe to some topics with a KafkaConsumer instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, and set auto.commit.enabled=false 3. use iptables to block the Kafka broker IP in the client VM, or shut down the Kafka brokers 4. CPU goes to 100% *Why?* Left version: 2.3.1, right version: 2.5.0. For the 2.3.1 KafkaConsumer, when the Kafka brokers go down, updateAssignmentMetadataIfNeeded will block for x ms and return empty records: !image-2020-07-09-19-24-20-604.png! For 2.5.0: private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { *long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());* I checked the source of the Kafka client: the poll timeout is changed to 0 ms when the heartbeat times out, so poll is called without any blocking, and this causes the CPU to go to 100% -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10254) 100% CPU usage by KafkaConsumer poll when the broker is unreachable
[ https://issues.apache.org/jira/browse/KAFKA-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaotong.wang updated KAFKA-10254: -- Description: Steps: 1. start the Kafka broker 2. start a Kafka consumer, subscribe to some topics with a KafkaConsumer instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, and set auto.commit.enabled=false 3. use iptables to block the Kafka broker IP in the client VM, or shut down the Kafka brokers 4. CPU goes to 100% *Why?* Left version: 2.3.1, right version: 2.5.0. For the 2.3.1 KafkaConsumer, when the Kafka brokers go down, updateAssignmentMetadataIfNeeded will block for x ms and return empty records: !image-2020-07-09-19-24-20-604.png|width=926,height=164! For 2.5.0: private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { *long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());* I checked the source of the Kafka client: the poll timeout is changed to 0 ms when the heartbeat times out, so poll is called without any blocking, and this causes the CPU to go to 100% was: Steps: 1. start the Kafka broker 2. start a Kafka consumer, subscribe to some topics with a KafkaConsumer instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, and set auto.commit.enabled=false 3. use iptables to block the Kafka broker IP in the client VM, or shut down the Kafka brokers 4. CPU goes to 100% *Why?* Left version: 2.3.1, right version: 2.5.0. For the 2.3.1 KafkaConsumer, when the Kafka brokers go down, updateAssignmentMetadataIfNeeded will block for x ms and return empty records: !image-2020-07-09-19-24-20-604.png|width=993,height=176! For 2.5.0: private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { *long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());* I checked the source of the Kafka client: the poll timeout is changed to 0 ms when the heartbeat times out, so poll is called without any blocking, and this causes the CPU to go to 100% > 100% CPU usage by KafkaConsumer poll when the broker is unreachable > > Key: KAFKA-10254 > URL: https://issues.apache.org/jira/browse/KAFKA-10254 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: xiaotong.wang >Priority: Critical > Attachments: image-2020-07-09-19-24-20-604.png > > > Steps: > 1. start the Kafka broker > 2. start a Kafka consumer, subscribe to some topics with a KafkaConsumer > instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, > and set auto.commit.enabled=false > 3. use iptables to block the Kafka broker IP in the client VM, or shut down the Kafka brokers > 4. CPU goes to 100% > > *Why?* > > Left version: 2.3.1, right version: 2.5.0 > > For the 2.3.1 KafkaConsumer, when the Kafka brokers go > down, updateAssignmentMetadataIfNeeded will block for x ms and return empty > records: > !image-2020-07-09-19-24-20-604.png|width=926,height=164! > > For 2.5.0: > private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer > timer) { > *long pollTimeout = coordinator == null ? timer.remainingMs() :* > *Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), > timer.remainingMs());* > I checked the source of the Kafka client: the poll timeout is changed to 0 ms > when the heartbeat times out, so poll is called without any blocking, and this > causes the CPU to go to 100% > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
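Until the underlying client issue is fixed, a crude application-side mitigation for the busy loop described above (a sketch only; the backoff value is illustrative):

{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class PollBackoffSketch {
    // If poll() keeps returning immediately with no records (the 0 ms
    // pollTimeout behavior reported above), sleep briefly so a disconnected
    // consumer does not pin a CPU core. Remove once the client is patched.
    static void pollLoop(KafkaConsumer<byte[], byte[]> consumer) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) {
                Thread.sleep(100); // illustrative backoff
            }
            // process records here
        }
    }
}
{code}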
[GitHub] [kafka] dajac commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)
dajac commented on a change in pull request #8977: URL: https://github.com/apache/kafka/pull/8977#discussion_r452162828 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MetricConfig; + +import java.util.List; + +/** + * A {@link SampledStat} that mimics the behavior of a Token Bucket and is meant to + * be used in conjunction with a {@link Rate} and a {@link org.apache.kafka.common.metrics.Quota}. + * + * The {@link TokenBucket} considers each sample as the amount of credits spent during the sample's + * window while giving back credits based on the defined quota. + * + * At time T, it computes the total O as follows: + * - O(T) = max(0, O(T-1) - Q * (W(T) - W(T-1))) + S(T) + * Where: + * - Q is the defined Quota or 0 if undefined + * - W is the time of the sample or now if undefined + * - S is the value of the sample or 0 if undefined + * + * Example with 3 samples with a Quota = 2: + * - S1 at T+0s => 4 + * - S2 at T+2s => 2 + * - S3 at T+4s => 6 + * + * The total at T+6s is computed as follows: + * - T0 => Total at T+0s => S1 = 4 + * - T1 => Total at T+2s => max(0, T0 - Q * dT) + S2 = (4 - 2 * 2) + 2 = 2 + * - T2 => Total at T+4s => max(0, T1 - Q * dT) + S3 = (2 - 2 * 2) + 6 = 6 + * - T3 => Total at T+6s => max(0, T2 - Q * dT) = (6 - 2 * 2) = 2 + */ +public class TokenBucket extends SampledStat { + +private final TimeUnit unit; + +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +public TokenBucket(TimeUnit unit) { +super(0); +this.unit = unit; +} + +@Override +protected void update(Sample sample, MetricConfig config, double value, long now) { +sample.value += value; +} + +@Override Review comment: My first implementation was doing what you described: backfilling past samples up to quota * sample length and putting the remainder in the last (or current) sample. You seem to use sample in the singular quite often in your description, so I wonder if you also meant to backfill all the past samples. Did you? It turned out that backfilling past samples is not as straightforward as it seems because, as you pointed out in your other comment, we may have holes or no past samples at all. Let me take a few examples to illustrate. Let's assume that we have the following settings: - samples = 6 - window = 1s - quota = 5 Example 1: Let's assume that we record 2 at T and 30 at T+5. When 2 is recorded, a first sample at T is created with the value 2. Then, we record 30 at T+5. As it is above the quota of the current sample, we want to backfill T, T+1, T+2, T+3, T+4 and put the remainder in T+5.
This means that we have to create samples for T+1, T+2, T+3 and T+4, because they don't exist in memory. We would end up with the following samples in memory: T = 5, T+1 = 5, T+2 = 5, T+3 = 5, T+4 = 5, T+5 = 7. Example 2: Let's assume that we record 2 at T, 3 at T+3 and 30 at T+5. The mechanics are similar to the previous example, with the difference that it requires adding new samples between existing past samples, T+1 and T+2, as well as adding sample T+4. We would end up with the following samples in memory: T = 5, T+1 = 5, T+2 = 5, T+3 = 5, T+4 = 5, T+5 = 11. With the current base implementation of SampledStat, adding all the past samples, especially in between existing past samples, is quite complex and possibly error-prone. The simplest approach would be to always allocate all the samples, but this is something I would like to avoid. Following this observation, I started to look at the problem from a different angle. Instead of spreading the value at record time, I thought that we could adjust the values while combining the samples. The idea is to run a token bucket algorithm in the combine method, based on all the non-expired samples. My implementation is basically a reversed token bucket: a token bucket which starts at zero instead of starting at the maximum number of tokens.
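That combine-time computation, extracted into a standalone runnable sketch using the numbers from the javadoc example quoted earlier (quota = 2; samples at T, T+2s, T+4s; queried at T+6s):

```java
public class ReversedTokenBucketSketch {
    public static void main(String[] args) {
        // O(T) = max(0, O(T-1) - Q * (W(T) - W(T-1))) + S(T)
        double quota = 2.0;             // credits handed back per second
        long[] timesSec = {0, 2, 4, 6}; // sample times; T+6s is the query time
        double[] values = {4, 2, 6, 0}; // S1 = 4, S2 = 2, S3 = 6, nothing at T+6s
        double total = 0.0;
        long previous = timesSec[0];
        for (int i = 0; i < timesSec.length; i++) {
            total = Math.max(0, total - quota * (timesSec[i] - previous)) + values[i];
            previous = timesSec[i];
        }
        System.out.println(total); // prints 2.0, matching the javadoc walk-through
    }
}
```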
[GitHub] [kafka] dajac commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)
dajac commented on a change in pull request #8977: URL: https://github.com/apache/kafka/pull/8977#discussion_r452162969 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ## @@ -97,7 +97,25 @@ public static RecordingLevel forName(String name) { public boolean shouldRecord(final int configId) { return configId == DEBUG.id || configId == this.id; } +} +public enum QuotaEnforcementType { +/** + * The quota is not enforced. + */ +NONE, + +/** + * The quota is enforced before updating the sensor. An update + * is rejected if the quota is already exhausted. Review comment: That is indeed much better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
cadonna commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r452165061 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -73,22 +73,23 @@ private final Set globalNonPersistentStoresTopics = new HashSet<>(); private final OffsetCheckpoint checkpointFile; private final Map checkpointFileCache; +private final Map storeToChangelogTopic; public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, final Consumer globalConsumer, final StateDirectory stateDirectory, final StateRestoreListener stateRestoreListener, final StreamsConfig config) { +storeToChangelogTopic = topology.storeToChangelogTopic(); Review comment: I would still pass `ProcessorTopology` into the constructor because it might make the signature of the constructor more stable. I removed the field for the topology and now we store only `globalStateStores` and `storeChangelogTopics` in `GlobalStateManagerImpl`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
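The shape being described, as a compilable sketch (accessor names assumed from the surrounding diff; `ProcessorTopology` stubbed to keep it self-contained):

```java
import java.util.List;
import java.util.Map;

// Stub standing in for the real internal ProcessorTopology class.
interface ProcessorTopology {
    List<Object> globalStateStores();
    Map<String, String> storeToChangelogTopic();
}

// Sketch: the constructor keeps its ProcessorTopology parameter (stable
// signature) but retains only the two pieces the state manager actually
// uses, instead of holding on to the whole topology.
class GlobalStateManagerSketch {
    private final List<Object> globalStateStores;
    private final Map<String, String> storeToChangelogTopic;

    GlobalStateManagerSketch(ProcessorTopology topology) {
        this.globalStateStores = topology.globalStateStores();
        this.storeToChangelogTopic = topology.storeToChangelogTopic();
        // no field for the topology itself
    }
}
```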
[GitHub] [kafka] dajac commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)
dajac commented on a change in pull request #8977: URL: https://github.com/apache/kafka/pull/8977#discussion_r452166182 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.metrics.MetricConfig; + +import java.util.List; + +/** + * A {@link SampledStat} that mimics the behavior of a Token Bucket and is meant to + * be used in conjunction with a {@link Rate} and a {@link org.apache.kafka.common.metrics.Quota}. + * + * The {@link TokenBucket} considers each sample as the amount of credits spent during the sample's + * window while giving back credits based on the defined quota. + * + * At time T, it computes the total O as follows: + * - O(T) = max(0, O(T-1) - Q * (W(T) - W(T-1))) + S(T) + * Where: + * - Q is the defined Quota or 0 if undefined + * - W is the time of the sample or now if undefined + * - S is the value of the sample or 0 if undefined + * + * Example with 3 samples with a Quota = 2: + * - S1 at T+0s => 4 + * - S2 at T+2s => 2 + * - S3 at T+4s => 6 + * + * The total at T+6s is computed as follows: + * - T0 => Total at T+0s => S1 = 4 + * - T1 => Total at T+2s => max(0, T0 - Q * dT) + S2 = (4 - 2 * 2) + 2 = 2 + * - T2 => Total at T+4s => max(0, T1 - Q * dT) + S3 = (2 - 2 * 2) + 6 = 6 + * - T3 => Total at T+6s => max(0, T2 - Q * dT) = (6 - 2 * 2) = 2 + */ +public class TokenBucket extends SampledStat { + +private final TimeUnit unit; + +public TokenBucket() { +this(TimeUnit.SECONDS); +} + +public TokenBucket(TimeUnit unit) { +super(0); +this.unit = unit; +} + +@Override +protected void update(Sample sample, MetricConfig config, double value, long now) { +sample.value += value; +} + +@Override +public double combine(List<Sample> samples, MetricConfig config, long now) { +if (this.samples.isEmpty()) +return 0; + +final double quota = config.quota() != null ? config.quota().bound() : 0; +final int startIndex = (this.current + 1) % this.samples.size(); + +long lastWindowMs = Long.MAX_VALUE; +double total = 0.0; + +for (int i = 0; i < this.samples.size(); i++) { Review comment: 1. It does not cause a problem here. As you said, `purgeObsoleteSamples` sets the value to zero and the lastWindowMs to now when it resets a sample. When the past `lastWindowMs` is higher than the current sample's `lastWindowMs` or `now`, it has no effect: `Math.max(0, nowMs - lastWindowMs)`. 2. Right. I take the current sample's lastWindowMs minus the previous sample's lastWindowMs to take this into account. This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
Luke Chen created KAFKA-10255: - Summary: Fix flaky testOneWayReplicationWithAutorOffsetSync1 test Key: KAFKA-10255 URL: https://issues.apache.org/jira/browse/KAFKA-10255 Project: Kafka Issue Type: Test Reporter: Luke Chen Assignee: Luke Chen https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0 org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testOneWayReplicationWithAutorOffsetSync1 STARTED org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1 failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testOneWayReplicationWithAutorOffsetSync1 FAILED java.lang.AssertionError: consumer record size is not zero expected:<0> but was:<2> at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.failNotEquals(Assert.java:835) at org.junit.Assert.assertEquals(Assert.java:647) at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] omkreddy merged pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest
omkreddy merged pull request #8992: URL: https://github.com/apache/kafka/pull/8992 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-656181031 Test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-656181429 Test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request #9002: MINOR: Add ApiMessageTypeGenerator
cmccabe opened a new pull request #9002: URL: https://github.com/apache/kafka/pull/9002 Previously, we had some code hard-coded to generate message type classes for RPCs. We might want to generate message type classes for other things as well, so make it more generic. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10256) Create a server gradle module for Java code needed only by servers
Colin McCabe created KAFKA-10256: Summary: Create a server gradle module for Java code needed only by servers Key: KAFKA-10256 URL: https://issues.apache.org/jira/browse/KAFKA-10256 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe It doesn't really make sense to have a "server" directory in the "clients" gradle module. The client is not the server. Instead, we should have a separate gradle module for code which is server-specific. This will avoid polluting the client CLASSPATH with code which is internal to the server, and make the functional division of the code clearer. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] joel-hamill commented on pull request #8878: MINOR: Generator config-specific HTML ids
joel-hamill commented on pull request #8878: URL: https://github.com/apache/kafka/pull/8878#issuecomment-656205597 LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10240) Sink tasks should not throw WakeupException on shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-10240: -- Description: * When a task is scheduled for shutdown, the framework [wakes up the consumer|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L159] for that task. * As is noted in the [Javadocs for that method|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2348], “If no thread is blocking in a method which can throw {{org.apache.kafka.common.errors.WakeupException}}, the next call to such a method will raise it instead.” * It just so happens that, if the framework isn’t in the middle of a call to the consumer and then the task gets stopped, the next call the framework will make on the consumer may be to commit offsets, which will immediately throw a {{WakeupException}}. * Currently, the framework handles this by [immediately retrying the offset commit|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L337-L339] until it either throws a different exception or succeeds, and then throwing the original {{WakeupException}}. If this synchronous commit of offsets occurs during task shutdown (as opposed to in response to a consumer rebalance), it's unnecessary to throw the {{WakeupException}} back to the caller, and can cause alarming {{ERROR}}-level messages to get logged by the worker. was: * When a task is scheduled for shutdown, the framework [wakes up the consumer|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L159] for that task. * As is noted in the [Javadocs for that method|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2348], “If no thread is blocking in a method which can throw {{org.apache.kafka.common.errors.WakeupException}}, the next call to such a method will raise it instead.” * It just so happens that, if the framework isn’t in the middle of a call to the consumer and then the task gets stopped, the next call the framework will make on the consumer may be to commit offsets, which will immediately throw a {{WakeupException}}. * Currently, the framework handles this by [immediately retrying the offset commit|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L337-L339] until it either throws a different exception or succeeds, and then throwing the original {{WakeupException}}. Since this synchronous commit of offsets only occurs during task shutdown, it's unnecessary to throw the {{WakeupException}} back to the caller, and can cause alarming {{ERROR}}-level messages to get logged by the worker. 
> Sink tasks should not throw WakeupException on shutdown > --- > > Key: KAFKA-10240 > URL: https://issues.apache.org/jira/browse/KAFKA-10240 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, > 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > * When a task is scheduled for shutdown, the framework [wakes up the > consumer|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L159] > for that task. > * As is noted in the [Javadocs for that > method|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2348], > “If no thread is blocking in a method which can throw > {{org.apache.kafka.common.errors.WakeupException}}, the next call to such a > method will raise it instead.” > * It just so happens that, if the framework isn’t in the middle of a call to > the consumer and then the task gets stopped, the next call the framework will > make on the consumer may be to commit offsets, which will immediately throw a > {{WakeupException}}. > * Currently, the framework handles this by [immediately retrying the offset > commit|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L337-L339] > until it either throws a
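The change in behavior the ticket argues for, sketched in Java (the `stopping` flag is hypothetical; the real WorkerSinkTask tracks shutdown through its own lifecycle state, and the actual patch may differ):

{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public final class ShutdownCommitSketch {
    // Sketch: retry the commit once the pending wakeup has been consumed,
    // but only rethrow the WakeupException when the task is not shutting
    // down, so shutdown paths stop logging alarming ERROR messages.
    public static void commitSyncWithRetry(KafkaConsumer<?, ?> consumer, boolean stopping) {
        try {
            consumer.commitSync();
        } catch (WakeupException e) {
            consumer.commitSync(); // the wakeup has been raised; retry the commit
            if (!stopping) {
                throw e; // preserve rebalance semantics outside of shutdown
            }
        }
    }
}
{code}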
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r452324069 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -965,11 +994,11 @@ public void onFailure(RuntimeException e) { * @return A response which can be polled to obtain the corresponding timestamps and offsets. */ private RequestFuture sendListOffsetRequest(final Node node, - final Map timestampsToSearch, + final Map timestampsToSearch, boolean requireTimestamp) { ListOffsetRequest.Builder builder = ListOffsetRequest.Builder .forConsumer(requireTimestamp, isolationLevel) -.setTargetTimes(timestampsToSearch); +.setTargetTimes(toListOffsetTopics(timestampsToSearch)); Review comment: I initially tried to do that, but there are a couple of intermediate collections using `TopicPartition` in these methods, which makes it really hard to update them. For example: - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L735 - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L880 - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L887 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
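For readers skimming the diff, the kind of regrouping a helper like `toListOffsetTopics` has to perform, reduced to plain collections (value types simplified; not the actual implementation):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public final class GroupByTopicSketch {
    // Regroup a partition-keyed map by topic name, the shape the generated
    // protocol classes expect; the intermediate TopicPartition-keyed
    // collections elsewhere in Fetcher are what make the conversion awkward.
    public static Map<String, List<Integer>> groupByTopic(Map<TopicPartition, Long> timestampsToSearch) {
        Map<String, List<Integer>> byTopic = new HashMap<>();
        for (TopicPartition tp : timestampsToSearch.keySet()) {
            byTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
        }
        return byTopic;
    }
}
```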
[GitHub] [kafka] junrao commented on pull request #8981: KAFKA-10235 Fix flaky transactions_test.py
junrao commented on pull request #8981: URL: https://github.com/apache/kafka/pull/8981#issuecomment-656214847 11 failures in system tests. http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-08--001.1594266883--chia7712--KAFKA-10235--a76224fff/report.html @chia7712 : Did this PR fix transaction tests as expected? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10240) Sink tasks should not throw WakeupException on shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154702#comment-17154702 ] Chris Egerton commented on KAFKA-10240: --- For some background: this was actually discussed years ago on [https://github.com/apache/kafka/pull/1511], but never implemented. > Sink tasks should not throw WakeupException on shutdown > --- > > Key: KAFKA-10240 > URL: https://issues.apache.org/jira/browse/KAFKA-10240 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, > 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > * When a task is scheduled for shutdown, the framework [wakes up the > consumer|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L159] > for that task. > * As is noted in the [Javadocs for that > method|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2348], > “If no thread is blocking in a method which can throw > {{org.apache.kafka.common.errors.WakeupException}}, the next call to such a > method will raise it instead.” > * It just so happens that, if the framework isn’t in the middle of a call to > the consumer and then the task gets stopped, the next call the framework will > make on the consumer may be to commit offsets, which will immediately throw a > {{WakeupException}}. > * Currently, the framework handles this by [immediately retrying the offset > commit|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L337-L339] > until it either throws a different exception or succeeds, and then throwing > the original {{WakeupException}}. If this synchronous commit of offsets > occurs during task shutdown (as opposed to in response to a consumer > rebalance), it's unnecessary to throw the {{WakeupException}} back to the > caller, and can cause alarming {{ERROR}}-level messages to get logged by the > worker. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10257) system test kafkatest.tests.core.security_rolling_upgrade_test fails
Jun Rao created KAFKA-10257: --- Summary: system test kafkatest.tests.core.security_rolling_upgrade_test fails Key: KAFKA-10257 URL: https://issues.apache.org/jira/browse/KAFKA-10257 Project: Kafka Issue Type: Bug Affects Versions: 2.7.0 Reporter: Jun Rao The test failure was reported in http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-08--001.1594266883--chia7712--KAFKA-10235--a76224fff/report.html Saw the following error in the log. {code:java} [2020-07-09 00:56:37,575] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.lang.IllegalArgumentException: Could not find a 'KafkaServer' or 'internal.KafkaServer' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133) at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98) at org.apache.kafka.common.security.JaasContext.loadServerContext(JaasContext.java:70) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:131) at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97) at kafka.network.Processor.(SocketServer.scala:780) at kafka.network.SocketServer.newProcessor(SocketServer.scala:406) at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:285) at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:284) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:251) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:248) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) at scala.collection.AbstractIterable.foreach(Iterable.scala:920) at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:248) at kafka.network.SocketServer.startup(SocketServer.scala:122) at kafka.server.KafkaServer.startup(KafkaServer.scala:297) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at kafka.Kafka$.main(Kafka.scala:82) at kafka.Kafka.main(Kafka.scala) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
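For anyone hitting the same startup failure outside this test harness: the broker needs a `KafkaServer` (or `internal.KafkaServer`) entry in a JAAS file, with the JVM pointed at it via the system property named in the stack trace. A minimal SASL/PLAIN example with placeholder credentials, shown as Java comments since the JAAS file itself is not Java:

{code:java}
// kafka_server_jaas.conf: minimal SASL/PLAIN entry (placeholder credentials):
//
//   KafkaServer {
//       org.apache.kafka.common.security.plain.PlainLoginModule required
//       username="admin"
//       password="admin-secret"
//       user_admin="admin-secret";
//   };
//
// The broker JVM is then started with the system property from the stack
// trace, e.g. via KAFKA_OPTS:
//
//   -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
{code}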
[GitHub] [kafka] chia7712 commented on pull request #8981: KAFKA-10235 Fix flaky transactions_test.py
chia7712 commented on pull request #8981: URL: https://github.com/apache/kafka/pull/8981#issuecomment-656223017 > Did this PR fix transaction tests as expected? Yep. core.transactions_test passes :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao merged pull request #8981: KAFKA-10235 Fix flaky transactions_test.py
junrao merged pull request #8981: URL: https://github.com/apache/kafka/pull/8981 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10235) Fix flaky transactions_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-10235. - Fix Version/s: 3.0.0 Resolution: Fixed merged the PR to trunk. > Fix flaky transactions_test.py > -- > > Key: KAFKA-10235 > URL: https://issues.apache.org/jira/browse/KAFKA-10235 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 3.0.0 > > > {code} > =hard_bounce.bounce_target=clients.check_order=False.use_group_metadata=False: > FAIL: copier-1 : Message copier didn't make enough progress in 30s. Current > progress: 0 > Traceback (most recent call last): > File > "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", > line 134, in run > data = self.run_test() > File > "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", > line 192, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line > 429, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line > 254, in test_transactions > num_messages_to_copy=self.num_seed_messages, > use_group_metadata=use_group_metadata) > File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line > 195, in copy_messages_transactionally > self.bounce_copiers(copiers, clean_shutdown) > File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line > 120, in bounce_copiers > % (copier.transactional_id, str(copier.progress_percent( > File "/usr/local/lib/python2.7/dist-packages/ducktape/utils/util.py", line > 41, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) > TimeoutError: copier-1 : Message copier didn't make enough progress in 30s. > Current progress: 0 > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
ijuma commented on a change in pull request #8999: URL: https://github.com/apache/kafka/pull/8999#discussion_r452349346 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) { } /** - * Return the Set of nodes whose connection setup has timed out. + * Return the List of nodes whose connection setup has timed out. * @param now the current time in ms */ -public Set nodesWithConnectionSetupTimeout(long now) { +public List nodesWithConnectionSetupTimeout(long now) { return connectingNodes.stream() .filter(id -> isConnectionSetupTimeout(id, now)) -.collect(Collectors.toSet()); +.collect(Collectors.toCollection(LinkedList::new)); Review comment: Why are we using `LinkedList`? It's very rare where it should be used. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
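For reference, the conventional alternative here is `Collectors.toList()`, which in practice is backed by an `ArrayList` (the spec leaves the concrete type unspecified); a `LinkedList` only pays off for frequent insertion or removal in the middle, which callers of this method do not do. A minimal sketch of the same pipeline shape:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class CollectorChoiceSketch {
    public static void main(String[] args) {
        // Same stream pipeline shape as nodesWithConnectionSetupTimeout,
        // collecting into the default list implementation instead.
        List<String> timedOut = Stream.of("node-1", "node-2", "node-3")
            .filter(id -> !id.isEmpty()) // stand-in for isConnectionSetupTimeout(id, now)
            .collect(Collectors.toList());
        System.out.println(timedOut);
    }
}
```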
[jira] [Commented] (KAFKA-10254) 100% CPU usage by KafkaConsumer poll when broker can't be connected
[ https://issues.apache.org/jira/browse/KAFKA-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154725#comment-17154725 ] Arvin Zheng commented on KAFKA-10254: - [~xiaotong.wang], see KAFKA-10134 > 100% CPU usage by KafkaConsumer poll when broker can't be connected > > > Key: KAFKA-10254 > URL: https://issues.apache.org/jira/browse/KAFKA-10254 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: xiaotong.wang >Priority: Critical > Attachments: image-2020-07-09-19-24-20-604.png > > > Steps: > 1. Start the Kafka broker. > 2. Start a Kafka consumer, subscribe to some topics with a KafkaConsumer instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, and set enable.auto.commit=false. > 3. Use iptables to block the Kafka broker IP in the client VM, or shut down the Kafka brokers. > 4. CPU goes to 100%. > > *Why?* > > Left version: 2.3.1 > Right version: 2.5.0 > > For the 2.3.1 KafkaConsumer, when the Kafka brokers go down, updateAssignmentMetadataIfNeeded blocks for x ms and returns empty records. > !image-2020-07-09-19-24-20-604.png|width=926,height=164! > > For 2.5.0: > private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { > *long pollTimeout = coordinator == null ? timer.remainingMs() :* > *Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());* > Checking the Kafka client source, the poll timeout is changed to 0 ms once the heartbeat times out, so poll is called without any blocking, and this drives the CPU to 100%. -- This message was sent by Atlassian Jira (v8.3.4#803005)
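For readers trying to reproduce the busy loop described above, a minimal consumer sketch follows. The broker address, topic name, and class name are illustrative assumptions, not taken from the report; the loop simply mirrors the reporter's setup (manual commits, a plain poll loop) so that cutting off the broker exhibits the spin.

```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollSpinRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "spin-repro");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));
            while (true) {
                // Per the report: once the broker is unreachable and the heartbeat
                // deadline passes, the internal poll timeout collapses to 0 ms, so
                // this call returns immediately and the loop spins at 100% CPU.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                System.out.println("polled " + records.count() + " records");
            }
        }
    }
}
```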
[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452357020 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -215,7 +215,7 @@ public StateStore getGlobalStore(final String name) { } // package-private for test only -void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { +void initializeStoreOffsetsFromCheckpoint(final boolean taskDirIsEmpty) { Review comment: Was deceptively named. ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ## @@ -566,8 +567,20 @@ public void shouldReviveCorruptTasks() { topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString()); expectLastCall().anyTimes(); +expect(consumer.assignment()).andReturn(taskId00Partitions); +consumer.pause(taskId00Partitions); +expectLastCall(); +final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L); + expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata)); +consumer.seek(t1p0, offsetAndMetadata); +expectLastCall(); +consumer.seekToBeginning(emptySet()); +expectLastCall(); replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader); - +taskManager.setPartitionResetter(tp -> { +assertThat(tp, is(empty())); +return emptySet(); +}); Review comment: This all amounts to checking that we really reset the consumer to the last committed position. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ## @@ -504,7 +504,7 @@ private void resetOffsetPosition(TopicPartition tp) { if (strategy == OffsetResetStrategy.EARLIEST) { offset = beginningOffsets.get(tp); if (offset == null) -throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning"); +throw new IllegalStateException("MockConsumer didn't have beginning offset for " + tp + " specified, but tried to seek to beginning"); Review comment: Just a quality-of-life improvement. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { -// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file +// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file // and hence we are uncertain that the current local state only contains committed data; // in that case we need to treat it as a task-corrupted exception -if (eosEnabled && !storeDirIsEmpty) { + +// Note, this is a little overzealous, since we aren't checking whether the store's specific +// directory is nonempty, only if there are any directories for any stores. So if there are +// two stores in a task, and one is correctly written and checkpointed, while the other is +// neither written nor checkpointed, we _could_ correctly load the first and recover the second +// but instead we'll consider the whole task corrupted and discard the first and recover both. +if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) { Review comment: Bugfix: we shouldn't call this task corrupted for not having a checkpoint of a non-persistent store. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -462,7 +468,7 @@ public void flush() { */ @Override public void close() throws ProcessorStateException { -log.debug("Closing its state manager and all the registered state stores: {}", stores); +log.info("Closing its state manager and all the registered state stores: {}", stores); Review comment: ```suggestion log.debug("Closing its state manager and all the registered state stores: {}", stores); ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -648,7 +650,7 @@ void runOnce() { // only try to initialize the assigned tasks
[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452365162 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { -// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file +// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file // and hence we are uncertain that the current local state only contains committed data; // in that case we need to treat it as a task-corrupted exception -if (eosEnabled && !storeDirIsEmpty) { + +// Note, this is a little overzealous, since we aren't checking whether the store's specific +// directory is nonempty, only if there are any directories for any stores. So if there are +// two stores in a task, and one is correctly written and checkpointed, while the other is +// neither written nor checkpointed, we _could_ correctly load the first and recover the second +// but instead we'll consider the whole task corrupted and discard the first and recover both. +if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) { Review comment: I'm not sure how this came up now for me. I don't think it was related to any changes in this PR, but the SuppressionDurabilityIntegrationTest fails every time without it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
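Earlier in this review thread, the TaskManagerTest changes were described as verifying that the consumer really gets reset to the last committed position. A self-contained sketch of that reset behavior using only public Consumer APIs; the class and method names are illustrative, not the PR's code:

```
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class ResetToCommittedSketch {
    // Seek each revived partition back to its last committed offset, or to the
    // beginning of the log when nothing has ever been committed for it.
    static void resetToCommitted(Consumer<?, ?> consumer, Set<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata offset = committed.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset); // replay from the last commit
            } else {
                consumer.seekToBeginning(Collections.singleton(tp)); // no commit yet
            }
        }
    }
}
```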
[jira] [Assigned] (KAFKA-10131) Minimize use of --zookeeper flag in ducktape tests
[ https://issues.apache.org/jira/browse/KAFKA-10131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vinoth Chandar reassigned KAFKA-10131: -- Assignee: Ron Dagostino (was: Vinoth Chandar) > Minimize use of --zookeeper flag in ducktape tests > -- > > Key: KAFKA-10131 > URL: https://issues.apache.org/jira/browse/KAFKA-10131 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Vinoth Chandar >Assignee: Ron Dagostino >Priority: Major > > Get the ducktape tests working without the --zookeeper flag (except for > scram). > (Note: When doing compat testing we'll still use the old flags.) > Below are the current usages > {code:java} > [tests]$ grep -R -e "--zookeeper" . > ./kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py:# Cannot > use --zookeeper because kafka-topics.sh is unable to connect to a TLS-enabled > ZooKeeper quorum, > ./kafkatest/tests/client/quota_test.py:cmd = "%s --zookeeper %s > --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \ > ./kafkatest/services/console_consumer.py:cmd += " --zookeeper > %(zk_connect)s" % args > ./kafkatest/services/security/security_config.py:cmd = "%s > --zookeeper %s --entity-name %s --entity-type users --alter --add-config > %s=[password=%s]" % \ > ./kafkatest/services/zookeeper.py:la_migra_cmd += "%s > --zookeeper.acl=%s --zookeeper.connect=%s %s" % \ > ./kafkatest/services/zookeeper.py:cmd = "%s kafka.admin.ConfigCommand > --zookeeper %s %s --describe --topic %s" % \ > # Used by MessageFormatChangeTest, TruncationTest > ./kafkatest/services/kafka/kafka.py:cmd += "%s --zookeeper %s %s > --entity-name %s --entity-type topics --alter --add-config > message.format.version=%s" % \ > ./kafkatest/services/kafka/kafka.py:cmd += "%s --zookeeper %s %s > --entity-name %s --entity-type topics --alter --add-config > unclean.leader.election.enable=%s" % \ > # called by reassign_partitions.sh, ThrottlingTest, ReassignPartitionsTest > ./kafkatest/services/kafka/kafka.py:cmd += "--zookeeper %s " % > self.zk_connect_setting() > ./kafkatest/services/kafka/kafka.py:cmd += "--zookeeper %s " % > self.zk_connect_setting() > ./kafkatest/services/kafka/kafka.py:connection_setting = > "--zookeeper %s" % (self.zk_connect_setting()) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] omkreddy commented on pull request #8878: MINOR: Generator config-specific HTML ids
omkreddy commented on pull request #8878: URL: https://github.com/apache/kafka/pull/8878#issuecomment-656246737 I agree, it's a useful fix. I think it's OK to break existing links; I have not seen those links used anywhere. Let's see what others think. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10174) Prefer --bootstrap-server ducktape tests using kafka_configs.sh
[ https://issues.apache.org/jira/browse/KAFKA-10174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154741#comment-17154741 ] Vinoth Chandar commented on KAFKA-10174: [https://github.com/apache/kafka/pull/8948] > Prefer --bootstrap-server ducktape tests using kafka_configs.sh > --- > > Key: KAFKA-10174 > URL: https://issues.apache.org/jira/browse/KAFKA-10174 > Project: Kafka > Issue Type: Sub-task >Reporter: Vinoth Chandar >Assignee: Vinoth Chandar >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] C0urante opened a new pull request #9003: KAFKA-10240: Stop throwing WakeupExceptions during sink task shutdown
C0urante opened a new pull request #9003: URL: https://github.com/apache/kafka/pull/9003 A benign `WakeupException` can be thrown by a sink task's consumer if it's scheduled for shutdown by the worker. This is caught and handled gracefully if the exception is thrown when calling `poll` on the consumer, but not if it is thrown from `commitSync`, which a task invokes during shutdown and also when its partition assignment is updated. If thrown during a partition assignment update, the `WakeupException` is caught and handled gracefully as part of the task's `iteration` loop. If thrown during shutdown, however, it is not caught and instead leads to the scary-looking log message "Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.". These changes catch the `WakeupException` during shutdown and handle it gracefully with a `TRACE`-level log message. A unit test is added to verify this behavior by simulating a thrown `WakeupException` during `Consumer::commitSync`, running through the `WorkerSinkTask::execute` method, and confirming that it does not throw a `WakeupException` itself. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
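A minimal sketch of the handling the PR describes, assuming a hypothetical helper rather than the actual `WorkerSinkTask` code: the `WakeupException` from `commitSync` is swallowed only when shutdown has already begun, since in that case the wakeup came from the worker itself.

```
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

final class ShutdownCommitSketch {
    static void commitOnShutdown(Consumer<?, ?> consumer,
                                 Map<TopicPartition, OffsetAndMetadata> offsets,
                                 boolean stopping) {
        try {
            consumer.commitSync(offsets);
        } catch (WakeupException e) {
            if (!stopping) {
                throw e; // a wakeup outside shutdown still needs normal handling
            }
            // During shutdown the wakeup is expected and benign; the PR logs
            // this case at TRACE level and moves on.
        }
    }
}
```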
[GitHub] [kafka] C0urante commented on pull request #9003: KAFKA-10240: Stop throwing WakeupExceptions during sink task shutdown
C0urante commented on pull request #9003: URL: https://github.com/apache/kafka/pull/9003#issuecomment-656247517 @gharris1727 @chia7712 @ncliang would any of you be interested in reviewing this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10235) Fix flaky transactions_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-10235: Fix Version/s: (was: 3.0.0) 2.7.0 > Fix flaky transactions_test.py > -- > > Key: KAFKA-10235 > URL: https://issues.apache.org/jira/browse/KAFKA-10235 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 2.7.0 > > > {code} > =hard_bounce.bounce_target=clients.check_order=False.use_group_metadata=False: > FAIL: copier-1 : Message copier didn't make enough progress in 30s. Current > progress: 0 > Traceback (most recent call last): > File > "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", > line 134, in run > data = self.run_test() > File > "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", > line 192, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line > 429, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line > 254, in test_transactions > num_messages_to_copy=self.num_seed_messages, > use_group_metadata=use_group_metadata) > File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line > 195, in copy_messages_transactionally > self.bounce_copiers(copiers, clean_shutdown) > File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line > 120, in bounce_copiers > % (copier.transactional_id, str(copier.progress_percent( > File "/usr/local/lib/python2.7/dist-packages/ducktape/utils/util.py", line > 41, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) > TimeoutError: copier-1 : Message copier didn't make enough progress in 30s. > Current progress: 0 > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10213) Prefer --bootstrap-server in ducktape tests for Kafka clients
[ https://issues.apache.org/jira/browse/KAFKA-10213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vinoth Chandar reassigned KAFKA-10213: -- Assignee: Ron Dagostino (was: Vinoth Chandar) > Prefer --bootstrap-server in ducktape tests for Kafka clients > - > > Key: KAFKA-10213 > URL: https://issues.apache.org/jira/browse/KAFKA-10213 > Project: Kafka > Issue Type: Sub-task >Reporter: Vinoth Chandar >Assignee: Ron Dagostino >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] C0urante commented on pull request #8973: KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once
C0urante commented on pull request #8973: URL: https://github.com/apache/kafka/pull/8973#issuecomment-656247489 @gharris1727 @chia7712 @ncliang would any of you be interested in reviewing this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on pull request #8993: URL: https://github.com/apache/kafka/pull/8993#issuecomment-656247859 Looks like there was a spurious failure of the new test: ``` test_id: kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.to_version=2.5.0-SNAPSHOT.from_version=2.0.1.bounce_type=full status: FAIL run time: 1 minute 43.164 seconds Server connection dropped: Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 134, in run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 192, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/mark/_mark.py", line 429, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_application_upgrade_test.py", line 108, in test_app_upgrade self.restart_all_nodes_with(to_version) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_application_upgrade_test.py", line 173, in restart_all_nodes_with self.processor2.start_node(self.processor2.node) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/streams.py", line 305, in start_node node.account.create_file(self.CONFIG_FILE, prop_file) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/cluster/remoteaccount.py", line 588, in create_file with self.sftp_client.open(path, "w") as f: File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 372, in open t, msg = self._request(CMD_OPEN, filename, imode, attrblock) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 813, in _request return self._read_response(num) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 845, in _read_response raise SSHException("Server connection dropped: {}".format(e)) SSHException: Server connection dropped: ``` I'll re-run it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r452370517 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -104,17 +104,16 @@ static void closeStateManager(final Logger log, if (stateDirectory.lock(id)) { try { stateMgr.close(); - +} catch (final ProcessorStateException e) { +firstException.compareAndSet(null, e); +} finally { if (wipeStateStore) { log.debug("Wiping state stores for {} task {}", taskType, id); // we can just delete the whole dir of the task, including the state store images and the checkpoint files, // and then we write an empty checkpoint file indicating that the previous close is graceful and we just // need to re-bootstrap the restoration from the beginning Utils.delete(stateMgr.baseDir()); Review comment: Right, it's not a correctness issue, but it's needless extra overhead to go through the whole cycle of initializing a task, getting a TaskCorrupted, wiping it at that point, and finally restarting it. Of course, if we keep hitting an issue during `closeDirty` then we might never wipe the state, which does seem like a real problem: for example, if there's some genuine issue with the state, like the files actually being corrupted. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on pull request #8993: URL: https://github.com/apache/kafka/pull/8993#issuecomment-656249347 Ah, and the other smoke test usages are messed up: ``` test_id: kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_rebalance_simple status: FAIL run time: 1 minute 28.121 seconds __init__() takes exactly 4 arguments (3 given) Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 134, in run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 192, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_eos_test.py", line 41, in test_rebalance_simple self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka), TypeError: __init__() takes exactly 4 arguments (3 given) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10258) Get rid of use_zk_connection flag in kafka.py public methods
Vinoth Chandar created KAFKA-10258: -- Summary: Get rid of use_zk_connection flag in kafka.py public methods Key: KAFKA-10258 URL: https://issues.apache.org/jira/browse/KAFKA-10258 Project: Kafka Issue Type: Sub-task Reporter: Vinoth Chandar -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r452372357 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ## @@ -784,7 +784,7 @@ public void close() { } @Test -public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws IOException { +public void shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws IOException { final long checkpointOffset = 10L; final Map<TopicPartition, Long> offsets = mkMap( Review comment: Sorry, yeah, the relevant part doesn't show up on GitHub. Basically we register ``` stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback); stateMgr.registerStore(persistentStoreTwo, persistentStoreTwo.stateRestoreCallback); stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback); ``` but only write the checkpoint for the `persistentStorePartition`, `nonPersistentStorePartition`, and `irrelevantPartition`. I think the point of the `irrelevantPartition` is to make sure that we detect that the `persistentStoreTwoPartition` offset is missing even though the checkpoint technically has the correct number of offsets in total, i.e., that we actually map the offsets to a registered changelog. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vinothchandar commented on pull request #8948: KAFKA-10174: Prefer --bootstrap-server for configs command in ducker tests
vinothchandar commented on pull request #8948: URL: https://github.com/apache/kafka/pull/8948#issuecomment-656251801 @cmccabe this PR is a sub-task under KAFKA-10131, which tracks the bigger goal. I filed a new issue explicitly targeting removal of `use_zk_connection`. For this PR, the public methods affected, `alter_message_format`/`set_unclean_leader_election`, don't offer this flag. So this PR should be fine for this scope? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r452374925 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -994,30 +1023,29 @@ public void onSuccess(ClientResponse response, RequestFuture f * value of each partition may be null only for v0. In v1 and later the ListOffset API would not * return a null timestamp (-1 is returned instead when necessary). */ -private void handleListOffsetResponse(Map timestampsToSearch, +private void handleListOffsetResponse(Map timestampsToSearch, ListOffsetResponse listOffsetResponse, RequestFuture future) { Map<TopicPartition, ListOffsetData> fetchedOffsets = new HashMap<>(); Set<TopicPartition> partitionsToRetry = new HashSet<>(); Set<String> unauthorizedTopics = new HashSet<>(); -for (Map.Entry entry : timestampsToSearch.entrySet()) { +Map partitionsData = byTopicPartitions(listOffsetResponse.responseData()); Review comment: We recently added logic in the AdminClient to handle partial responses from brokers (based on https://github.com/apache/kafka/pull/8295#discussion_r449575550). Shouldn't we do the same here instead of assuming the response is always complete? I'm not even sure if we should retry if a resource is missing from the response, but we could at least log it instead of hitting an NPE. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
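To make the suggestion concrete, here is a hedged sketch of tolerating a partial response; the generic parameter `P` stands in for the PR's per-partition response type, and the method is hypothetical, not the Fetcher's actual code.

```
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;

final class PartialResponseSketch {
    // A partition missing from the broker's response is logged and queued for
    // retry instead of being dereferenced (which is what produces the NPE).
    static <P> void handle(Set<TopicPartition> requested,
                           Map<TopicPartition, P> partitionsData,
                           Set<TopicPartition> partitionsToRetry,
                           Logger log) {
        for (TopicPartition tp : requested) {
            P data = partitionsData.get(tp);
            if (data == null) {
                log.debug("No ListOffset data returned for {}; marking it for retry", tp);
                partitionsToRetry.add(tp);
                continue;
            }
            // ... normal per-partition handling of `data` would go here ...
        }
    }
}
```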
[GitHub] [kafka] ableegoldman commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
ableegoldman commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452381474 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { -// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file +// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file // and hence we are uncertain that the current local state only contains committed data; // in that case we need to treat it as a task-corrupted exception -if (eosEnabled && !storeDirIsEmpty) { + +// Note, this is a little overzealous, since we aren't checking whether the store's specific +// directory is nonempty, only if there are any directories for any stores. So if there are +// two stores in a task, and one is correctly written and checkpointed, while the other is +// neither written nor checkpointed, we _could_ correctly load the first and recover the second +// but instead we'll consider the whole task corrupted and discard the first and recover both. +if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) { Review comment: I like my fix better :P But seriously, no need to block this PR on mine if it's suddenly causing tests to fail. Mysterious.. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
mjsax commented on pull request #8996: URL: https://github.com/apache/kafka/pull/8996#issuecomment-656261242 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
mjsax commented on pull request #8996: URL: https://github.com/apache/kafka/pull/8996#issuecomment-656261358 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
cadonna commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-656263751 Unrelated test failure: ``` org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
ableegoldman commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-656265143 Aw...the EosBetaUpgradeIntegrationTest failed again?? :/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
mjsax commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r452391941 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,24 +45,23 @@ protected ProcessorNode currentNode; private long currentSystemTimeMs; -final StateManager stateManager; Review comment: I see. Would it not be sufficient to just keep a ("duplicate") reference of `ProcessorStateManager` within `ProcessorContextImpl`? Just to clarify: I am ok with the proposed changes. Just wondering if it's really the best structure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
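To make the "duplicate reference" idea concrete, here is a compressed, self-contained sketch with stand-in types; the real classes live in org.apache.kafka.streams.processor.internals, take more constructor arguments, and may differ from this shape.

```
// Stand-ins for the sketch only; not the real Streams classes.
interface StateManager { }

final class ProcessorStateManager implements StateManager {
    String changelogFor(final String storeName) {
        return storeName + "-changelog"; // placeholder derivation
    }
}

abstract class AbstractProcessorContext {
    protected final StateManager stateManager;

    AbstractProcessorContext(final StateManager stateManager) {
        this.stateManager = stateManager;
    }
}

final class ProcessorContextImpl extends AbstractProcessorContext {
    // Duplicate, strongly-typed reference to the very same object the base
    // class holds, so subclass methods avoid casting stateManager.
    private final ProcessorStateManager typedStateManager;

    ProcessorContextImpl(final ProcessorStateManager stateManager) {
        super(stateManager);
        this.typedStateManager = stateManager;
    }

    String changelogFor(final String storeName) {
        return typedStateManager.changelogFor(storeName);
    }
}
```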
[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
dajac commented on a change in pull request #8999: URL: https://github.com/apache/kafka/pull/8999#discussion_r452393587 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) { } /** - * Return the Set of nodes whose connection setup has timed out. + * Return the List of nodes whose connection setup has timed out. * @param now the current time in ms */ -public Set<String> nodesWithConnectionSetupTimeout(long now) { +public List<String> nodesWithConnectionSetupTimeout(long now) { return connectingNodes.stream() .filter(id -> isConnectionSetupTimeout(id, now)) -.collect(Collectors.toSet()); +.collect(Collectors.toCollection(LinkedList::new)); Review comment: We only add elements to the List and then iterate over it. A LinkedList seems slightly better for this case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
mjsax commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r452394510 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -103,8 +104,12 @@ public void init(final ProcessorContext context, @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { +final String storeName = name(); +final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( -ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + changelogTopic != null ? +changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), Review comment: Ah thanks. I missed this case. However, should we move both `null` checks into `ProcessorContextUtils.changelogFor()` for this case? It seems we do the same "outer" `null`-check each time we call the method, so why not do it at a single place in the code? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
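One possible shape for the centralized fallback under discussion, as a sketch: the helper name is an assumption, `changelogFor` is internal to the Streams package (so this would not compile outside it), and `storeChangelogTopic` is the existing static derivation.

```
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;

final class ChangelogTopicSketch {
    // Centralize the fallback: prefer the context-provided changelog topic,
    // otherwise derive the conventional "<applicationId>-<storeName>-changelog"
    // name in one place instead of at every call site.
    static String changelogTopicOrDefault(final ProcessorContext context, final String storeName) {
        final String changelog = ProcessorContextUtils.changelogFor(context, storeName);
        return changelog != null
            ? changelog
            : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
    }
}
```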
[GitHub] [kafka] ijuma commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
ijuma commented on a change in pull request #8999: URL: https://github.com/apache/kafka/pull/8999#discussion_r452395964 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) { } /** - * Return the Set of nodes whose connection setup has timed out. + * Return the List of nodes whose connection setup has timed out. * @param now the current time in ms */ -public Set<String> nodesWithConnectionSetupTimeout(long now) { +public List<String> nodesWithConnectionSetupTimeout(long now) { return connectingNodes.stream() .filter(id -> isConnectionSetupTimeout(id, now)) -.collect(Collectors.toSet()); +.collect(Collectors.toCollection(LinkedList::new)); Review comment: Not at all. This is a perfect case for ArrayList. LinkedList iteration is very slow due to pointer chasing (in comparison). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
ijuma commented on a change in pull request #8999: URL: https://github.com/apache/kafka/pull/8999#discussion_r452396471 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) { } /** - * Return the Set of nodes whose connection setup has timed out. + * Return the List of nodes whose connection setup has timed out. * @param now the current time in ms */ -public Set<String> nodesWithConnectionSetupTimeout(long now) { +public List<String> nodesWithConnectionSetupTimeout(long now) { return connectingNodes.stream() .filter(id -> isConnectionSetupTimeout(id, now)) -.collect(Collectors.toSet()); +.collect(Collectors.toCollection(LinkedList::new)); Review comment: FYI https://twitter.com/joshbloch/status/583813919019573248 :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
dajac commented on a change in pull request #8999: URL: https://github.com/apache/kafka/pull/8999#discussion_r452400759 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) { } /** - * Return the Set of nodes whose connection setup has timed out. + * Return the List of nodes whose connection setup has timed out. * @param now the current time in ms */ -public Set<String> nodesWithConnectionSetupTimeout(long now) { +public List<String> nodesWithConnectionSetupTimeout(long now) { return connectingNodes.stream() .filter(id -> isConnectionSetupTimeout(id, now)) -.collect(Collectors.toSet()); +.collect(Collectors.toCollection(LinkedList::new)); Review comment: That's good to know. I always thought that the iteration was more or less equivalent. I have learnt something today. Let me update that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
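For reference, the collector swap being agreed on would look roughly like this inside `ClusterConnectionStates` (a sketch of the change, not the merged diff):

```
// Inside ClusterConnectionStates (sketch): collect into an ArrayList instead
// of a LinkedList, so iteration touches one contiguous backing array rather
// than one heap node per element; java.util.ArrayList replaces
// java.util.LinkedList in the imports.
public List<String> nodesWithConnectionSetupTimeout(long now) {
    return connectingNodes.stream()
            .filter(id -> isConnectionSetupTimeout(id, now))
            .collect(Collectors.toCollection(ArrayList::new));
}
```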
[jira] [Assigned] (KAFKA-6453) Reconsider timestamp propagation semantics
[ https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-6453: -- Assignee: James Galasyn (was: Victoria Bialas) > Reconsider timestamp propagation semantics > -- > > Key: KAFKA-6453 > URL: https://issues.apache.org/jira/browse/KAFKA-6453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: James Galasyn >Priority: Major > Labels: needs-kip > > Atm, Kafka Streams only has a defined "contract" about timestamp propagation > at the Processor API level: all processors within a sub-topology see the > timestamp of the input topic record, and this timestamp will be used for all > result records when writing them to a topic, too. > The DSL inherits this "contract" atm. > From a DSL point of view, it would be desirable to provide a different > contract to the user. To allow this, we need to do the following: > - extend the Processor API to allow manipulating timestamps (i.e., a Processor can > set a new timestamp for downstream records) > - define a DSL "contract" for timestamp propagation for each DSL operator > - document the DSL "contract" > - implement the DSL "contract" using the new/extended Processor API -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-6453) Document timestamp propagation semantics
[ https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6453: --- Summary: Document timestamp propagation semantics (was: Reconsider timestamp propagation semantics) > Document timestamp propagation semantics > > > Key: KAFKA-6453 > URL: https://issues.apache.org/jira/browse/KAFKA-6453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: James Galasyn >Priority: Major > Labels: needs-kip > > Atm, Kafka Streams only has a defined "contract" about timestamp propagation > at the Processor API level: all processors within a sub-topology see the > timestamp of the input topic record, and this timestamp will be used for all > result records when writing them to a topic, too. > The DSL inherits this "contract" atm. > From a DSL point of view, it would be desirable to provide a different > contract to the user. To allow this, we need to do the following: > - extend the Processor API to allow manipulating timestamps (i.e., a Processor can > set a new timestamp for downstream records) > - define a DSL "contract" for timestamp propagation for each DSL operator > - document the DSL "contract" > - implement the DSL "contract" using the new/extended Processor API -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-6453) Document timestamp propagation semantics
[ https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6453: --- Description: Atm, Kafka Streams only has a defined "contract" about timestamp propagation at the Processor API level: all processors within a sub-topology see the timestamp of the input topic record, and this timestamp will be used for all result records when writing them to a topic, too. The DSL inherits this "contract" atm. From a DSL point of view, it would be desirable to provide a different contract to the user. To allow this, we need to do the following: - extend the Processor API to allow manipulating timestamps (i.e., a Processor can set a new timestamp for downstream records) - define a DSL "contract" for timestamp propagation for each DSL operator - document the DSL "contract" - implement the DSL "contract" using the new/extended Processor API Changing the DSL contract etc. was done via KIP-258. This ticket is about documenting the contract. was: Atm, Kafka Streams only has a defined "contract" about timestamp propagation at the Processor API level: all processors within a sub-topology see the timestamp of the input topic record, and this timestamp will be used for all result records when writing them to a topic, too. The DSL inherits this "contract" atm. From a DSL point of view, it would be desirable to provide a different contract to the user. To allow this, we need to do the following: - extend the Processor API to allow manipulating timestamps (i.e., a Processor can set a new timestamp for downstream records) - define a DSL "contract" for timestamp propagation for each DSL operator - document the DSL "contract" - implement the DSL "contract" using the new/extended Processor API > Document timestamp propagation semantics > > > Key: KAFKA-6453 > URL: https://issues.apache.org/jira/browse/KAFKA-6453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: James Galasyn >Priority: Major > Labels: needs-kip > > Atm, Kafka Streams only has a defined "contract" about timestamp propagation > at the Processor API level: all processors within a sub-topology see the > timestamp of the input topic record, and this timestamp will be used for all > result records when writing them to a topic, too. > The DSL inherits this "contract" atm. > From a DSL point of view, it would be desirable to provide a different > contract to the user. To allow this, we need to do the following: > - extend the Processor API to allow manipulating timestamps (i.e., a Processor can > set a new timestamp for downstream records) > - define a DSL "contract" for timestamp propagation for each DSL operator > - document the DSL "contract" > - implement the DSL "contract" using the new/extended Processor API > Changing the DSL contract etc. was done via KIP-258. This ticket is about > documenting the contract. -- This message was sent by Atlassian Jira (v8.3.4#803005)
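For context, the Processor API extension the description asks for (forwarding with an explicit timestamp) was added via KIP-251's `To#withTimestamp`. A minimal example; the processor class itself is illustrative:

```
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

public class TimestampShiftingProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        // Forward downstream with an explicit timestamp instead of inheriting
        // the input record's timestamp.
        final long newTimestamp = context().timestamp() + 1000L;
        context().forward(key, value, To.all().withTimestamp(newTimestamp));
    }
}
```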
[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
mjsax commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452408268 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { -// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file +// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file // and hence we are uncertain that the current local state only contains committed data; // in that case we need to treat it as a task-corrupted exception -if (eosEnabled && !storeDirIsEmpty) { + +// Note, this is a little overzealous, since we aren't checking whether the store's specific +// directory is nonempty, only if there are any directories for any stores. So if there are +// two stores in a task, and one is correctly written and checkpointed, while the other is +// neither written nor checkpointed, we _could_ correctly load the first and recover the second +// but instead we'll consider the whole task corrupted and discard the first and recover both. Review comment: Sounds like something we should fix. Can you file a ticket? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
mjsax commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452410965 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -95,7 +96,7 @@ * | | Assigned (3)| <+ * | +-+---+ | * || | - * || | + * ||--+ Review comment: Where does this self-transition happen exactly? And could/should we detect this case and not call `setState()` for this case instead of allowing the transition? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop
[ https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154790#comment-17154790 ] Chris Egerton commented on KAFKA-10253: --- [~klalafaryan] can you provide the configurations for all three of your workers? It would also be helpful if you could restart one of the workers and capture the logs for it from startup through the first few iterations of this rebalance loop. > Kafka Connect gets into an infinite rebalance loop > -- > > Key: KAFKA-10253 > URL: https://issues.apache.org/jira/browse/KAFKA-10253 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Konstantin Lalafaryan >Priority: Blocker > > Hello everyone! > > We are running kafka-connect cluster (3 workers) and very often it gets into > an infinite rebalance loop. > > {code:java} > 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= > kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Was selected to perform assignments, but do not have latest > config found in sync request. Returning an empty configuration to trigger > re-sync. > (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Successfully joined group with generation 305655831 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Joined group at generation 305655831 with protocol version 2 > and got assignment: Assignment{error=1, > leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', > leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Was selected to perform assignments, but do not have latest > config found in sync request. Returning an empty configuration to trigger > re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Successfully joined group with generation 305655832 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Joined group at generation 305655832 with protocol version 2 > and got assignment: Assignment{error=1, > leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', > leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Was selected to perform assignments, but do not have latest > config found in sync request. Returning an empty configuration to trigger > re-sync. > (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Successfully joined gr
[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
cadonna commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r452415850 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -103,8 +104,12 @@ public void init(final ProcessorContext context, @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { +final String storeName = name(); +final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( -ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + changelogTopic != null ? +changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), Review comment: I would not put any code that is not related to casts of `ProcessorContext` into `ProcessorContextUtils`. I think the goal of `ProcessorContextUtils` is to contain all the code we want to get rid of in the future once the casts are fixed. We could move the `null` check into the constructor of `StateSerdes`, since we already do a `null` check there. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org